Tag: blockingcollection

如何使用C#任务并行库和IProducerConsumerCollection实现通用回调?

我有一个组件向基于Web的API提交请求,但必须限制这些请求,以免违反API的数据限制。 这意味着所有请求必须通过队列来控制它们的提交速率,但它们可以(并且应该)同时执行以实现最大吞吐量。 每个请求必须在完成后的某个时刻将某些数据返回给调用代码。 我正在努力创建一个很好的模型来处理数据的返回。 使用BlockingCollection我不能只从Schedule方法返回Task ,因为入队和出队进程位于缓冲区的两端。 因此,我创建了一个RequestItem类型,其中包含Action<Task>forms的回调。 这个想法是,一旦一个项目被从队列中拉出,就可以用启动的任务调用回调,但是我已经丢失了那个点的generics类型参数,而且我还在使用reflection和各种恶作剧(如果它是甚至可能)。 例如: public class RequestScheduler { private readonly BlockingCollection _queue = new BlockingCollection(); public RequestScheduler() { this.Start(); } // This can’t return Task, so returns void. // Instead RequestItem is generic but this poses problems when adding to the queue public void Schedule(RequestItem request) { _queue.Add(request); } private […]

使用BlockingCollection更新ObservableCollection

我订阅了一个服务,当收到一个新元素时会引发一个事件,我将这个元素添加到一个BlockingCollection 。 我有第二个线程运行,它将循环BlockingCollection以在可观察集合中添加/更新元素。 问题是你如何添加ObservableCollection ? 我知道我不能在这种类型的集合上做一个.add ,因为它需要在UI线程上完成。 所以我尝试使用不同的ObservableCollection子类,它使用调度程序来编组元素的添加,但每次都会出现同样的错误 “Unknown Module中发生了’System.StackOverflowException’类型的未处理exception。” 有一个疑难解答提示: 确保你没有无限循环或无限递归。 嗯,实际上,我确实有一些无限循环,因为BlockingQueue一直在接收新的元素,比如每秒5到10个。 如果我没有将控件绑定到observablecollection ,或者如果我使用List,那么我不会得到exception。 Class ElementHolder { private ExternalService _externalService; private ObservableCollection _elementsList = new ObservableCollection(); private BlockingCollection _elementsReceived = new BlockingCollection(); public ObservableCollection ElementsList { get { return _elementList; } set { _elementList = value; } } public ElementHolder() { Task.Factory.StartNew(ReadElements); _externalService = […]

如何在BlockingCollection上取消GetConsumingEnumerable()

在下面的代码中,我使用CancellationToken在生产者没有生成时唤醒GetConsumingEnumerable(),我想要脱离foreach并退出Task。 但我没有看到IsCancellationRequested被记录,我的Task.Wait(timeOut)等待整个timeOut期间。 我究竟做错了什么? userToken.Task = Task.Factory.StartNew(state => { userToken.CancelToken = new CancellationTokenSource(); foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token)) { if (userToken.CancelToken.IsCancellationRequested) { Log.Write(“BroadcastQueue IsCancellationRequested”); break; … } } return 0; }, “TaskSubscribe”, TaskCreationOptions.LongRunning); 后来… UserToken.CancelToken.Cancel(); try { task.Wait(timeOut); } catch (AggregateException ar) { Log.Write(“AggregateException ” + ar.InnerException, MsgType.InfoMsg); } catch (OperationCanceledException) { Log.Write(“BroadcastQueue Cancelled”, MsgType.InfoMsg); }

为什么迭代GetConsumingEnumerable()不会完全清空底层的阻塞集合

在尝试创建简单管道时,使用任务并行库, BlockingCollection , ConcurrentQueue和GetConsumingEnumerable ,我有一个可量化且可重复的问题。 简而言之,将条目添加到一个线程的默认BlockingCollection (在引擎盖下依赖于ConcurrentQueue )并不能保证它们将从另一个调用该线程的线程中弹出BlockingCollection 。 GetConsumingEnumerable()方法。 我创建了一个非常简单的Winforms应用程序来重现/模拟它,它只是将整数打印到屏幕上。 Timer1负责排队工作项……它使用一个名为_tracker的并发字典,以便它知道它已经添加到阻塞集合中的内容。 Timer2只记录BlockingCollection和_tracker的计数状态 START按钮启动Paralell.ForEach ,它只是遍历阻塞集合GetConsumingEnumerable()并开始将它们打印到第二个列表框。 STOP按钮停止Timer1防止更多条目被添加到阻塞集合中。 public partial class Form1 : Form { private int Counter = 0; private BlockingCollection _entries; private ConcurrentDictionary _tracker; private CancellationTokenSource _tokenSource; private TaskFactory _factory; public Form1() { _entries = new BlockingCollection(); _tracker = new ConcurrentDictionary(); _tokenSource = new CancellationTokenSource(); […]