等待基于任务的队列

我想知道是否存在ConcurrentQueue的实现/包装器,类似于BlockingCollection ,其中从集合中获取不会阻塞,而是异步并且将导致异步等待直到将项放入队列中。

我已经提出了自己的实现,但它似乎没有按预期执行。 我想知道我是否正在重塑已经存在的东西。

这是我的实现:

public class MessageQueue { ConcurrentQueue queue = new ConcurrentQueue(); ConcurrentQueue<TaskCompletionSource> waitingQueue = new ConcurrentQueue<TaskCompletionSource>(); object queueSyncLock = new object(); public void Enqueue(T item) { queue.Enqueue(item); ProcessQueues(); } public async Task Dequeue() { TaskCompletionSource tcs = new TaskCompletionSource(); waitingQueue.Enqueue(tcs); ProcessQueues(); return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; } private void ProcessQueues() { TaskCompletionSource tcs=null; T firstItem=default(T); while (true) { bool ok; lock (queueSyncLock) { ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem); if (ok) { waitingQueue.TryDequeue(out tcs); queue.TryDequeue(out firstItem); } } if (!ok) break; tcs.SetResult(firstItem); } } } 

我不知道无锁解决方案,但您可以查看新的Dataflow库 ,它是Async CTP的一部分。 一个简单的BufferBlock就足够了,例如:

 BufferBlock buffer = new BufferBlock(); 

通过数据流块类型的扩展方法可以轻松完成生产和消费。

生产就像:

 buffer.Post(13); 

和消费是异步准备:

 int item = await buffer.ReceiveAsync(); 

我建议你尽可能使用Dataflow; 使这样的缓冲区既高效又正确,比首次出现更困难。

我的尝试(在创建“promise”时会引发一个事件,外部生产者可以使用它来知道何时生成更多项目):

 public class AsyncQueue { private ConcurrentQueue _bufferQueue; private ConcurrentQueue> _promisesQueue; private object _syncRoot = new object(); public AsyncQueue() { _bufferQueue = new ConcurrentQueue(); _promisesQueue = new ConcurrentQueue>(); } ///  /// Enqueues the specified item. ///  /// The item. public void Enqueue(T item) { TaskCompletionSource promise; do { if (_promisesQueue.TryDequeue(out promise) && !promise.Task.IsCanceled && promise.TrySetResult(item)) { return; } } while (promise != null); lock (_syncRoot) { if (_promisesQueue.TryDequeue(out promise) && !promise.Task.IsCanceled && promise.TrySetResult(item)) { return; } _bufferQueue.Enqueue(item); } } ///  /// Dequeues the asynchronous. ///  /// The cancellation token. ///  public Task DequeueAsync(CancellationToken cancellationToken) { T item; if (!_bufferQueue.TryDequeue(out item)) { lock (_syncRoot) { if (!_bufferQueue.TryDequeue(out item)) { var promise = new TaskCompletionSource(); cancellationToken.Register(() => promise.TrySetCanceled()); _promisesQueue.Enqueue(promise); this.PromiseAdded.RaiseEvent(this, EventArgs.Empty); return promise.Task; } } } return Task.FromResult(item); } ///  /// Gets a value indicating whether this instance has promises. ///  ///  /// true if this instance has promises; otherwise, false. ///  public bool HasPromises { get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; } } ///  /// Occurs when a new promise /// is generated by the queue ///  public event EventHandler PromiseAdded; } 

对于您的用例(考虑到学习曲线),它可能有点过分,但Reactive Extentions提供了您可能想要的异步合成的所有粘合剂 。

您基本上订阅了更改,并在它们可用时将它们推送给您,您可以让系统在单独的线程上推送更改。

这是我目前正在使用的实现。

 public class MessageQueue { ConcurrentQueue queue = new ConcurrentQueue(); ConcurrentQueue> waitingQueue = new ConcurrentQueue>(); object queueSyncLock = new object(); public void Enqueue(T item) { queue.Enqueue(item); ProcessQueues(); } public async Task DequeueAsync(CancellationToken ct) { TaskCompletionSource tcs = new TaskCompletionSource(); ct.Register(() => { lock (queueSyncLock) { tcs.TrySetCanceled(); } }); waitingQueue.Enqueue(tcs); ProcessQueues(); return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; } private void ProcessQueues() { TaskCompletionSource tcs = null; T firstItem = default(T); lock (queueSyncLock) { while (true) { if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem)) { waitingQueue.TryDequeue(out tcs); if (tcs.Task.IsCanceled) { continue; } queue.TryDequeue(out firstItem); } else { break; } tcs.SetResult(firstItem); } } } } 

它工作得很好,但是在queueSyncLock上有很多争用,因为我正在大量使用CancellationToken来取消一些等待的任务。 当然,这会导致我用BlockingCollection看到的BlockingCollection要少得多,但……

我想知道是否有更顺畅,无锁的方法来达到同样的目的

您可以使用BlockingCollection (使用默认的ConcurrentQueue )并将调用包装到Take in a Task以便您可以await它:

 var bc = new BlockingCollection(); T element = await Task.Run( () => bc.Take() );