通过工作线程的异步接口使用WCF服务,如何确保从客户端“按顺序”发送事件

我正在编写一个Silverlight类库来抽象到WCF服务的接口。 WCF服务提供集中式日志记录服务。 Silverlight类库为日志记录提供了简化的类似log4net的接口(logger.Info,logger.Warn等)。 从类库我计划提供选项,以便记录的消息可以在客户端上累积并以“突发”forms发送到WCF日志记录服务,而不是在发生时发送每条消息。 一般来说,这很有效。 类库确实累积了消息,它确实将消息集合发送到WCF日志记录服务,在这里它们由底层日志记录框架记录。

我目前的问题是消息(来自具有单个线程的单个客户端 – 所有日志记录代码都在按钮单击事件中)在日志服务中变得交错。 我意识到至少部分原因可能是由于WCF日志服务的实例化(PerCall)或同步。 但是,似乎我的消息发生得如此迅速,以至于异步调用中留下的消息的“爆发”实际上是以与生成它们不同的顺序“离开”客户端。

我试图设置一个生产者消费者队列,如此处所描述的那样稍微(或应该是“轻微的”空中引号)改变Work方法阻塞(WaitOne)直到异步调用返回(即直到执行异步回调) 。 这个想法是当一个消息突发发送到WCF日志记录服务时,队列应该等到该突发被处理后才发送下一个突发。

也许我想做的事情是不可行的,或者我正在努力解决错误的问题,(或者我可能只是不知道我在做什么!)。

无论如何,这是我的生产者/消费者队列代码:

internal class ProducerConsumerQueue : IDisposable { EventWaitHandle wh = new AutoResetEvent(false); Thread worker; readonly object locker = new object(); Queue<ObservableCollection> logEventQueue = new Queue<ObservableCollection>(); LoggingService.ILoggingService loggingService; internal ProducerConsumerQueue(LoggingService.ILoggingService loggingService) { this.loggingService = loggingService; worker = new Thread(Work); worker.Start(); } internal void EnqueueLogEvents(ObservableCollection logEvents) { //Queue the next burst of messages lock(locker) { logEventQueue.Enqueue(logEvents); //Is this Set conflicting with the WaitOne on the async call in Work? wh.Set(); } } private void Work() { while(true) { ObservableCollection events = null; lock(locker) { if (logEventQueue.Count > 0) { events = logEventQueue.Dequeue(); if (events == null || events.Count == 0) return; } } if (events != null && events.Count > 0) { System.Diagnostics.Debug.WriteLine("1. Work - Sending {0} events", events.Count); // // This seems to be the key... // Send one burst of messages via an async call and wait until the async call completes. // loggingService.BeginLogEvents(events, ar => { try { loggingService.EndLogEvents(ar); System.Diagnostics.Debug.WriteLine("3. Work - Back"); wh.Set(); } catch (Exception ex) { } }, null); System.Diagnostics.Debug.WriteLine("2. Work - Waiting"); wh.WaitOne(); System.Diagnostics.Debug.WriteLine("4. Work - Finished"); } else { wh.WaitOne(); } } } #region IDisposable Members public void Dispose() { EnqueueLogEvents(null); worker.Join(); wh.Close(); } #endregion } 

在我的测试中它基本上被称为:

 //Inside of LogManager, get the LoggingService and set up the queue. ILoggingService loggingService = GetTheLoggingService(); ProducerConsumerQueue loggingQueue = new ProducerConsumerQueue(loggingService); //Inside of client code, get a logger and log with it ILog logger = LogManager.GetLogger("test"); for (int i = 0; i < 100; i++) { logger.InfoFormat("logging message [{0}]", i); } 

在内部,logger / LogManager在将该组消息添加到队列之前累积了一些日志消息(比如25)。 像这样的东西:

 internal void AddNewMessage(string message) { lock(logMessages) { logMessages.Add(message); if (logMessages.Count >= 25) { ObservableCollection messages = new ObservableCollection(logMessages); logMessages.Clear(); loggingQueue.EnqueueLogEvents(messages); } } } 

因此,在这种情况下,我希望每次有4条连发25条消息。 根据我的ProducerConsumerQueue代码中的Debug语句(可能不是调试它的最佳方法?),我希望看到类似这样的东西:

  1. 工作 – 发送25个活动
  2. 工作 – 等待
  3. 工作 – 返回
  4. 工作 – 完成

重复4次。

相反,我看到这样的事情:

* 1。 工作 – 发送25个活动

* 2。 工作 – 等待

* 4。 工作 – 完成

* 1。 工作 – 发送25个活动

* 2。 工作 – 等待

* 3。 工作 – 返回

* 4。 工作 – 完成

* 1。 工作 – 发送25个活动

* 2。 工作 – 等待

* 3。 工作 – 返回

* 4。 工作 – 完成

* 1。 工作 – 发送25个活动

* 2。 工作 – 等待

* 3。 工作 – 返回

* 3。 工作 – 返回

* 4。 工作 – 完成

(添加了前导*以便这些行不会被SO自动编号)

我想我会预料到,队列会允许添加多个突发消息,但它会在处理下一个突发之前完全处理一个突发(等待acync调用完成)。 它似乎没有这样做。 它似乎没有可靠地等待完成异步调用。 我确实调用了EnqueueLogEvents中的EnqueueLogEvents ,也许是从Work方法中取消了WaitOne

所以,我有几个问题:1。我对我想要完成的事情的解释是否有意义(我的解释清楚,不是一个好主意或不是)?

  1. 我正在尝试(从客户端传输 – 来自单个线程的消息,按它们发生的顺序,一次完全处理一组消息)一个好主意?

  2. 我接近了吗?

  3. 可以吗?

  4. 它应该完成吗?

谢谢你的帮助!

[编辑]经过更多的调查并感谢Brian的建议,我们得以实现这一目标。 我复制了修改后的代码。 关键是我们现在严格使用“wh”等待句柄来生成ProducerConsumerQueue函数。 我们现在正在等待由BeginLogEvents调用返回的res.AsyncWaitHandle,而不是使用wh来等待异步调用完成。

  internal class LoggingQueue : IDisposable { EventWaitHandle wh = new AutoResetEvent(false); Thread worker; readonly object locker = new object(); bool working = false; Queue<ObservableCollection> logEventQueue = new Queue<ObservableCollection>(); LoggingService.ILoggingService loggingService; internal LoggingQueue(LoggingService.ILoggingService loggingService) { this.loggingService = loggingService; worker = new Thread(Work); worker.Start(); } internal void EnqueueLogEvents(ObservableCollection logEvents) { lock (locker) { logEventQueue.Enqueue(logEvents); //System.Diagnostics.Debug.WriteLine("EnqueueLogEvents calling Set"); wh.Set(); } } private void Work() { while (true) { ObservableCollection events = null; lock (locker) { if (logEventQueue.Count > 0) { events = logEventQueue.Dequeue(); if (events == null || events.Count == 0) return; } } if (events != null && events.Count > 0) { //System.Diagnostics.Debug.WriteLine("1. Work - Sending {0} events", events.Count); IAsyncResult res = loggingService.BeginLogEvents(events, ar => { try { loggingService.EndLogEvents(ar); //System.Diagnostics.Debug.WriteLine("3. Work - Back"); } catch (Exception ex) { } }, null); //System.Diagnostics.Debug.WriteLine("2. Work - Waiting"); // Block until async call returns. We are doing this so that we can be sure that all logging messages // are sent FROM the client in the order they were generated. ALSO, we don't want interleave blocks of logging // messages from the same client by sending a new block of messages before the previous block has been // completely processed. res.AsyncWaitHandle.WaitOne(); //System.Diagnostics.Debug.WriteLine("4. Work - Finished"); } else { wh.WaitOne(); } } } #region IDisposable Members public void Dispose() { EnqueueLogEvents(null); worker.Join(); wh.Close(); } #endregion } 

正如我在最初的问题和我对Jon和Brian的评论中所提到的,我仍然不知道是否所有这些工作都是一个好主意,但至少代码做了我想要它做的事情。 这意味着我至少可以选择以这种方式或其他方式(例如在事后恢复顺序)而不是没有选择。

您获得这种排序的原因是因为您尝试使用生产者 – 使用者队列用于不同目的的相同等待句柄。 这将导致各种混乱。 在某些时候,事情会变得越来越糟,队列最终会被锁定。 你真的应该创建一个单独的WaitHandle来等待日志服务的完成。 或者,如果BeginLoggingEvents符合标准模式,它将返回包含WaitHandleIAsyncResult ,您可以使用它而不是创建自己的WaitHandle

作为旁注,我真的不喜欢Albarahi网站上提供的生产者 – 消费者模式。 问题是它对多个消费者来说是不安全的(显然这对你来说并不重要)。 我说得非常尊重,因为我认为他的网站是multithreading编程的最佳资源之一。 如果您可以使用BlockingCollection ,则使用它。

我可以建议所有这些协调有一个简单的替代方案吗? 使用便宜的单调递增ID(例如,使用Interlocked.Increment() )来生成序列,这样无论客户端或服务器发生什么顺序,您都可以在以后重新生成原始顺序。

这应该让你高效灵活,无需等待确认就可以异步发送任何你想要的东西,但不会丢失顺序。

显然,这意味着ID(或可能是保证唯一的时间戳字段)需要成为您的WCF服务的一部分,但如果您控制两端应该相当简单。