上升的事件没有阻塞并以正确的顺序接收事件

这首先需要一些解释。 有一个工作线程必须引发一些事件:

Task.Run(() => { for(int i = 0; i < 123456789; i++) { ... // some job OnSomeEvent(i); } }); 

同步上升事件将阻止作业,直到所有事件处理程序完成:

 void OnSomeEvent(int i) => SomeEvent?.Invoke(this, new SomeEventArgs(i)); 

异步事件上升不会再阻止工作了(耶!)

 void OnSomeEvent(int i) => Task.Run(() => SomeEvent?.Invoke(this, new SomeEventArgs(i))); 

但现在还有另一个问题:未按正确顺序收到事件:

 OnSomeEvent(1); OnSomeEvent(2); OnSomeEvent(3); ... // event handler SomeEvent += (s, e) => Console.WriteLine(eI); // possible output 1 3 2 

问题:如何实现以正确顺序发生的异步事件上升?

最近我了解了Dispatcher.InvokeAsync 使用队列的内容 。 看起来我必须做类似的事情。 如果我必须这样做:1)它应该是调用者的工作还是2)我是否应该同步保持事件上升,接收者必须组织生产者/消费者以防止阻塞工作? 或许还有另一种方式?

PS:这与ContinueWhith无关..除非存储任务列表是一个合适的解决方案。 我关注的是如何实现发射并忘记事件,其中:a)呼叫者未被阻止2)事件以相同的顺序被接收。

PPS:我不知道如何让MCVE重现这个问题。 它出现在具有大量UI,大量线程等的真实项目中。

您可以使用以下TaskQueue将异步操作添加到队列,以便在队列中的上一项完成时启动每个操作:

 public class TaskQueue { private Task previous = Task.FromResult(false); private object key = new object(); public Task Enqueue(Func> taskGenerator) { lock (key) { var next = previous.ContinueWith(t => taskGenerator()).Unwrap(); previous = next; return next; } } public Task Enqueue(Func taskGenerator) { lock (key) { var next = previous.ContinueWith(t => taskGenerator()).Unwrap(); previous = next; return next; } } } 

这允许你写:

 private TaskQueue taskQueue = new TaskQueue(); private void OnSomeEvent(int i) => taskQueue.Enqueue(() => Task.Run(() => SomeEvent?.Invoke(this, new SomeEventArgs(i)))); 

您可以使用TPL DataflowActionBlock队列来维护事件队列。

您将按如下方式创建队列:

 queue = new ActionBlock(item => SomeEvent?.Invoke(item)); 

然后你将事件添加到队列中,如下所示:

 queue.Post(new SomeEventArgs(value)); 

当不再需要队列时,您可以这样做:

 queue.Complete(); 

之后,如果您需要等待处理队列中的任何项目,您可以执行以下操作:

 queue.Completion.Wait(); 

但是,请注意, queue.Completion实际上是一个Task因此您经常将它与await一起使用。

这是一个完整的示例,显示了一种方法(它不会一直保持线程处于事件队列中):

 using System; using System.Threading; using System.Threading.Tasks.Dataflow; namespace Demo { public class SomeEventArgs : EventArgs { public SomeEventArgs(int value) { Value = value; } public int Value { get; } } internal class Program { public delegate void SomeEventHandler(SomeEventArgs e); public event SomeEventHandler SomeEvent; ActionBlock queue; private void run() { queue = new ActionBlock(item => SomeEvent?.Invoke(item)); // Subscribe to my own event (this just for demonstration purposes!) this.SomeEvent += Program_SomeEvent; // Raise 100 events. for (int i = 0; i < 100; ++i) { OnSomeEvent(i); Console.WriteLine("Raised event " + i); } Console.WriteLine("Signalling that queue is complete."); queue.Complete(); Console.WriteLine("Waiting for queue to be processed."); queue.Completion.Wait(); Console.WriteLine("Done."); } private void Program_SomeEvent(SomeEventArgs e) { Console.WriteLine("Handled " + e.Value); Thread.Sleep(1); // Simulate load. } private void OnSomeEvent(int value) { queue.Post(new SomeEventArgs(value)); } private static void Main() { new Program().run(); } } }