如何在BlockingCollection上取消GetConsumingEnumerable()

在下面的代码中,我使用CancellationToken在生产者没有生成时唤醒GetConsumingEnumerable(),我想要脱离foreach并退出Task。 但我没有看到IsCancellationRequested被记录,我的Task.Wait(timeOut)等待整个timeOut期间。 我究竟做错了什么?

userToken.Task = Task.Factory.StartNew(state => { userToken.CancelToken = new CancellationTokenSource(); foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token)) { if (userToken.CancelToken.IsCancellationRequested) { Log.Write("BroadcastQueue IsCancellationRequested"); break; ... } } return 0; }, "TaskSubscribe", TaskCreationOptions.LongRunning); 

后来…

 UserToken.CancelToken.Cancel(); try { task.Wait(timeOut); } catch (AggregateException ar) { Log.Write("AggregateException " + ar.InnerException, MsgType.InfoMsg); } catch (OperationCanceledException) { Log.Write("BroadcastQueue Cancelled", MsgType.InfoMsg); } 

您可以使用CompleteAdding()表示不再向集合添加任何项目。 如果使用GetConsumingEnumerable,foreach将优雅地结束,因为它将知道等待更多项目没有意义。

基本上,一旦你完成向BlockingCollection添加项目,只需执行:myBlockingCollection.CompleteAdding()

使用GetConsumingEnumerable进行foreach循环的任何线程都将停止循环。

我已经创建了快速原型,它似乎对我有用。

注意在令牌取消请求之前的Thread.Sleep(1000)。 您可能正在为Token变量创建竞争条件 ,因为您在不同的线程中创建并访问item.CancelToken变量。

例如,用于取消任务的代码可能会在错误(先前或空)取消令牌上调用cancel。

 static void Main(string[] args) { CancellationTokenSource token = null; BlockingCollection coll = new BlockingCollection(); var t = Task.Factory.StartNew(state => { token = new CancellationTokenSource(); try { foreach (var broadcast in coll.GetConsumingEnumerable(token.Token)) { if (token.IsCancellationRequested) { return; } } } catch (OperationCanceledException) { Console.WriteLine("Cancel"); return; } }, "TaskSubscribe", TaskCreationOptions.LongRunning); Thread.Sleep(1000); token.Cancel(); t.Wait(); }