具有指定结果的任务并行库WaitAny
我正在尝试编写一些代码,这些代码将使Web服务并行调用多个不同的服务器,因此TPL似乎是一个明显的选择。
只有我的一个Web服务调用将返回我想要的结果,而其他所有调用都不会。 我正在尝试找到一种有效地拥有Task.WaitAny
的方法,但只有在匹配条件的第一个Task
返回时才会解锁。
我尝试过WaitAny
但无法确定filter的位置。 我到目前为止:
public void SearchServers() { var servers = new[] {"server1", "server2", "server3", "server4"}; var tasks = servers .Select(s => Task.Factory.StartNew(server => CallServer((string)server), s)) .ToArray(); Task.WaitAny(tasks); //how do I say "WaitAny where the result is true"? //Omitted: cancel any outstanding tasks since the correct server has been found } private bool CallServer(string server) { //... make the call to the server and return the result ... }
编辑 :快速澄清,以防上面有任何混淆。 我正在尝试执行以下操作:
- 对于每个服务器,启动一个
Task
来检查它 - 要么等到服务器返回true(只有最多1个服务器将返回true)
- 或者,等到所有服务器都返回false,即没有匹配。
我能想到的最好的是为每个Task
指定一个ContinueWith
,检查结果,如果是,则取消其他任务。 要取消任务,您可能需要使用CancellationToken 。
var tasks = servers .Select(s => Task.Run(...) .ContinueWith(t => if (t.Result) { // cancel other threads } ) ).ToArray();
更新:替代解决方案将是WaitAny
直到正确的任务完成(但它有一些缺点,例如从列表中删除已完成的任务并从剩余的任务中创建一个新arrays是一个非常繁重的操作):
List> tasks = servers.Select(s => Task.Factory.StartNew(server => CallServer((string)server), s)).ToList(); bool result; do { int idx = Task.WaitAny(tasks.ToArray()); result = tasks[idx].Result; tasks.RemoveAt(idx); } while (!result && tasks.Count > 0); // cancel other tasks
更新2:现在我会用Rx做到这一点:
[Fact] public async Task AwaitFirst() { var servers = new[] { "server1", "server2", "server3", "server4" }; var server = await servers .Select(s => Observable .FromAsync(ct => CallServer(s, ct)) .Where(p => p) .Select(_ => s) ) .Merge() .FirstAsync(); output.WriteLine($"Got result from {server}"); } private async Task CallServer(string server, CancellationToken ct) { try { if (server == "server1") { await Task.Delay(TimeSpan.FromSeconds(1), ct); output.WriteLine($"{server} finished"); return false; } if (server == "server2") { await Task.Delay(TimeSpan.FromSeconds(2), ct); output.WriteLine($"{server} finished"); return false; } if (server == "server3") { await Task.Delay(TimeSpan.FromSeconds(3), ct); output.WriteLine($"{server} finished"); return true; } if (server == "server4") { await Task.Delay(TimeSpan.FromSeconds(4), ct); output.WriteLine($"{server} finished"); return true; } } catch(OperationCanceledException) { output.WriteLine($"{server} Cancelled"); throw; } throw new ArgumentOutOfRangeException(nameof(server)); }
我的机器上的测试需要3.32秒(这意味着它没有等待第4台服务器),我得到了以下输出:
server1 finished server2 finished server3 finished server4 Cancelled Got result from server3
您可以使用OrderByCompletion()
库中的OrderByCompletion()
,它在完成任务时返回任务。 您的代码可能类似于:
var tasks = servers .Select(s => Task.Factory.StartNew(server => CallServer((string)server), s)) .OrderByCompletion(); foreach (var task in tasks) { if (task.Result) { Console.WriteLine("found"); break; } Console.WriteLine("not found yet"); } // cancel any outstanding tasks since the correct server has been found
使用Interlocked.CompareExchange就可以做到这一点,只有一个Task能够在serverReturedData上写入
public void SearchServers() { ResultClass serverReturnedData = null; var servers = new[] {"server1", "server2", "server3", "server4"}; var tasks = servers.Select(s => Task.Factory.StartNew(server => { var result = CallServer((string)server), s); Interlocked.CompareExchange(ref serverReturnedData, result, null); }).ToArray(); Task.WaitAny(tasks); //how do I say "WaitAny where the result is true"? // // use serverReturnedData as you want. // }
编辑:正如Jasd所说,上面的代码可以在变量serverReturnedData具有有效值之前返回(如果服务器返回空值,则可能发生这种情况),以确保您可以将结果包装在自定义对象中。
这是基于svick答案的通用解决方案:
public static async Task GetFirstResult ( this IEnumerable>> taskFactories, Action exceptionHandler, Predicate predicate) { T ret = default(T); var cts = new CancellationTokenSource(); var proxified = taskFactories.Select(tf => tf(cts.Token)).ProxifyByCompletion(); int i; for (i = 0; i < proxified.Length; i++) { try { ret = await proxified[i].ConfigureAwait(false); } catch (Exception e) { exceptionHandler(e); continue; } if (predicate(ret)) { break; } } if (i == proxified.Length) { throw new InvalidOperationException("No task returned the expected value"); } cts.Cancel(); //we have our value, so we can cancel the rest of the tasks for (int j = i+1; j < proxified.Length; j++) { //observe remaining tasks to prevent process crash proxified[j].ContinueWith( t => exceptionHandler(t.Exception), TaskContinuationOptions.OnlyOnFaulted) .Forget(); } return ret; }
将ProxifyByCompletion
实现为:
public static Task[] ProxifyByCompletion (this IEnumerable> tasks) { var inputTasks = tasks.ToArray(); var buckets = new TaskCompletionSource [inputTasks.Length]; var results = new Task [inputTasks.Length]; for (int i = 0; i < buckets.Length; i++) { buckets[i] = new TaskCompletionSource (); results[i] = buckets[i].Task; } int nextTaskIndex = -1; foreach (var inputTask in inputTasks) { inputTask.ContinueWith(completed => { var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)]; if (completed.IsFaulted) { Trace.Assert(completed.Exception != null); bucket.TrySetException(completed.Exception.InnerExceptions); } else if (completed.IsCanceled) { bucket.TrySetCanceled(); } else { bucket.TrySetResult(completed.Result); } }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } return results; }
而Forget
是一种压制CS4014的空方法:
public static void Forget(this Task task) //suppress CS4014 { }