具有multithreading或任务的进程队列

我有一个电话消息应用程序,其中有许多消息要处理。因为电话端口是有限的,所以消息将先进行处理。 每条消息都有一个标记’Acknowledge’,表示处理的是哪一个。 当然它被初始化为假。

我想将所有消息放入队列,然后使用多个线程或任务处理它们。

public class MessageQueue { public Queue MessageWorkItem { get; set; } public Messages Message { get; set; } public MessageQueue() { MessageWorkItem = new Queue(); Message = new Messages(); } public void GetMessageMetaData() { try { // It is just a test, add only one item into the queue Message.MessageID = Guid.NewGuid(); Message.NumberToCall = "1111111111"; Message.FacilityID = "3333"; Message.NumberToDial = "2222222222"; Message.CountryCode = "1"; Message.Acknowledge = false; } catch (Exception ex) { } } public void AddingItemToQueue() { GetMessageMetaData(); if (!Message.Acknowledge) { lock (MessageWorkItem) { MessageWorkItem.Enqueue(Message); } } } } public class Messages { public Guid MessageID { get; set; } public string NumberToCall { get; set; } public string FacilityID { get; set; } public string NumberToDial { get; set; } public string CountryCode { get; set; } public bool Acknowledge { get; set; } } 

现在我的问题是如何使用multithreading将项目从队列中出列。 对于队列中的每个项目,我想运行一个脚本。

  public void RunScript(Message item) { try { PlayMessage(item); return; } catch (HangupException hex) { Log.WriteWithId("Caller Hungup!", hex.Message); } catch (Exception ex) { Log.WriteException(ex, "Unexpected exception: {0}"); } } 

我的想法是,如果

if(MessageWorkItem.Count> = 1)然后做一些事情,但我确实需要代码帮助。

如果您可以使用.Net 4.5,我建议您从任务并行库(TPL)查看Dataflow 。

该页面引出了许多示例演练,例如如何:实现生产者 – 消费者数据流模式和演练:在Windows窗体应用程序中使用数据流 。

查看该文档,看看它是否对您有所帮助。 这需要相当多,但我认为这可能是你最好的方法。

或者,您可以考虑使用BlockingCollection及其GetConsumingEnumerable()方法来访问队列中的项目。

您所做的是将工作拆分为要以某种方式处理的对象,并使用BlockingCollection来管理队列。

使用ints而不是对象作为工作项的一些示例代码将有助于演示:

当工作线程完成其当前项目时,它将从工作队列中删除新项目,处理该项目,然后将其添加到输出队列。

单独的使用者线程从输出队列中删除已完成的项目并对其执行某些操作。

最后,我们必须等待所有工作者完成(Task.WaitAll(workers)),然后才能将输出队列标记为已完成(outputQueue.CompleteAdding())。

 using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace Demo { class Program { static void Main(string[] args) { new Program().run(); } void run() { int threadCount = 4; Task[] workers = new Task[threadCount]; Task.Factory.StartNew(consumer); for (int i = 0; i < threadCount; ++i) { int workerId = i; Task task = new Task(() => worker(workerId)); workers[i] = task; task.Start(); } for (int i = 0; i < 100; ++i) { Console.WriteLine("Queueing work item {0}", i); inputQueue.Add(i); Thread.Sleep(50); } Console.WriteLine("Stopping adding."); inputQueue.CompleteAdding(); Task.WaitAll(workers); outputQueue.CompleteAdding(); Console.WriteLine("Done."); Console.ReadLine(); } void worker(int workerId) { Console.WriteLine("Worker {0} is starting.", workerId); foreach (var workItem in inputQueue.GetConsumingEnumerable()) { Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem); Thread.Sleep(100); // Simulate work. outputQueue.Add(workItem); // Output completed item. } Console.WriteLine("Worker {0} is stopping.", workerId); } void consumer() { Console.WriteLine("Consumer is starting."); foreach (var workItem in outputQueue.GetConsumingEnumerable()) { Console.WriteLine("Consumer is using item {0}", workItem); Thread.Sleep(25); } Console.WriteLine("Consumer is finished."); } BlockingCollection inputQueue = new BlockingCollection(); BlockingCollection outputQueue = new BlockingCollection(); } } 

来自TPL的 Parallel.ForEach 。 每个都是平行的。

示例(将MessageWorkItem更改为通用队列):

  public class MessageQueue { public Queue MessageWorkItem { get; set; } public MessageQueue() { MessageWorkItem = new Queue(); } public Message GetMessageMetaData() { try { // It is just a test, add only one item into the queue return new Message() { MessageID = Guid.NewGuid(), NumberToCall = "1111111111", FacilityID = "3333", NumberToDial = "2222222222", CountryCode = "1", Acknowledge = false }; } catch (Exception ex) { return null; } } public void AddingItemToQueue() { var message = GetMessageMetaData(); if (!message.Acknowledge) { lock (MessageWorkItem) { MessageWorkItem.Enqueue(message); } } } } public class Message { public Guid MessageID { get; set; } public string NumberToCall { get; set; } public string FacilityID { get; set; } public string NumberToDial { get; set; } public string CountryCode { get; set; } public bool Acknowledge { get; set; } } class Program { static void Main(string[] args) { MessageQueue me = new MessageQueue(); for (int i = 0; i < 10000; i++) me.AddingItemToQueue(); Console.WriteLine(me.MessageWorkItem.Count); Parallel.ForEach(me.MessageWorkItem, RunScript); } static void RunScript(Message item) { // todo: ... Console.WriteLine(item.MessageID); Thread.Sleep(300); } }