如何访问阻塞集合的基础默认并发队列

我有多个生产者和一个消费者。 但是,如果队列中存在尚未消耗的内容,则生产者不应再次对其进行排队。 (唯一没有重复的阻塞集合使用默认的并发队列)

if (!myBlockingColl.Contains(item)) myBlockingColl.Add(item) 

但是,阻塞coll没有包含方法,也没有提供任何类型的trypeek()方法。 如何访问底层并发队列,以便我可以做类似的事情

 if (!myBlockingColl.myConcurQ.trypeek(item) myBlockingColl.Add(item) 

尾巴旋转。 请帮忙。 谢谢

这是个有趣的问题。 这是我第一次看到有人要求阻止重复的阻塞队列。 奇怪的是,我找不到任何你想要的东西,这已经存在于BCL中了。 我说这很奇怪,因为BlockingCollection可以接受IProducerConsumerCollection作为底层集合,它具有IProducerConsumerCollection方法,该方法被IProducerConsumerCollection为在检测到重复项时能够失败。 问题是我没有看到IProducerConsumerCollection具体实现,它可以防止重复。 至少我们可以写自己的。

 public class NoDuplicatesConcurrentQueue : IProducerConsumerCollection { // TODO: You will need to fully implement IProducerConsumerCollection. private Queue queue = new Queue(); public bool TryAdd(T item) { lock (queue) { if (!queue.Contains(item)) { queue.Enqueue(item); return true; } return false; } } public bool TryTake(out T item) { lock (queue) { item = null; if (queue.Count > 0) { item = queue.Dequeue(); } return item != null; } } } 

既然我们的IProducerConsumerCollection不接受重复项,我们可以像这样使用它:

 public class Example { private BlockingCollection queue = new BlockingCollection(new NoDuplicatesConcurrentQueue()); public Example() { new Thread(Consume).Start(); } public void Produce(object item) { bool unique = queue.TryAdd(item); } private void Consume() { while (true) { object item = queue.Take(); } } } 

您可能不喜欢我的NoDuplicatesConcurrentQueue实现。 如果您认为需要TPL集合提供的低锁性能,您当然可以使用ConcurrentQueue实现自己的任务。

更新:

我今天早上能够测试代码。 有一些好消息和坏消息。 好消息是,这将在技术上有效。 坏消息是你可能不想这样做,因为BlockingCollection.TryAdd拦截了底层IProducerConsumerCollection.TryAdd方法的返回值,并在检测到false时抛出exception。 是的,这是对的。 它不会像您期望的那样返回false ,而是生成exception。 我必须说实话,这既令人惊讶又荒谬。 TryXXX方法的重点是它们不应该抛出exception。 我深感失望。

除了Brian Gideon在Update之后提到的警告之外,他的解决方案还存在以下性能问题:

  • 队列上的O(n)操作( queue.Contains(item) )随着队列的增长对性能产生严重影响
  • 锁限制并发(他确实提到)

以下代码改进了Brian的解决方案

  • 使用哈希集进行O(1)查找
  • 合并System.Collections.Concurrent命名空间中的2个数据结构

注意:由于没有ConcurrentHashSet ,我使用的是ConcurrentDictionary ,忽略了这些值。

在这种罕见的情况下,幸运的是,可以简单地从多个更简单的并发数据结构中构建更复杂的并发数据结构,而无需添加锁。 这两个并发数据结构的操作顺序非常重要。

 public class NoDuplicatesConcurrentQueue : IProducerConsumerCollection { private readonly ConcurrentDictionary existingElements = new ConcurrentDictionary(); private readonly ConcurrentQueue queue = new ConcurrentQueue(); public bool TryAdd(T item) { if (existingElements.TryAdd(item, false)) { queue.Enqueue(item); return true; } return false; } public bool TryTake(out T item) { if (queue.TryDequeue(out item)) { bool _; existingElements.TryRemove(item, out _); return true; } return false; } ... } 

NB查看此问题的另一种方法:您需要一个保留插入顺序的集合

我建议用锁实现你的操作,这样你就不会以破坏它的方式读写项目,使它们成为primefaces。 例如,使用任何IEnumerable:

 object bcLocker = new object(); // ... lock (bcLocker) { bool foundTheItem = false; foreach (someClass nextItem in myBlockingColl) { if (nextItem.Equals(item)) { foundTheItem = true; break; } } if (foundTheItem == false) { // Add here } }