

我们有一个消息队列。 我们希望并行处理消息并限制同时处理的消息的数量。

我们下面的试验代码可以并行处理消息,但只有在前一批消息全部处理完毕后才会开始处理新的一批。我们希望每个任务一完成就立即开始处理下一条消息。



// Trial code from the question; it reproduces the batching problem described above.
//
// queue:         path of the private MSMQ queue that is read.
// Process:       starts a background loop with Task.Factory.StartNew(async ...). Each iteration
//                builds a batch of tasks via ConsumerTasks(), awaits the WHOLE batch with
//                Task.WhenAll, and only then peeks for the next message on a new MessageQueue.
//                A new batch therefore starts only after the slowest task of the previous batch
//                has finished.
//                NOTE(review): `token` is never read, so the loop cannot be cancelled. The task
//                returned by StartNew is discarded, so exceptions from the loop go unobserved.
//                A new MessageQueue is created on every iteration and never disposed.
// ConsumerTasks: NOTE(review): this member appears truncated in the source (`for (int i = 0; i  {`).
//                The loop bound, the task-creation call and the declaration of `message` are
//                missing, and generic type arguments (e.g. on IEnumerable) look stripped.
//                Presumably each task prints the message id/name and sleeps 1 s; confirm.
// PeekAsync:     adapts MessageQueue.BeginPeek/EndPeek to a Task via Task.Factory.FromAsync.
static string queue = @".\Private$\concurrenttest"; private static void Process(CancellationToken token) { Task.Factory.StartNew(async () => { while (true) { IEnumerable consumerTasks = ConsumerTasks(); await Task.WhenAll(consumerTasks); await PeekAsync(new MessageQueue(queue)); } }); } private static IEnumerable ConsumerTasks() { for (int i = 0; i  { Console.WriteLine("id: " + message.id + ", name: " + message.name); Thread.Sleep(1000); }); } } private static Task PeekAsync(MessageQueue msMq) { return Task.Factory.FromAsync(msMq.BeginPeek(), msMq.EndPeek); } 


我花了很多时间考虑消息泵的可靠性——尤其是消息一旦从MessageQueue中接收出来,取消操作就会变得棘手——所以我提供了两种终止消息泵的方法:

  • 发信号通知CancellationToken会尽快停止管道,并可能导致丢失消息。
  • 调用MessagePump.Stop()终止泵,但允许在MessagePump.Completion任务转换为RanToCompletion之前完全处理已从队列中获取的所有消息。

该解决方案使用TPL Dataflow(NuGet:Microsoft.Tpl.Dataflow)。


 using System;
using System.Messaging;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace StackOverflow.Q34437298
{
    /// <summary>
    /// Pumps the message queue and processes messages in parallel.
    /// </summary>
    /// <remarks>
    /// Two ways to shut down: <see cref="Stop"/> stops dequeuing but lets every message that has
    /// already been received finish processing. Cancelling the token passed to <see cref="Run"/>
    /// stops as soon as possible and may drop messages that were already received.
    /// </remarks>
    public sealed class MessagePump
    {
        /// <summary>
        /// Creates a <see cref="MessagePump"/> and immediately starts pumping.
        /// </summary>
        /// <param name="messageQueue">The queue to pump.</param>
        /// <param name="processMessage">Asynchronous handler invoked once per received message.</param>
        /// <param name="maxDegreeOfParallelism">Maximum number of handlers running at the same time.</param>
        /// <param name="ct">Hard-stop token; signalling it may drop messages already received.</param>
        /// <returns>The running pump; observe <see cref="Completion"/> for its outcome.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="messageQueue"/> or <paramref name="processMessage"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxDegreeOfParallelism"/> is zero or negative.</exception>
        /// <exception cref="OperationCanceledException"><paramref name="ct"/> is already cancelled.</exception>
        public static MessagePump Run(
            MessageQueue messageQueue,
            Func<Message, Task> processMessage,
            int maxDegreeOfParallelism,
            CancellationToken ct = default(CancellationToken))
        {
            if (messageQueue == null)
            {
                throw new ArgumentNullException(nameof(messageQueue));
            }

            if (processMessage == null)
            {
                throw new ArgumentNullException(nameof(processMessage));
            }

            if (maxDegreeOfParallelism <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
            }

            ct.ThrowIfCancellationRequested();

            return new MessagePump(messageQueue, processMessage, maxDegreeOfParallelism, ct);
        }

        // Completed (with true) by Stop(); the producer watches it to wind down gracefully.
        private readonly TaskCompletionSource<bool> _stop = new TaskCompletionSource<bool>();

        /// <summary>
        /// <see cref="Task"/> which completes when this instance stops, either because
        /// <see cref="Stop"/> was called or because cancellation was requested.
        /// </summary>
        public Task Completion { get; }

        /// <summary>
        /// Maximum number of parallel message processors.
        /// </summary>
        public int MaxDegreeOfParallelism { get; }

        /// <summary>
        /// <see cref="System.Messaging.MessageQueue"/> that is pumped by this instance.
        /// </summary>
        public MessageQueue MessageQueue { get; }

        /// <summary>
        /// Creates a new <see cref="MessagePump"/> instance and starts the pump loop.
        /// </summary>
        private MessagePump(MessageQueue messageQueue, Func<Message, Task> processMessage, int maxDegreeOfParallelism, CancellationToken ct)
        {
            MessageQueue = messageQueue;
            MaxDegreeOfParallelism = maxDegreeOfParallelism;

            // Kick off the loop. The properties above must already be set because RunAsync reads them.
            Completion = RunAsync(processMessage, ct);
        }

        /// <summary>
        /// Soft-terminates the pump so that no more messages will be pumped.
        /// Any messages already removed from the message queue will be
        /// processed before this instance fully completes.
        /// </summary>
        public void Stop()
        {
            // Multiple calls to Stop are fine.
            _stop.TrySetResult(true);
        }

        /// <summary>
        /// Pump implementation: a producer task peeks/receives messages into a bounded buffer
        /// that feeds an <see cref="ActionBlock{TInput}"/> running up to
        /// <see cref="MaxDegreeOfParallelism"/> handlers at once.
        /// </summary>
        private async Task RunAsync(Func<Message, Task> processMessage, CancellationToken ct = default(CancellationToken))
        {
            using (CancellationTokenSource producerCTS = ct.CanBeCanceled
                ? CancellationTokenSource.CreateLinkedTokenSource(ct)
                : new CancellationTokenSource())
            {
                // This CancellationToken will either be signaled
                // externally, or if our consumer errors.
                ct = producerCTS.Token;

                // Handover between producer and consumer.
                DataflowBlockOptions bufferOptions = new DataflowBlockOptions
                {
                    // There is no point in dequeuing more messages than we can process,
                    // so we'll throttle the producer by limiting the buffer capacity.
                    BoundedCapacity = MaxDegreeOfParallelism,
                    CancellationToken = ct
                };

                BufferBlock<Message> buffer = new BufferBlock<Message>(bufferOptions);

                Task producer = Task.Run(async () =>
                {
                    try
                    {
                        while (_stop.Task.Status != TaskStatus.RanToCompletion)
                        {
                            // This line and the next are the *only* two cancellation
                            // points which will not cause dropped messages.
                            ct.ThrowIfCancellationRequested();

                            Task peekTask = WithCancellation(PeekAsync(MessageQueue), ct);

                            if (await Task.WhenAny(peekTask, _stop.Task).ConfigureAwait(false) == _stop.Task)
                            {
                                // Stop was signaled before PeekAsync returned. Wind down the producer gracefully
                                // by breaking out and propagating completion to the consumer blocks.
                                // The pending peek is abandoned; peeking does not remove the message.
                                break;
                            }

                            await peekTask.ConfigureAwait(false); // Observe Peek exceptions.

                            ct.ThrowIfCancellationRequested();

                            // Zero timeout means that we will error if someone else snatches the
                            // peeked message from the queue before we get to it (due to a race).
                            // I deemed this better than getting stuck waiting for a message which
                            // may never arrive, or, worse yet, letting this ReceiveAsync run unobserved
                            // due to a cancellation (if we choose to abandon it like we do PeekAsync).
                            // You will have to restart the pump if this throws.
                            // Omit the timeout if this behaviour is undesired.
                            Message message = await ReceiveAsync(MessageQueue, timeout: TimeSpan.Zero).ConfigureAwait(false);

                            await buffer.SendAsync(message, ct).ConfigureAwait(false);
                        }
                    }
                    finally
                    {
                        buffer.Complete();
                    }
                }, ct);

                // Wire up the parallel consumers.
                ExecutionDataflowBlockOptions executionOptions = new ExecutionDataflowBlockOptions
                {
                    CancellationToken = ct,
                    MaxDegreeOfParallelism = MaxDegreeOfParallelism,
                    SingleProducerConstrained = true, // Only the buffer posts to the consumer.
                    BoundedCapacity = MaxDegreeOfParallelism,
                };

                ActionBlock<Message> consumer = new ActionBlock<Message>(async message =>
                {
                    ct.ThrowIfCancellationRequested();

                    await processMessage(message).ConfigureAwait(false);
                }, executionOptions);

                buffer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

                if (await Task.WhenAny(producer, consumer.Completion).ConfigureAwait(false) == consumer.Completion)
                {
                    // If we got here, consumer probably errored. Stop the producer
                    // before we throw so we don't go dequeuing more messages.
                    producerCTS.Cancel();
                }

                // Task.WhenAll checks faulted tasks before checking any
                // canceled tasks, so if our consumer threw a legitimate
                // exception, that's what will be rethrown, not the OCE.
                await Task.WhenAll(producer, consumer.Completion).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// APM -> TAP conversion for MessageQueue.Begin/EndPeek.
        /// </summary>
        private static Task<Message> PeekAsync(MessageQueue messageQueue)
        {
            return Task.Factory.FromAsync(messageQueue.BeginPeek(), messageQueue.EndPeek);
        }

        /// <summary>
        /// APM -> TAP conversion for MessageQueue.Begin/EndReceive.
        /// </summary>
        private static Task<Message> ReceiveAsync(MessageQueue messageQueue, TimeSpan timeout)
        {
            // BeginReceive is paired with its matching EndReceive.
            return Task.Factory.FromAsync(messageQueue.BeginReceive(timeout), messageQueue.EndReceive);
        }

        /// <summary>
        /// Allows abandoning tasks which do not natively
        /// support cancellation. Use with caution: an abandoned task keeps
        /// running and its eventual result or exception is never observed.
        /// </summary>
        /// <exception cref="OperationCanceledException"><paramref name="ct"/> was signaled before <paramref name="task"/> completed.</exception>
        private static async Task<T> WithCancellation<T>(Task<T> task, CancellationToken ct)
        {
            ct.ThrowIfCancellationRequested();

            TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();

            using (ct.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs, false))
            {
                if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false))
                {
                    // Cancellation task completed first.
                    // We are abandoning the original task.
                    throw new OperationCanceledException(ct);
                }
            }

            // Task completed: synchronously return result or propagate exceptions.
            return await task.ConfigureAwait(false);
        }
    }
}


 using (MessageQueue msMq = GetQueue())
{
    // Start pumping immediately; up to 4 messages are processed concurrently.
    MessagePump pump = MessagePump.Run(
        msMq,
        async message =>
        {
            // Simulated asynchronous work per message.
            await Task.Delay(50);
            Console.WriteLine($"Finished processing message {message.Id}");
        },
        maxDegreeOfParallelism: 4);

    // Produce 100 messages, roughly one every 25 ms. This code already runs in an
    // async context, so await the delay instead of blocking the thread with Thread.Sleep.
    for (int i = 0; i < 100; i++)
    {
        msMq.Send(new Message());
        await Task.Delay(25);
    }

    // Soft stop: no new messages are dequeued, but messages already received
    // are processed before Completion finishes.
    pump.Stop();
    await pump.Completion;
}

下面是一个不太整洁、但能正常工作的单元测试:



正如我在评论中提到的,.NET中已经有成熟的生产者/消费者模式,管道(pipeline)就是其中之一。微软的Stephen Toub所著的《并行编程模式》(Patterns of Parallel Programming)中有一个很好的例子(全文:https://www.microsoft.com/en-au/download/details.aspx?id=19222,第55页)。



 /// <summary>
/// Producer/consumer demo: one task adds 10 messages to a BlockingCollection (about 10 ms apart)
/// while Parallel.ForEach consumes them, running at most four ProcessMessage calls at a time.
/// Blocks until both producer and consumer have finished.
/// </summary>
void MessageQueueWithBlockingCollection()
{
    // If your processing is continuous and never stops throughout the lifetime of
    // your application, you can ignore the fact that BlockingCollection is IDisposable.
    using (BlockingCollection<Message> messages = new BlockingCollection<Message>())
    {
        Task producer = Task.Run(() =>
        {
            try
            {
                for (int i = 0; i < 10; i++)
                {
                    // Hand over the message to the consumer.
                    messages.Add(new Message());

                    // Simulated arrival delay for the next message.
                    Thread.Sleep(10);
                }
            }
            finally
            {
                // Notify consumer that there is no more data.
                messages.CompleteAdding();
            }
        });

        Task consumer = Task.Run(() =>
        {
            ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

            // By default Parallel.ForEach pulls items from an IEnumerable in chunks. A worker can
            // then sit blocked in GetConsumingEnumerable trying to fill a chunk while holding
            // messages it has already taken. NoBuffering hands out one message at a time.
            var source = Partitioner.Create(
                messages.GetConsumingEnumerable(),
                EnumerablePartitionerOptions.NoBuffering);

            Parallel.ForEach(source, options, message =>
            {
                ProcessMessage(message);
            });
        });

        Task.WaitAll(producer, consumer);
    }
}

/// <summary>
/// Simulated per-message work: blocks the calling thread for 40 ms.
/// </summary>
void ProcessMessage(Message message)
{
    Thread.Sleep(40);
}


不过在您的场景中,您使用的是Task和async/await,这更适合使用TPL Dataflow(Microsoft官方支持的库,适用于并行和异步的序列处理)。


 /// <summary>
/// TPL Dataflow demo: a producer posts 10 messages (about 10 ms apart) into a BufferBlock
/// linked to an ActionBlock that processes up to four messages concurrently.
/// </summary>
/// <returns>
/// A task that completes when all messages have been processed. It faults if either the
/// producer or the consumer fails.
/// </returns>
async Task MessageQueueWithTPLDataflow()
{
    // Set up our queue.
    BufferBlock<Message> queue = new BufferBlock<Message>();

    // Set up our processing stage (consumer).
    ExecutionDataflowBlockOptions options = new ExecutionDataflowBlockOptions
    {
        CancellationToken = CancellationToken.None, // Plug in your own in case you need to support cancellation.
        MaxDegreeOfParallelism = 4
    };

    ActionBlock<Message> consumer = new ActionBlock<Message>(m => ProcessMessageAsync(m), options);

    // Link the queue to the consumer.
    queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

    // Wire up our producer.
    Task producer = Task.Run(async () =>
    {
        try
        {
            for (int i = 0; i < 10; i++)
            {
                queue.Post(new Message());

                await Task.Delay(10).ConfigureAwait(false);
            }
        }
        finally
        {
            // Signal to the consumer that there are no more items.
            queue.Complete();
        }
    });

    // Await both tasks. The finally block above completes the queue normally even when
    // the producer fails, so awaiting only the consumer would silently lose the
    // producer's exception.
    await Task.WhenAll(producer, consumer.Completion).ConfigureAwait(false);
}

/// <summary>
/// Simulated asynchronous per-message work (40 ms delay).
/// </summary>
Task ProcessMessageAsync(Message message)
{
    return Task.Delay(40);
}

使用MessageQueue进行上述调整并不难,您可以确保最终结果没有线程问题。 如果我今天/明天有更多时间,我会这样做的。

您有一组要处理的事物。 您为正在处理的事物创建另一个集合(这可能是您的任务对象或某种引用任务的项目)。

只要你有工作要做,你就创建一个循环。 也就是说,工作项目正在等待启动,或者您仍在处理工作项目。



您创建一个内部循环,检查所有已启动的任务是否完成。 如果已完成,则将其删除并报告结果或执行任何合适的操作。

就是这样。在下一轮循环中,外层循环的上半部分会继续向正在运行的任务集合中添加任务,直到其数量达到您设置的最大值,从而使正在进行的工作始终保持在该上限。


.NET中的任务库用于执行并行执行的许多任务。 虽然有多种方法可以限制活动任务的数量,但库本身将根据计算机CPU限制活动任务的数量。

需要回答的第一个问题是为什么需要创建另一个限制? 如果任务库施加的限制是正常的,那么您可以保留创建任务并依赖任务库以良好的性能执行它。


您可以使用自定义任务计划程序(TaskScheduler)来限制并发任务的数量。更多信息请参阅MSDN:https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler%28v=vs.110%29.aspx 。

我的同事想出了下面的解决方案。这个方案可以正常工作,但我会把这段代码提交到Code Review上请大家审查。

根据给出的答案和我们自己的一些研究,我们找到了解决方案。 我们使用SemaphoreSlim来限制并行任务的数量。

 static string queue = @".\Private$\concurrenttest";

/// <summary>
/// Receives <see cref="Command1"/> messages from <see cref="queue"/> and handles each one on the
/// thread pool, with at most 15 handlers running at once. A new message is received as soon as
/// any handler finishes and frees a slot, instead of waiting for a whole batch.
/// </summary>
/// <param name="token">
/// Stops the loop. Cancellation is observed while waiting for a free handler slot and right
/// after a message has been peeked (before it is removed from the queue). A pending peek cannot
/// itself be cancelled, so shutdown may wait until the next message arrives.
/// </param>
/// <returns>
/// A task that ends (cancelled, or faulted if peeking/receiving failed) only after the loop has
/// stopped and every in-flight handler has finished.
/// </returns>
private static async Task Process(CancellationToken token)
{
    const int maxConcurrency = 15;

    using (SemaphoreSlim s = new SemaphoreSlim(maxConcurrency, maxConcurrency))
    using (MessageQueue msMq = new MessageQueue(queue))
    {
        msMq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) });

        try
        {
            while (true)
            {
                // Wait for a free handler slot.
                await s.WaitAsync(token);

                Command1 message;
                try
                {
                    // Wait until a message is available without removing it.
                    await PeekAsync(msMq);

                    // Peeking does not remove the message, so stopping here loses nothing.
                    token.ThrowIfCancellationRequested();

                    message = await ReceiveAsync(msMq);
                }
                catch
                {
                    // No handler will run for this slot, so give it back before propagating.
                    s.Release();
                    throw;
                }

                // Deliberately not awaited: the loop moves on to the next message immediately,
                // and the semaphore bounds how many handlers run at once.
                Task.Run(async () =>
                {
                    try
                    {
                        await HandleMessage(message);
                    }
                    catch (Exception ex)
                    {
                        // Keep the pump alive, but do not lose the error. Plug in real handling here.
                        Console.Error.WriteLine(ex);
                    }
                    finally
                    {
                        s.Release();
                    }
                });
            }
        }
        finally
        {
            // Drain: reacquire every slot so that all in-flight handlers have finished before the
            // semaphore and queue are disposed (a late Release on a disposed semaphore would throw).
            for (int i = 0; i < maxConcurrency; i++)
            {
                await s.WaitAsync();
            }
        }
    }
}

/// <summary>
/// Simulated handler: prints the command and blocks the current thread for one second.
/// </summary>
private static Task HandleMessage(Command1 message)
{
    Console.WriteLine("id: " + message.id + ", name: " + message.name);
    Thread.Sleep(1000);
    return Task.FromResult(1);
}

/// <summary>
/// APM -> TAP conversion for MessageQueue.Begin/EndPeek; completes when a message is available.
/// </summary>
private static Task<Message> PeekAsync(MessageQueue msMq)
{
    return Task.Factory.FromAsync(msMq.BeginPeek(), msMq.EndPeek);
}

/// <summary>
/// Message payload. Property names are kept as-is because they form the XML contract
/// read by the XmlMessageFormatter.
/// </summary>
public class Command1
{
    public int id { get; set; }
    public string name { get; set; }
}

/// <summary>
/// Removes the next message from the queue and returns its body as a <see cref="Command1"/>.
/// </summary>
private static async Task<Command1> ReceiveAsync(MessageQueue msMq)
{
    // BeginReceive is paired with its matching EndReceive.
    Message received = await Task.Factory.FromAsync(msMq.BeginReceive(), msMq.EndReceive);
    return (Command1)received.Body;
}

您应该考虑使用Microsoft的Reactive Framework。


 // For each received Command1, Observable.Start runs the (simulated) work asynchronously, so
// messages are processed concurrently and each result is printed as soon as it is ready.
// FromQueue needs an explicit type argument: T cannot be inferred from the queue path.
var query =
    from command1 in FromQueue<Command1>(queue)
    from text in Observable.Start(() =>
    {
        Thread.Sleep(1000); // simulated work
        return "id: " + command1.id + ", name: " + command1.name;
    })
    select text;

// Keep the subscription; disposing it unsubscribes from the queue.
var subscription = query
    .Subscribe(text => Console.WriteLine(text));

这将并行执行所有处理,并确保处理在所有核心上正确分布。 当一个值结束时另一个值开始。



 /// <summary>
/// Creates an observable of message bodies delivered through MSMQ.
/// </summary>
/// <remarks>
/// <para>
/// On subscription it creates a private response queue named after this machine plus a fresh
/// GUID, and registers it by sending "S &lt;path&gt;" to <paramref name="serverQueue"/>. A
/// dedicated long-running thread then pushes every body received on that queue (cast to
/// <typeparamref name="T"/>) to the observer.
/// </para>
/// <para>
/// On unsubscription it sends "D &lt;path&gt;" to the server, deletes the response queue and
/// disposes both queue objects. Receive and cast failures are reported through OnError.
/// </para>
/// </remarks>
/// <typeparam name="T">Type of the message bodies.</typeparam>
/// <param name="serverQueue">Path of the server queue that accepts the "S"/"D" registration commands.</param>
static IObservable<T> FromQueue<T>(string serverQueue)
{
    return Observable.Create<T>(observer =>
    {
        var responseQueue = Environment.MachineName + "\\Private$\\" + Guid.NewGuid().ToString();
        var queue = MessageQueue.Create(responseQueue);

        // SECURITY NOTE(review): BinaryMessageFormatter is based on binary (BinaryFormatter)
        // serialization; only use it with trusted senders.
        var frm = new System.Messaging.BinaryMessageFormatter();
        var srv = new MessageQueue(serverQueue);
        srv.Formatter = frm;
        queue.Formatter = frm;

        // Register our response queue with the server.
        srv.Send("S " + responseQueue);

        // Bounded wait so the loop re-checks cancellation even when no messages arrive.
        var pollInterval = TimeSpan.FromMilliseconds(500);

        var loop = NewThreadScheduler.Default.ScheduleLongRunning(cancel =>
        {
            while (!cancel.IsDisposed)
            {
                T value;
                try
                {
                    var msg = queue.Receive(pollInterval);
                    value = (T)msg.Body;
                }
                catch (MessageQueueException ex) when (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    // Nothing arrived within the poll interval; loop to re-check cancellation.
                    continue;
                }
                catch (Exception ex)
                {
                    // After unsubscription the queue is deleted/disposed underneath Receive;
                    // that failure is expected and must not reach the observer.
                    if (!cancel.IsDisposed)
                    {
                        observer.OnError(ex);
                    }

                    return;
                }

                observer.OnNext(value);
            }
        });

        // CompositeDisposable disposes in order: stop the loop first, then tear down the queues.
        return new CompositeDisposable(
            loop,
            Disposable.Create(() =>
            {
                srv.Send("D " + responseQueue);
                MessageQueue.Delete(responseQueue);
                srv.Dispose();
                queue.Dispose();
            })
        );
    });
}



 // Limit processing to maxConcurrent messages at a time.
// Observable.Start is hot: it begins running as soon as it is called. Inside Select, every
// message's work would therefore start immediately, and Merge(maxConcurrent) would only limit
// how many results it listens to. Observable.Defer postpones the Start call until Merge
// subscribes, so at most maxConcurrent work items actually run at once.
int maxConcurrent = 2;

var query = FromQueue<Command1>(queue)
    .Select(command1 => Observable.Defer(() => Observable.Start(() =>
    {
        Thread.Sleep(1000); // simulated work
        return "id: " + command1.id + ", name: " + command1.name;
    })))
    .Merge(maxConcurrent);