如何访问阻塞集合的基础默认并发队列
我有多个生产者和一个消费者。 但是,如果队列中存在尚未消耗的内容,则生产者不应再次对其进行排队。 (唯一没有重复的阻塞集合使用默认的并发队列)
if (!myBlockingColl.Contains(item)) myBlockingColl.Add(item)
但是,阻塞coll没有包含方法,也没有提供任何类型的trypeek()方法。 如何访问底层并发队列,以便我可以做类似的事情
if (!myBlockingColl.myConcurQ.trypeek(item) myBlockingColl.Add(item)
尾巴旋转。 请帮忙。 谢谢
这是个有趣的问题。 这是我第一次看到有人要求阻止重复的阻塞队列。 奇怪的是,我找不到任何你想要的东西,这已经存在于BCL中了。 我说这很奇怪,因为BlockingCollection
可以接受IProducerConsumerCollection
作为底层集合,它具有IProducerConsumerCollection
方法,该方法被IProducerConsumerCollection
为在检测到重复项时能够失败。 问题是我没有看到IProducerConsumerCollection
具体实现,它可以防止重复。 至少我们可以写自己的。
public class NoDuplicatesConcurrentQueue : IProducerConsumerCollection { // TODO: You will need to fully implement IProducerConsumerCollection. private Queue queue = new Queue (); public bool TryAdd(T item) { lock (queue) { if (!queue.Contains(item)) { queue.Enqueue(item); return true; } return false; } } public bool TryTake(out T item) { lock (queue) { item = null; if (queue.Count > 0) { item = queue.Dequeue(); } return item != null; } } }
既然我们的IProducerConsumerCollection
不接受重复项,我们可以像这样使用它:
public class Example { private BlockingCollection
您可能不喜欢我的NoDuplicatesConcurrentQueue
实现。 如果您认为需要TPL集合提供的低锁性能,您当然可以使用ConcurrentQueue
实现自己的任务。
更新:
我今天早上能够测试代码。 有一些好消息和坏消息。 好消息是,这将在技术上有效。 坏消息是你可能不想这样做,因为BlockingCollection.TryAdd
拦截了底层IProducerConsumerCollection.TryAdd
方法的返回值,并在检测到false
时抛出exception。 是的,这是对的。 它不会像您期望的那样返回false
,而是生成exception。 我必须说实话,这既令人惊讶又荒谬。 TryXXX方法的重点是它们不应该抛出exception。 我深感失望。
除了Brian Gideon在Update之后提到的警告之外,他的解决方案还存在以下性能问题:
- 队列上的O(n)操作(
queue.Contains(item)
)随着队列的增长对性能产生严重影响 - 锁限制并发(他确实提到)
以下代码改进了Brian的解决方案
- 使用哈希集进行O(1)查找
- 合并
System.Collections.Concurrent
命名空间中的2个数据结构
注意:由于没有ConcurrentHashSet
,我使用的是ConcurrentDictionary
,忽略了这些值。
在这种罕见的情况下,幸运的是,可以简单地从多个更简单的并发数据结构中构建更复杂的并发数据结构,而无需添加锁。 这两个并发数据结构的操作顺序非常重要。
public class NoDuplicatesConcurrentQueue : IProducerConsumerCollection { private readonly ConcurrentDictionary existingElements = new ConcurrentDictionary(); private readonly ConcurrentQueue queue = new ConcurrentQueue (); public bool TryAdd(T item) { if (existingElements.TryAdd(item, false)) { queue.Enqueue(item); return true; } return false; } public bool TryTake(out T item) { if (queue.TryDequeue(out item)) { bool _; existingElements.TryRemove(item, out _); return true; } return false; } ... }
NB查看此问题的另一种方法:您需要一个保留插入顺序的集合 。
我建议用锁实现你的操作,这样你就不会以破坏它的方式读写项目,使它们成为primefaces。 例如,使用任何IEnumerable:
object bcLocker = new object(); // ... lock (bcLocker) { bool foundTheItem = false; foreach (someClass nextItem in myBlockingColl) { if (nextItem.Equals(item)) { foundTheItem = true; break; } } if (foundTheItem == false) { // Add here } }