如何实现自己的高级生产者/消费者场景?


注意:
我对我的问题进行了彻底的修改。 您可以通过更改历史记录查看原始问题。


我需要一个“强大”的队列,它提供以下function:

  • 我有一组对象的特定范围。 这意味着A 组,B组 ……将拥有自己的队列
  • 我在组范围的线程中填充队列线程A (生产者)
  • 我正在读取组范围的线程中的队列线程B (消费者)

所以我将有以下场景:

  1. 队列中有没有项目(因为作业是用空的“targetgroup”调用的): 线程B应该逃脱循环
  2. 队列中当前没有项目,因为线程A正在处理要排队的项目: 线程B应该等待
  3. 队列中有项目: 线程B应该能够出列并处理该项目
  4. 队列中没有项目,因为线程A没有更多要排队的项目: 线程B应该转义循环

现在我想出了以下实现:

public class MightyQueue where T : class { private readonly Queue _queue = new Queue(); private bool? _runable; private volatile bool _completed; public bool Runable { get { while (!this._runable.HasValue) { Thread.Sleep(100); } return this._runable ?? false; } set { this._runable = value; } } public void Enqueue(T item) { if (item == null) { throw new ArgumentNullException("item"); } this._queue.Enqueue(item); } public void CompleteAdding() { this._completed = true; } public bool TryDequeue(out T item) { if (!this.Runable) { item = null; return false; } while (this._queue.Count == 0) { if (this._completed) { item = null; return false; } Thread.Sleep(100); } item = this._queue.Dequeue(); return true; } } 

然后将使用

制片人

 if (anythingToWorkOn) { myFooMightyQueueInstance.Runable = false; } else { myFooMightyQueueInstance.Runable = true; while (condition) { myFooMightyQueueInstance.Enqueue(item); } myFooMightyQueueInstance.CompleteAdding(); } 

消费者

 if (!myFooMightyQueueInstance.Runable) { return; } T item; while (myFooMightyQueueInstance.TryDequeue(out item)) { //work with item } 

但我相信,这种方法是错误的,因为我在那里使用一些Thread.Sleep() stuff(不能有一些waitHandle或其他东西?)…我不是关于算法本身…有谁能请帮帮我?

您应该从通用的Producer-Consumer队列开始并使用它。 在队列中实现这个并不是一个好主意,因为这会阻止您使用信号量来表示线程(或者,您可以在队列中使用公共信号量,但这是一个非常糟糕的主意)。

一旦线程A将单个工作项排入队列,它就必须发信号通知线程B.当线程B处理完所有项目时,它应该发出信号通知信号通知其他人已完成。 你的主线程应该等待第二个信号量知道一切都已完成。

[编辑]

首先,你有一个生产者和一个消费者:

 public interface IProducer : IStoppable { ///  /// Notifies clients when a new item is produced. ///  event EventHandler> ItemProduced; } public interface IConsumer : IStoppable { ///  /// Performs processing of the specified item. ///  /// The item. void ConsumeItem(T item); } public interface IStoppable { void Stop(); } 

因此,在您的情况下,创建邮件的类将需要触发ItemProduced事件,发送它的类将需要实现ConsumeItem

然后将这两个实例传递给Worker的实例:

 public class Worker { private readonly Object _lock = new Object(); private readonly Queue _queuedItems = new Queue(); private readonly AutoResetEvent _itemReadyEvt = new AutoResetEvent(false); private readonly IProducer _producer; private readonly IConsumer _consumer; private volatile bool _ending = false; private Thread _workerThread; public Worker(IProducer producer, IConsumer consumer) { _producer = producer; _consumer = consumer; } public void Start(ThreadPriority priority) { _producer.ItemProduced += Producer_ItemProduced; _ending = false; // start a new thread _workerThread = new Thread(new ThreadStart(WorkerLoop)); _workerThread.IsBackground = true; _workerThread.Priority = priority; _workerThread.Start(); } public void Stop() { _producer.ItemProduced -= Producer_ItemProduced; _ending = true; // signal the consumer, in case it is idle _itemReadyEvt.Set(); _workerThread.Join(); } private void Producer_ItemProduced (object sender, ProducedItemEventArgs e) { lock (_lock) { _queuedItems.Enqueue(e.Item); } // notify consumer thread _itemReadyEvt.Set(); } private void WorkerLoop() { while (!_ending) { _itemReadyEvt.WaitOne(-1, false); T singleItem = default(T); lock (_lock) { if (_queuedItems.Count > 0) { singleItem = _queuedItems.Dequeue(); } } while (singleItem != null) { try { _consumer.ConsumeItem(singleItem); } catch (Exception ex) { // handle exception, fire an event // or something. Otherwise this // worker thread will die and you // will have no idea what happened } lock (_lock) { if (_queuedItems.Count > 0) { singleItem = _queuedItems.Dequeue(); } } } } } // WorkerLoop } // Worker 

这是一般的想法,可能需要一些额外的调整。

要使用它,您需要让您的类实现这两个接口:

 IProducer mailCreator = new MailCreator(); IConsumer mailSender = new MailSender(); Worker worker = new Worker(mailCreator, mailSender); worker.Start(); // produce an item - worker will add it to the // queue and signal the background thread mailCreator.CreateSomeMail(); // following line will block this (calling) thread // until all items are consumed worker.Stop(); 

关于这一点的好处是:

  • 你可以拥有自己喜欢的工人
  • 多个工人可以接受来自同一生产者的物品
  • 多个工作人员可以将项目分派给同一个使用者(虽然这意味着你需要采取以线程安全的方式实现消费者的情况)

如果您有.Net 4.0,请使用BlockingCollection 。 它通过CompleteAdding方法处理所有混乱,包括最后一点。

如果你有一个早期的.Net,那么升级(也就是说,我太懒了,无法解释如何实现已经为你完成的事情。)

编辑:我认为你的问题根本不保证线程。 只需提前创建所有电子邮件,然后睡到指定时间。

我写了一个简单的例子,对我来说很好,应该适合你的场景。 如果消费者正在运行取决于运行变量的设置方式,但您可以轻松地将其修改为更复杂的条件,例如“如果没有邮件但有人说我应该等待更多”。

 public class MailSystem { private readonly Queue mailQueue = new Queue(); private bool running; private Thread consumerThread; public static void Main(string[] args) { MailSystem mailSystem = new MailSystem(); mailSystem.StartSystem(); } public void StartSystem() { // init consumer running = true; consumerThread = new Thread(ProcessMails); consumerThread.Start(); // add some mails mailQueue.Enqueue(new Mail("Mail 1")); mailQueue.Enqueue(new Mail("Mail 2")); mailQueue.Enqueue(new Mail("Mail 3")); mailQueue.Enqueue(new Mail("Mail 4")); Console.WriteLine("producer finished, hit enter to stop consumer"); // wait for user interaction Console.ReadLine(); // exit the consumer running = false; Console.WriteLine("exited"); } private void ProcessMails() { while (running) { if (mailQueue.Count > 0) { Mail mail = mailQueue.Dequeue(); Console.WriteLine(mail.Text); Thread.Sleep(2000); } } } } internal class Mail { public string Text { get; set; } public Mail(string text) { Text = text; } } 

你想要什么可以使用conditionvariables。 我将编写一个伪代码示例,不应该太难实现。

一个线程有以下几点:

 while(run) compose message conditionvariable.lock() add message to queue conditionvariable.notifyOne() conditionvariable.release() 

而另一个线程沿着这些方向有所作为

 while(threadsafe_do_run()) while threadsafe_queue_empty() conditionvariable.wait() msg = queue.pop() if msg == "die" set_run(false) conditionvariable.release() send msg 

因此,如果你没有得到任何消息推送消息。 处理完所有消息后也是如此。

do_run()和queue_empty()应该线程安全地检查它们的东西,使用适当的锁。

wait()在调用notifyOne()时返回,然后队列有一个要发送的消息。 在大多数框架中,conditionvariable已经有了锁,你可能需要在.NET中自己添加lock语句。

我会使用一个线程来完成整个过程。 那就是生成邮件正文并发送。 仅仅因为生成邮件正文不会花费时间,但发送电子邮件会。

此外,如果您使用的是Windows附带的SMTP服务器,那么您只需将电子邮件放入队列文件夹中,smtp服务器就会负责发送电子邮件。

因此,您可以启动多个线程(保持数字上限)每个线程完成它的工作。 如果您正在处理一组作业(数据),那么您可以将数据并行化(即将数据拆分为与系统核心数量相匹配的块,例如拍摄作业(线程)。

使用任务将使所有这一切变得更加简单,即两个线程发送一个电子邮件或一个线程来完成整个作业,但使用多个线程并行执行多个作业。