如何并行读取队列中的消息?
情况
我们有一个消息队列。 我们希望并行处理消息并限制同时处理的消息的数量。
我们下面的试用代码会并行处理消息,但只有在前一个进程完成时才开始新的一批进程。 我们想在完成后重启任务。
换句话说:只要消息队列不为空,任务的最大数量应始终处于活动状态。
试用代码
static string queue = @".\Private$\concurrenttest"; private static void Process(CancellationToken token) { Task.Factory.StartNew(async () => { while (true) { IEnumerable consumerTasks = ConsumerTasks(); await Task.WhenAll(consumerTasks); await PeekAsync(new MessageQueue(queue)); } }); } private static IEnumerable ConsumerTasks() { for (int i = 0; i { Console.WriteLine("id: " + message.id + ", name: " + message.name); Thread.Sleep(1000); }); } } private static Task PeekAsync(MessageQueue msMq) { return Task.Factory.FromAsync(msMq.BeginPeek(), msMq.EndPeek); }
编辑
我花了很多时间考虑泵的可靠性 – 特别是如果从MessageQueue
收到消息,取消变得棘手 – 所以我提供了两种终止队列的方法:
- 发信号通知
CancellationToken
会尽快停止管道,并可能导致丢失消息。 - 调用
MessagePump.Stop()
终止泵,但允许在MessagePump.Completion
任务转换为RanToCompletion
之前完全处理已从队列中获取的所有消息。
该解决方案使用TPL Dataflow(NuGet:Microsoft.Tpl.Dataflow)。
全面实施:
using System; using System.Messaging; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; namespace StackOverflow.Q34437298 { /// /// Pumps the message queue and processes messages in parallel. /// public sealed class MessagePump { /// /// Creates a and immediately starts pumping. /// public static MessagePump Run( MessageQueue messageQueue, Func processMessage, int maxDegreeOfParallelism, CancellationToken ct = default(CancellationToken)) { if (messageQueue == null) throw new ArgumentNullException(nameof(messageQueue)); if (processMessage == null) throw new ArgumentNullException(nameof(processMessage)); if (maxDegreeOfParallelism <= 0) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism)); ct.ThrowIfCancellationRequested(); return new MessagePump(messageQueue, processMessage, maxDegreeOfParallelism, ct); } private readonly TaskCompletionSource _stop = new TaskCompletionSource (); /// /// which completes when this instance /// stops due to a or cancellation request. /// public Task Completion { get; } /// /// Maximum number of parallel message processors. /// public int MaxDegreeOfParallelism { get; } /// /// that is pumped by this instance. /// public MessageQueue MessageQueue { get; } /// /// Creates a new instance. /// private MessagePump(MessageQueue messageQueue, Func processMessage, int maxDegreeOfParallelism, CancellationToken ct) { MessageQueue = messageQueue; MaxDegreeOfParallelism = maxDegreeOfParallelism; // Kick off the loop. Completion = RunAsync(processMessage, ct); } /// /// Soft-terminates the pump so that no more messages will be pumped. /// Any messages already removed from the message queue will be /// processed before this instance fully completes. /// public void Stop() { // Multiple calls to Stop are fine. _stop.TrySetResult(true); } /// /// Pump implementation. /// private async Task RunAsync(Func processMessage, CancellationToken ct = default(CancellationToken)) { using (CancellationTokenSource producerCTS = ct.CanBeCanceled ? CancellationTokenSource.CreateLinkedTokenSource(ct) : new CancellationTokenSource()) { // This CancellationToken will either be signaled // externally, or if our consumer errors. ct = producerCTS.Token; // Handover between producer and consumer. DataflowBlockOptions bufferOptions = new DataflowBlockOptions { // There is no point in dequeuing more messages than we can process, // so we'll throttle the producer by limiting the buffer capacity. BoundedCapacity = MaxDegreeOfParallelism, CancellationToken = ct }; BufferBlock buffer = new BufferBlock (bufferOptions); Task producer = Task.Run(async () => { try { while (_stop.Task.Status != TaskStatus.RanToCompletion) { // This line and next line are the *only* two cancellation // points which will not cause dropped messages. ct.ThrowIfCancellationRequested(); Task peekTask = WithCancellation(PeekAsync(MessageQueue), ct); if (await Task.WhenAny(peekTask, _stop.Task).ConfigureAwait(false) == _stop.Task) { // Stop was signaled before PeekAsync returned. Wind down the producer gracefully // by breaking out and propagating completion to the consumer blocks. break; } await peekTask.ConfigureAwait(false); // Observe Peek exceptions. ct.ThrowIfCancellationRequested(); // Zero timeout means that we will error if someone else snatches the // peeked message from the queue before we get to it (due to a race). // I deemed this better than getting stuck waiting for a message which // may never arrive, or, worse yet, let this ReceiveAsync run onobserved // due to a cancellation (if we choose to abandon it like we do PeekAsync). // You will have to restart the pump if this throws. // Omit timeout if this behaviour is undesired. Message message = await ReceiveAsync(MessageQueue, timeout: TimeSpan.Zero).ConfigureAwait(false); await buffer.SendAsync(message, ct).ConfigureAwait(false); } } finally { buffer.Complete(); } }, ct); // Wire up the parallel consumers. ExecutionDataflowBlockOptions executionOptions = new ExecutionDataflowBlockOptions { CancellationToken = ct, MaxDegreeOfParallelism = MaxDegreeOfParallelism, SingleProducerConstrained = true, // We don't require thread safety guarantees. BoundedCapacity = MaxDegreeOfParallelism, }; ActionBlock consumer = new ActionBlock (async message => { ct.ThrowIfCancellationRequested(); await processMessage(message).ConfigureAwait(false); }, executionOptions); buffer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true }); if (await Task.WhenAny(producer, consumer.Completion).ConfigureAwait(false) == consumer.Completion) { // If we got here, consumer probably errored. Stop the producer // before we throw so we don't go dequeuing more messages. producerCTS.Cancel(); } // Task.WhenAll checks faulted tasks before checking any // canceled tasks, so if our consumer threw a legitimate // execption, that's what will be rethrown, not the OCE. await Task.WhenAll(producer, consumer.Completion).ConfigureAwait(false); } } /// /// APM -> TAP conversion for MessageQueue.Begin/EndPeek. /// private static Task PeekAsync(MessageQueue messageQueue) { return Task.Factory.FromAsync(messageQueue.BeginPeek(), messageQueue.EndPeek); } /// /// APM -> TAP conversion for MessageQueue.Begin/EndReceive. /// private static Task ReceiveAsync(MessageQueue messageQueue, TimeSpan timeout) { return Task.Factory.FromAsync(messageQueue.BeginReceive(timeout), messageQueue.EndPeek); } /// /// Allows abandoning tasks which do not natively /// support cancellation. Use with caution. /// private static async Task WithCancellation (Task task, CancellationToken ct) { ct.ThrowIfCancellationRequested(); TaskCompletionSource tcs = new TaskCompletionSource (); using (ct.Register(s => ((TaskCompletionSource )s).TrySetResult(true), tcs, false)) { if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false)) { // Cancellation task completed first. // We are abandoning the original task. throw new OperationCanceledException(ct); } } // Task completed: synchronously return result or propagate exceptions. return await task.ConfigureAwait(false); } } }
用法:
using (MessageQueue msMq = GetQueue()) { MessagePump pump = MessagePump.Run( msMq, async message => { await Task.Delay(50); Console.WriteLine($"Finished processing message {message.Id}"); }, maxDegreeOfParallelism: 4 ); for (int i = 0; i < 100; i++) { msMq.Send(new Message()); Thread.Sleep(25); } pump.Stop(); await pump.Completion; }
不整齐但functionunit testing:
https://gist.github.com/KirillShlenskiy/7f3e2c4b28b9f940c3da
原始答案
正如我在评论中提到的,在.NET中已经建立了生产者/消费者模式,其中之一就是管道。 一个很好的例子可以在微软自己的Stephen Toub的“并行编程模式”中找到(全文: https : //www.microsoft.com/en-au/download/details.aspx? id = 19222,page 55)。
这个想法很简单:生产者不断地把东西扔进队列,消费者把它拉出来并处理(与生产者并行,可能是另一个)。
下面是一个消息管道的示例,消费者使用同步的阻塞方法在项目到达时处理这些项目(我已根据您的方案对消费者进行了并行化):
void MessageQueueWithBlockingCollection() { // If your processing is continuous and never stops throughout the lifetime of // your application, you can ignore the fact that BlockingCollection is IDisposable. using (BlockingCollection messages = new BlockingCollection ()) { Task producer = Task.Run(() => { try { for (int i = 0; i < 10; i++) { // Hand over the message to the consumer. messages.Add(new Message()); // Simulated arrival delay for the next message. Thread.Sleep(10); } } finally { // Notify consumer that there is no more data. messages.CompleteAdding(); } }); Task consumer = Task.Run(() => { ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 4 }; Parallel.ForEach(messages.GetConsumingEnumerable(), options, message => { ProcessMessage(message); }); }); Task.WaitAll(producer, consumer); } } void ProcessMessage(Message message) { Thread.Sleep(40); }
上述代码在大约130-140毫秒内完成,这正是您在消费者并行化时所期望的。
现在,在您的场景中,您正在使用Task
和async
/ await
更适合TPL Dataflow(官方Microsoft支持的库,适用于并行和异步序列处理)。
这是一个小演示,展示了您将用于作业的不同类型的TPL数据流处理块:
async Task MessageQueueWithTPLDataflow() { // Set up our queue. BufferBlock queue = new BufferBlock (); // Set up our processing stage (consumer). ExecutionDataflowBlockOptions options = new ExecutionDataflowBlockOptions { CancellationToken = CancellationToken.None, // Plug in your own in case you need to support cancellation. MaxDegreeOfParallelism = 4 }; ActionBlock consumer = new ActionBlock (m => ProcessMessageAsync(m), options); // Link the queue to the consumer. queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true }); // Wire up our producer. Task producer = Task.Run(async () => { try { for (int i = 0; i < 10; i++) { queue.Post(new Message()); await Task.Delay(10).ConfigureAwait(false); } } finally { // Signal to the consumer that there are no more items. queue.Complete(); } }); await consumer.Completion.ConfigureAwait(false); } Task ProcessMessageAsync(Message message) { return Task.Delay(40); }
使用MessageQueue
进行上述调整并不难,您可以确保最终结果没有线程问题。 如果我今天/明天有更多时间,我会这样做的。
您有一组要处理的事物。 您为正在处理的事物创建另一个集合(这可能是您的任务对象或某种引用任务的项目)。
只要你有工作要做,你就创建一个循环。 也就是说,工作项目正在等待启动,或者您仍在处理工作项目。
在循环开始时,您可以使用要同时运行的任务来填充活动任务集合,并在添加它们时启动它们。
你让事情运行一段时间(如Thread.Sleep(10);)。
您创建一个内部循环,检查所有已启动的任务是否完成。 如果已完成,则将其删除并报告结果或执行任何合适的操作。
而已。 在下一个回合中,外部循环的上半部分将向正在运行的任务集合添加任务,直到该数字等于您设置的最大值,从而保持您的正在进行的工作集合。
您可能希望在工作线程上执行所有这些操作并监视循环中的取消请求。
.NET中的任务库用于执行并行执行的许多任务。 虽然有多种方法可以限制活动任务的数量,但库本身将根据计算机CPU限制活动任务的数量。
需要回答的第一个问题是为什么需要创建另一个限制? 如果任务库施加的限制是正常的,那么您可以保留创建任务并依赖任务库以良好的性能执行它。
如果没有问题,那么只要您从MSMQ收到消息,只需启动一个处理消息的任务,跳过等待(WhenAll调用),重新开始并等待下一条消息。
您可以使用自定义任务计划程序来限制并发任务的数量。 有关MSDN的更多信息: https : //msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler%28v=vs.110%29.aspx 。
我的同事想出了下面的解决方案。 此解决方案有效,但我会在Code Review上审核此代码。
根据给出的答案和我们自己的一些研究,我们找到了解决方案。 我们使用SemaphoreSlim
来限制并行任务的数量。
static string queue = @".\Private$\concurrenttest"; private static async Task Process(CancellationToken token) { MessageQueue msMq = new MessageQueue(queue); msMq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) }); SemaphoreSlim s = new SemaphoreSlim(15, 15); while (true) { await s.WaitAsync(); await PeekAsync(msMq); Command1 message = await ReceiveAsync(msMq); Task.Run(async () => { try { await HandleMessage(message); } catch (Exception) { // Exception handling } s.Release(); }); } } private static Task HandleMessage(Command1 message) { Console.WriteLine("id: " + message.id + ", name: " + message.name); Thread.Sleep(1000); return Task.FromResult(1); } private static Task PeekAsync(MessageQueue msMq) { return Task.Factory.FromAsync (msMq.BeginPeek(), msMq.EndPeek); } public class Command1 { public int id { get; set; } public string name { get; set; } } private static async Task ReceiveAsync(MessageQueue msMq) { var receiveAsync = await Task.Factory.FromAsync(msMq.BeginReceive(), msMq.EndPeek); return (Command1)receiveAsync.Body; }
您应该考虑使用Microsoft的Reactive Framework。
您的代码可能如下所示:
var query = from command1 in FromQueue(queue) from text in Observable.Start(() => { Thread.Sleep(1000); return "id: " + command1.id + ", name: " + command1.name; }) select text; var subscription = query .Subscribe(text => Console.WriteLine(text));
这将并行执行所有处理,并确保处理在所有核心上正确分布。 当一个值结束时另一个值开始。
要取消订阅,只需调用subscription.Dispose()
。
FromQueue
的代码是:
static IObservable FromQueue (string serverQueue) { return Observable.Create (observer => { var responseQueue = Environment.MachineName + "\\Private$\\" + Guid.NewGuid().ToString(); var queue = MessageQueue.Create(responseQueue); var frm = new System.Messaging.BinaryMessageFormatter(); var srv = new MessageQueue(serverQueue); srv.Formatter = frm; queue.Formatter = frm; srv.Send("S " + responseQueue); var loop = NewThreadScheduler.Default.ScheduleLongRunning(cancel => { while (!cancel.IsDisposed) { var msg = queue.Receive(); observer.OnNext((T)msg.Body); } }); return new CompositeDisposable( loop, Disposable.Create(() => { srv.Send("D " + responseQueue); MessageQueue.Delete(responseQueue); }) ); }); }
只需NuGet“Rx-Main”即可得到这些位。
为了限制并发性,您可以这样做:
int maxConcurrent = 2; var query = FromQueue(queue) .Select(command1 => Observable.Start(() => { Thread.Sleep(1000); return "id: " + command1.id + ", name: " + command1.name; })) .Merge(maxConcurrent);