在监听消耗IEnumerable 时,Blocking.Peek()的类似于BlockingCollection

我正在使用Pipelines模式实现来将消费者与生产者分离,以避免缓慢的消费者问题。

如果在消息处理阶段[1]上有任何exception,它将丢失并且不会被分派到其他服务/层[2] 。 我怎样才能在[3]处理这样的问题,所以信息不会丢失,重要的是什么! 消息的顺序不会混淆,因此上层服务/层将按照它们进入的顺序获取消息。我有一个想法涉及其他中间Queue但它看起来很复杂? 不幸的是, BlockingCollection没有暴露Queue.Peek()方法的任何模拟,所以我只能读取下一个可用的消息,并且在成功处理的情况下执行Dequeue()

 private BlockingCollection messagesQueue; // TPL Task does following: // Listen to new messages and as soon as any comes in - process it foreach (var cachedMessage in messagesQueue.GetConsumingEnumerable(cancellation)) { const int maxRetries = 3; int retriesCounter = 0; bool isSent = false; // On this point a message already is removed from messagesQueue while (!isSent && retriesCounter++ <= maxRetries) { try { // [1] Preprocess a message // [2] Dispatch to an other service/layer clientProxyCallback.SendMessage(cachedMessage); isSent = true; } catch(Exception exception) { // [3] // logging if (!isSent && retriesCounter < maxRetries) { Thread.Sleep(NSeconds); } } if (!isSent && retriesCounter == maxRetries) { // just log, message is lost on this stage! } } } 

编辑 :忘了说这是IIS托管的WCF服务,它通过客户端回调契约将消息发送回Silverlight客户端WCF代理。

编辑2:下面是我如何使用Peek() ,我错过了什么?

 bool successfullySent = true; try { var item = queue.Peek(); PreProcessItem(item); SendItem(item); } catch(Exception exception) { successfullySent = false; } finally { if (successfullySent) { // just remove already sent item from the queue queue.Dequeue(); } } 

EDIT3:当然我可以使用while循环,bool标志, QueueAutoResetEvent旧样式方法,但我只是想知道是否可以使用BlockingCollectionGetConsumingEnumerable()我认为像Peek这样的设备在与消耗可枚举一起使用时会非常有用,因为否则所有管道模式实现示例新的东西,如BlockingCollectionGetConsumingEnumerable()看起来不耐用,我不得不回到旧的方法。

你应该考虑中间队列。

由于其性质, BlockingCollection无法“偷看”项目 – 可能有多个消费者。 其中一个可以偷看一个项目,另一个可以拿它 – 因此,第一个将尝试采取已经采取的项目。

正如Dennis在评论中所说, BlockingCollectionIProducerConsumerCollection接口的任何实现者提供了一个阻塞包装器。

如您所见, IProducerConsumerCollection按设计,未定义Peek或实现其中所需的其他方法。 这意味着BlockingCollection不能像Peek那样为Peek提供一个分析。

如果你考虑,这可以大大减少Peek实现中实用程序交易所产生的并发问题。 如何在不消费的情况下消费? 要同时Peek你必须锁定集合的头部,直到Peek操作完成,我和BlockingCollection的设计者视为次优。 我认为这也很麻烦,难以实施,需要某种一次性的窥视环境。

如果您使用消息并且其消耗失败,则您将不得不处理它。 您可以将其添加到另一个故障队列,将其重新添加到正常处理队列以进行重新计算,或者只记录其后代故障或适合您的上下文的其他操作。

如果您不想同时使用消息,则不需要使用BlockingCollection因为您不需要并发使用。 您可以直接使用ConcurrentQueue ,您仍然可以获得同步性,并且您可以安全地使用TryPeek ,因为您可以控制单个消费者。 如果消费失败,您可以通过无限重试循环停止消费,但我建议这需要一些设计思路。

BlockingCollectionIProducerConsumerCollection的包装器,它比ConcurrentQueue更通用,并为实现者提供了不必实现(Try)Peek方法的自由。

但是,您始终可以直接在底层队列上调用TryPeek

 ConcurrentQueue useOnlyForPeeking = new ConcurrentQueue(); BlockingCollection blockingCollection = new BlockingCollection(useOnlyForPeeking); ... useOnlyForPeeking.TryPeek(...) 

但请注意, 您不能通过useOnlyForPeeking修改您的队列 ,否则blockingCollection会混淆并可能抛出InvalidOperationException ,但如果在此并发数据结构上调用非修改TryPeek将是一个问题,我会感到惊讶。

您可以使用ConcurrentQueue ,它具有TryDequeue()方法。

ConcurrentQueue.TryDequeue(out T result)尝试删除并返回并发队列开头的对象如果删除了一个元素并从ConcurrentQueue的开头成功返回,则返回true

所以,不需要先查看Peek。

TryDequeue()是线程安全的:

ConcurrentQueue 内部处理所有同步 。 如果两个线程在同一时刻调用TryDequeue(T),则两个操作都不会被阻止。

据我所知,只有队列为空时才返回false

如果队列中填充了q.Enqueue(“a”)等代码; q.Enqueue( “B”); q.Enqueue( “C”); 并且两个线程同时尝试使一个元素出列,一个线程将使一个线程出列,另一个线程将出列b。 对TryDequeue(T)的两次调用都将返回true,因为它们都能够使元素出列。 如果每个线程返回以使其他元素出列,则其中一个线程将使队列出队并返回true,而另一个线程将查找队列为空并返回false。

http://msdn.microsoft.com/en-us/library/dd287208%28v=vs.100%29.aspx

UPDATE

也许,最简单的选择是使用TaskScheduler Class。 有了它,您可以将所有处理任务包装到队列的项目中,并简化同步的实现。