在监听消耗IEnumerable 时,Blocking.Peek()的类似于BlockingCollection
我正在使用Pipelines模式实现来将消费者与生产者分离,以避免缓慢的消费者问题。
如果在消息处理阶段[1]
上有任何exception,它将丢失并且不会被分派到其他服务/层[2]
。 我怎样才能在[3]
处理这样的问题,所以信息不会丢失,重要的是什么! 消息的顺序不会混淆,因此上层服务/层将按照它们进入的顺序获取消息。我有一个想法涉及其他中间Queue
但它看起来很复杂? 不幸的是, BlockingCollection
没有暴露Queue.Peek()
方法的任何模拟,所以我只能读取下一个可用的消息,并且在成功处理的情况下执行Dequeue()
private BlockingCollection messagesQueue; // TPL Task does following: // Listen to new messages and as soon as any comes in - process it foreach (var cachedMessage in messagesQueue.GetConsumingEnumerable(cancellation)) { const int maxRetries = 3; int retriesCounter = 0; bool isSent = false; // On this point a message already is removed from messagesQueue while (!isSent && retriesCounter++ <= maxRetries) { try { // [1] Preprocess a message // [2] Dispatch to an other service/layer clientProxyCallback.SendMessage(cachedMessage); isSent = true; } catch(Exception exception) { // [3] // logging if (!isSent && retriesCounter < maxRetries) { Thread.Sleep(NSeconds); } } if (!isSent && retriesCounter == maxRetries) { // just log, message is lost on this stage! } } }
编辑 :忘了说这是IIS托管的WCF服务,它通过客户端回调契约将消息发送回Silverlight客户端WCF代理。
编辑2:下面是我如何使用Peek()
,我错过了什么?
bool successfullySent = true; try { var item = queue.Peek(); PreProcessItem(item); SendItem(item); } catch(Exception exception) { successfullySent = false; } finally { if (successfullySent) { // just remove already sent item from the queue queue.Dequeue(); } }
EDIT3:当然我可以使用while循环,bool标志, Queue
和AutoResetEvent
旧样式方法,但我只是想知道是否可以使用BlockingCollection
和GetConsumingEnumerable()
我认为像Peek
这样的设备在与消耗可枚举一起使用时会非常有用,因为否则所有管道模式实现示例新的东西,如BlockingCollection
和GetConsumingEnumerable()
看起来不耐用,我不得不回到旧的方法。
你应该考虑中间队列。
由于其性质, BlockingCollection
无法“偷看”项目 – 可能有多个消费者。 其中一个可以偷看一个项目,另一个可以拿它 – 因此,第一个将尝试采取已经采取的项目。
正如Dennis在评论中所说, BlockingCollection
为IProducerConsumerCollection
接口的任何实现者提供了一个阻塞包装器。
如您所见, IProducerConsumerCollection
按设计,未定义Peek
或实现其中所需的其他方法。 这意味着BlockingCollection
不能像Peek
那样为Peek
提供一个分析。
如果你考虑,这可以大大减少Peek
实现中实用程序交易所产生的并发问题。 如何在不消费的情况下消费? 要同时Peek
你必须锁定集合的头部,直到Peek
操作完成,我和BlockingCollection
的设计者视为次优。 我认为这也很麻烦,难以实施,需要某种一次性的窥视环境。
如果您使用消息并且其消耗失败,则您将不得不处理它。 您可以将其添加到另一个故障队列,将其重新添加到正常处理队列以进行重新计算,或者只记录其后代故障或适合您的上下文的其他操作。
如果您不想同时使用消息,则不需要使用BlockingCollection
因为您不需要并发使用。 您可以直接使用ConcurrentQueue
,您仍然可以获得同步性,并且您可以安全地使用TryPeek
,因为您可以控制单个消费者。 如果消费失败,您可以通过无限重试循环停止消费,但我建议这需要一些设计思路。
BlockingCollection
是IProducerConsumerCollection
的包装器,它比ConcurrentQueue
更通用,并为实现者提供了不必实现(Try)Peek
方法的自由。
但是,您始终可以直接在底层队列上调用TryPeek
:
ConcurrentQueue useOnlyForPeeking = new ConcurrentQueue (); BlockingCollection blockingCollection = new BlockingCollection (useOnlyForPeeking); ... useOnlyForPeeking.TryPeek(...)
但请注意, 您不能通过useOnlyForPeeking
修改您的队列 ,否则blockingCollection
会混淆并可能抛出InvalidOperationException
,但如果在此并发数据结构上调用非修改TryPeek
将是一个问题,我会感到惊讶。
您可以使用ConcurrentQueue
,它具有TryDequeue()
方法。
ConcurrentQueue
尝试删除并返回并发队列开头的对象 , 如果删除了一个元素并从ConcurrentQueue的开头成功返回,则返回true 。.TryDequeue(out T result)
所以,不需要先查看Peek。
TryDequeue()
是线程安全的:
ConcurrentQueue
内部处理所有同步 。 如果两个线程在同一时刻调用TryDequeue(T),则两个操作都不会被阻止。
据我所知,只有队列为空时才返回false :
如果队列中填充了q.Enqueue(“a”)等代码; q.Enqueue( “B”); q.Enqueue( “C”); 并且两个线程同时尝试使一个元素出列,一个线程将使一个线程出列,另一个线程将出列b。 对TryDequeue(T)的两次调用都将返回true,因为它们都能够使元素出列。 如果每个线程返回以使其他元素出列,则其中一个线程将使队列出队并返回true,而另一个线程将查找队列为空并返回false。
http://msdn.microsoft.com/en-us/library/dd287208%28v=vs.100%29.aspx
UPDATE
也许,最简单的选择是使用TaskScheduler
Class。 有了它,您可以将所有处理任务包装到队列的项目中,并简化同步的实现。