使用并发启用队列

我有一个先前的问题 ,我提供了解决方案; 但是,因为我在.Net 3.5上,所以我无法访问ConcurrentQueue 。 我需要Queue来允许并发。 我读了这个问题 ,如果一个项目不在队列中并且线程方法试图使项目出列,则似乎存在问题。

我现在的任务是确定是否可以派生自己的并发队列类。 这就是我提出的:

 public sealed class ConcurrentQueue : Queue { public event EventHandler TableQueued; private ICollection que; new public void Enqueue(DataTable Table) { lock (que.SyncRoot) { base.Enqueue(Table); } OnTableQueued(new TableQueuedEventArgs(Dequeue())); } // this is where I think I will have a problem... new public DataTable Dequeue() { DataTable table; lock (que.SyncRoot) { table = base.Dequeue(); } return table; } public void OnTableQueued(TableQueuedEventArgs table) { EventHandler handler = TableQueued; if (handler != null) { handler(this, table); } } } 

因此,当DataTable排队时,EventArgs会将一个出列表传递给事件订阅者。 这个实现会为我提供一个线程安全的队列吗?

当您将物品排入队列时,您的物品会出列。
您需要使用参数引发事件。

它是否真的是线程安全取决于你如何使用它。
如果您检查Count或检查空虚,它不是线程安全的,不能轻易地做线程安全。
如果不这样做,您可能会使用比队列更简单的东西。

快速访问我最喜欢的搜索引擎,发现我的记忆是正确的; 你甚至可以在.NET 3.5上获得任务并行库 。 另请参阅有关此主题的PFX团队博客文章 ,以及为了获取所需的System.Threading.dll下载的Reactive Extensions 。

您需要使用new来隐藏基类中的方法这一事实通常表明您应该使用组合而不是inheritance…

这是一个简单的同步队列,它不使用inheritance但仍依赖于标准Queue的行为:

 public class ConcurrentQueue : ICollection, IEnumerable { private readonly Queue _queue; public ConcurrentQueue() { _queue = new Queue(); } public IEnumerator GetEnumerator() { lock (SyncRoot) { foreach (var item in _queue) { yield return item; } } } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } public void CopyTo(Array array, int index) { lock (SyncRoot) { ((ICollection)_queue).CopyTo(array, index); } } public int Count { get { // Assumed to be atomic, so locking is unnecessary return _queue.Count; } } public object SyncRoot { get { return ((ICollection)_queue).SyncRoot; } } public bool IsSynchronized { get { return true; } } public void Enqueue(T item) { lock (SyncRoot) { _queue.Enqueue(item); } } public T Dequeue() { lock(SyncRoot) { return _queue.Dequeue(); } } public T Peek() { lock (SyncRoot) { return _queue.Peek(); } } public void Clear() { lock (SyncRoot) { _queue.Clear(); } } } 

在最初的问题之后的一段时间,我知道(这与另一个问题的权利“相关”),但在类似情况下我已经使用了以下内容。 对CPU缓存的使用不尽如人意,但是如果操作之间经常存在较大的间隙,那么简单,无锁,线程安全以及CPU缓存的使用并不重要不是分配的接近程度可能会减少影响:

 internal sealed class LockFreeQueue { private sealed class Node { public readonly T Item; public Node Next; public Node(T item) { Item = item; } } private volatile Node _head; private volatile Node _tail; public LockFreeQueue() { _head = _tail = new Node(default(T)); } #pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked public void Enqueue(T item) { Node newNode = new Node(item); for(;;) { Node curTail = _tail; if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) //append to the tail if it is indeed the tail. { Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread. return; } else { Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread. } } } public bool TryDequeue(out T item) { for(;;) { Node curHead = _head; Node curTail = _tail; Node curHeadNext = curHead.Next; if (curHead == curTail) { if (curHeadNext == null) { item = default(T); return false; } else Interlocked.CompareExchange(ref _tail, curHeadNext, curTail); // assist obstructing thread } else { item = curHeadNext.Item; if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead) { return true; } } } } #pragma warning restore 420 } 

OnTableQueued(new TableQueuedEventArgs(Dequeue()));行中OnTableQueued(new TableQueuedEventArgs(Dequeue())); 在您的Enqueue方法中

使用Peek而不是Dequeue

它应该是

OnTableQueued(new TableQueuedEventArgs(base.Peek()));