具有指定结果的任务并行库WaitAny

我正在尝试编写一些代码,这些代码将使Web服务并行调用多个不同的服务器,因此TPL似乎是一个明显的选择。

只有我的一个Web服务调用将返回我想要的结果,而其他所有调用都不会。 我正在尝试找到一种有效地拥有Task.WaitAny的方法,但只有在匹配条件的第一个Task返回时才会解锁。

我尝试过WaitAny但无法确定filter的位置。 我到目前为止:

 public void SearchServers() { var servers = new[] {"server1", "server2", "server3", "server4"}; var tasks = servers .Select(s => Task.Factory.StartNew(server => CallServer((string)server), s)) .ToArray(); Task.WaitAny(tasks); //how do I say "WaitAny where the result is true"? //Omitted: cancel any outstanding tasks since the correct server has been found } private bool CallServer(string server) { //... make the call to the server and return the result ... } 

编辑 :快速澄清,以防上面有任何混淆。 我正在尝试执行以下操作:

  1. 对于每个服务器,启动一个Task来检查它
  2. 要么等到服务器返回true(只有最多1个服务器将返回true)
  3. 或者,等到所有服务器都返回false,即没有匹配。

我能想到的最好的是为每个Task指定一个ContinueWith ,检查结果,如果是,则取消其他任务。 要取消任务,您可能需要使用CancellationToken 。

 var tasks = servers .Select(s => Task.Run(...) .ContinueWith(t => if (t.Result) { // cancel other threads } ) ).ToArray(); 

更新:替代解决方案将是WaitAny直到正确的任务完成(但它有一些缺点,例如从列表中删除已完成的任务并从剩余的任务中创建一个新arrays是一个非常繁重的操作):

 List> tasks = servers.Select(s => Task.Factory.StartNew(server => CallServer((string)server), s)).ToList(); bool result; do { int idx = Task.WaitAny(tasks.ToArray()); result = tasks[idx].Result; tasks.RemoveAt(idx); } while (!result && tasks.Count > 0); // cancel other tasks 

更新2:现在我会用Rx做到这一点:

 [Fact] public async Task AwaitFirst() { var servers = new[] { "server1", "server2", "server3", "server4" }; var server = await servers .Select(s => Observable .FromAsync(ct => CallServer(s, ct)) .Where(p => p) .Select(_ => s) ) .Merge() .FirstAsync(); output.WriteLine($"Got result from {server}"); } private async Task CallServer(string server, CancellationToken ct) { try { if (server == "server1") { await Task.Delay(TimeSpan.FromSeconds(1), ct); output.WriteLine($"{server} finished"); return false; } if (server == "server2") { await Task.Delay(TimeSpan.FromSeconds(2), ct); output.WriteLine($"{server} finished"); return false; } if (server == "server3") { await Task.Delay(TimeSpan.FromSeconds(3), ct); output.WriteLine($"{server} finished"); return true; } if (server == "server4") { await Task.Delay(TimeSpan.FromSeconds(4), ct); output.WriteLine($"{server} finished"); return true; } } catch(OperationCanceledException) { output.WriteLine($"{server} Cancelled"); throw; } throw new ArgumentOutOfRangeException(nameof(server)); } 

我的机器上的测试需要3.32秒(这意味着它没有等待第4台服务器),我得到了以下输出:

 server1 finished server2 finished server3 finished server4 Cancelled Got result from server3 

您可以使用OrderByCompletion()库中的OrderByCompletion() ,它在完成任务时返回任务。 您的代码可能类似于:

 var tasks = servers .Select(s => Task.Factory.StartNew(server => CallServer((string)server), s)) .OrderByCompletion(); foreach (var task in tasks) { if (task.Result) { Console.WriteLine("found"); break; } Console.WriteLine("not found yet"); } // cancel any outstanding tasks since the correct server has been found 

使用Interlocked.CompareExchange就可以做到这一点,只有一个Task能够在serverReturedData上写入

  public void SearchServers() { ResultClass serverReturnedData = null; var servers = new[] {"server1", "server2", "server3", "server4"}; var tasks = servers.Select(s => Task.Factory.StartNew(server => { var result = CallServer((string)server), s); Interlocked.CompareExchange(ref serverReturnedData, result, null); }).ToArray(); Task.WaitAny(tasks); //how do I say "WaitAny where the result is true"? // // use serverReturnedData as you want. // } 

编辑:正如Jasd所说,上面的代码可以在变量serverReturnedData具有有效值之前返回(如果服务器返回空值,则可能发生这种情况),以确保您可以将结果包装在自定义对象中。

这是基于svick答案的通用解决方案:

 public static async Task GetFirstResult( this IEnumerable>> taskFactories, Action exceptionHandler, Predicate predicate) { T ret = default(T); var cts = new CancellationTokenSource(); var proxified = taskFactories.Select(tf => tf(cts.Token)).ProxifyByCompletion(); int i; for (i = 0; i < proxified.Length; i++) { try { ret = await proxified[i].ConfigureAwait(false); } catch (Exception e) { exceptionHandler(e); continue; } if (predicate(ret)) { break; } } if (i == proxified.Length) { throw new InvalidOperationException("No task returned the expected value"); } cts.Cancel(); //we have our value, so we can cancel the rest of the tasks for (int j = i+1; j < proxified.Length; j++) { //observe remaining tasks to prevent process crash proxified[j].ContinueWith( t => exceptionHandler(t.Exception), TaskContinuationOptions.OnlyOnFaulted) .Forget(); } return ret; } 

ProxifyByCompletion实现为:

 public static Task[] ProxifyByCompletion(this IEnumerable> tasks) { var inputTasks = tasks.ToArray(); var buckets = new TaskCompletionSource[inputTasks.Length]; var results = new Task[inputTasks.Length]; for (int i = 0; i < buckets.Length; i++) { buckets[i] = new TaskCompletionSource(); results[i] = buckets[i].Task; } int nextTaskIndex = -1; foreach (var inputTask in inputTasks) { inputTask.ContinueWith(completed => { var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)]; if (completed.IsFaulted) { Trace.Assert(completed.Exception != null); bucket.TrySetException(completed.Exception.InnerExceptions); } else if (completed.IsCanceled) { bucket.TrySetCanceled(); } else { bucket.TrySetResult(completed.Result); } }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } return results; } 

Forget是一种压制CS4014的空方法:

 public static void Forget(this Task task) //suppress CS4014 { }