如何异步处理消息,在处理时丢弃任何新消息?

我有一个C#应用程序订阅我们的消息传递系统上的主题以进行价值更新。 当一个新值出现时,我会进行一些处理,然后继续。 问题是,更新可以比应用程序处理它们更快。 我想要做的就是坚持最新的价值,所以我不想要一个队列。 例如,源发布值“1”,我的应用程序接收它; 在处理时,源在我的应用程序完成处理之前发布序列(2,3,4,5); 我的应用程序然后处理值“5”,抛弃先前的值。

由于它是基于专有的消息传递库,所以很难发布一个有效的代码示例,但我认为这是一个常见的模式,我只是无法弄清楚它的名称是什么……似乎处理函数必须运行在一个单独的线程上,而不是消息回调,但我不知道如何组织它,例如如何通知该线程的值更改。 关于我需要做什么的一般提示?

一个非常简单的方法可能是这样的:

private IMessage _next; public void ReceiveMessage(IMessage message) { Interlocked.Exchange(ref _next, message); } public void Process() { IMessage next = Interlocked.Exchange(ref _next, null); if (next != null) { //... } } 

一般来说,使用消息传递系统来防止丢失消息。 我对解决方案的初步反应是接收入站数据的线程,该入口数据尝试将其传递给您的处理线程,如果处理线程已经在运行,那么您将丢弃数据并等待下一个元素并重复。

显然,消息传递库的设计可以影响处理此问题的最佳方法。 我过去如何使用类似function的库来完成它,我有一个侦听事件的线程,并将它们放入一个队列,然后我让Threadpool工作人员将消息出列并处理它们。

您可以阅读multithreading异步作业队列:

Mutlithreaded Job Queue

工作队列线程

一种简单的方法是使用成员变量来保存最后接收的值,并用锁包装它。 另一种方法是将传入值压入堆栈。 当你准备好一个新值时,调用Stack.Pop()然后调用Stack.Clear():

 public static class Incoming { private static object locker = new object(); private static object lastMessage = null; public static object GetMessage() { lock (locker) { object tempMessage = lastMessage; lastMessage = null; return tempMessage; } } public static void SetMessage(object messageArg) { lock (locker) { lastMessage = messageArg; } } private static Stack messageStack = new Stack(); public static object GetMessageStack() { lock (locker) { object tempMessage = messageStack.Count > 0 ? messageStack.Pop() : null; messageStack.Clear(); return tempMessage; } } public static void SetMessageStack(object messageArg) { lock (locker) { messageStack.Push(messageArg); } } } 

将处理函数放在单独的线程上是个好主意。 要么使用处理线程中的回调方法来表示它已准备好接收另一条消息,要么让它发出信号表明已完成,然后让主线程在收到消息时启动新的处理器线程(通过上面的SetMessage …) 。

这不是“模式”,但您可以使用共享数据结构来保存该值。 如果从消息传递库中只收到一个值,那么一个简单的对象就可以。 否则,您可以使用哈希表来存储多个消息值(如果需要)。

例如,在消息接收线程上:当消息进入时,使用其值添加/更新数据结构。 在线程方面,您可以定期检查此数据结构以确保您仍具有相同的值。 如果不这样做,则丢弃已完成的任何处理,并使用新值重新处理。

当然,您需要确保数据结构在线程之间正确同步。