如何创建自定义消息泵?

我正在做一个小练习,我需要创建一个类似于消息泵的东西。 我所拥有的是一个工作队列,我希望工作完全在一个线程上完成,而任何线程都可以将工作添加到队列中。

Queue queue; 

线程使用等待句柄告诉泵有工作要做。

 WaitHandle signal; 

只要有工作要做,泵就会循环,然后等待信号再次启动。

 while(ApplicationIsRunning){ while(queue.HasWork){ DoWork(queue.NextWorkItem) } signal.Reset(); signal.WaitOne(); } 

每个其他线程都可以向队列添加工作并发出等待句柄的信号……

 public void AddWork(WorkToDo work){ queue.Add(work); signal.Set(); } 

问题是,如果工作添加得足够快,就会出现工作可以留在队列中的情况,因为队列检查工作和WaitHandle重置之间,另一个线程可以将工作添加到队列中。

如果不在WaitHandle周围放置昂贵的互斥量,我将如何减轻这种情况?

您可以使用BlockingCollection轻松地实现队列,因为它将为您处理同步:

 public class MessagePump { private BlockingCollection actions = new BlockingCollection(); public void Run() //you may want to restrict this so that only one caller from one thread is running messages { foreach (var action in actions.GetConsumingEnumerable()) action(); } public void AddWork(Action action) { actions.Add(action); } public void Stop() { actions.CompleteAdding(); } } 

你不应该做一个完整的互斥,但你可以发一个锁定声明

 public void AddWork(WorkToDo work) { queue.Add(work); lock(lockingObject) { signal.Set(); } } 

使用任何你想要的锁定对象,大多数人会说使用信号本身是一个坏主意。

响应下面的@ 500 – 服务器内部错误评论,您可以在完成工作之前重置信号。 以下应该保护的东西:

 while(ApplicationIsRunning) { while(queue.HasWork) { WorkItem wi; lock(lockingObject) { wi = queue.NextWorkItem; if(!queue.HasWork) { signal.Reset(); } } DoWork(wi) } signal.WaitOne(); } 

这样,如果您有更多工作,内部队列将继续运行。 如果没有,它会掉到signal.WaitOne() ,我们只有在没有更多的工作已经排队的情况下才会重置。

这里唯一的缺点是,如果在DoWork执行时工作进入,我们可能会连续多次重置。

您可以使用WaitOne(TimeSpan)方法,以便拥有混合信号/轮询循环。 基本上指定最多等待1秒的时间跨度。 这将导致一个任务陷入该比赛最多持续一秒(或您指定的任何轮询时间)或直到另一个任务添加到您的队列。