非阻塞和重复发生的生产者/消费者通知者实施

搜索了一段代码,它做了我想要的,我很满意。 读这个 , 这有很大帮助。

我有一个场景,我需要单个消费者在新数据可用时通知单个生产者,但也希望定期通知消费者,无论新数据是否可用。 如果通知消费者的时间超过了重复发生的时间,那就没有问题,但不应该频繁通知消费者。

当消费者已经通知并正在工作时,可能会发生“新数据”的多个通知。 (所以SemaphoreSlim不太适合)。 因此,比生产者通知速率慢的消费者不会排队后续通知,他们只是“重新发信号通知”相同的“数据可用”标志而没有影响。

我还希望消费者异步等待通知(不阻塞线程)。

我已经将下面的类包裹在一起,它包含了TaskCompletionSource并且还使用了一个内部Timer。

 public class PeriodicalNotifier : IDisposable { // Need some dummy type since TaskCompletionSource has only the generic version internal struct VoidTypeStruct { } // Always reuse this allocation private static VoidTypeStruct dummyStruct; private TaskCompletionSource internalCompletionSource; private Timer reSendTimer; public PeriodicalNotifier(int autoNotifyIntervalMs) { internalCompletionSource = new TaskCompletionSource(); reSendTimer = new Timer(_ => Notify(), null, 0, autoNotifyIntervalMs); } public async Task WaitForNotifictionAsync(CancellationToken cancellationToken) { using (cancellationToken.Register(() => internalCompletionSource.TrySetCanceled())) { await internalCompletionSource.Task; // Recreate - to be able to set again upon the next wait internalCompletionSource = new TaskCompletionSource(); } } public void Notify() { internalCompletionSource.TrySetResult(dummyStruct); } public void Dispose() { reSendTimer.Dispose(); internalCompletionSource.TrySetCanceled(); } } 

这个类的用户可以这样做:

 private PeriodicalNotifier notifier = new PeriodicalNotifier(100); // ... In some task - which should be non-blocking while (some condition) { await notifier.WaitForNotifictionAsync(_tokenSource.Token); // Do some work... } // ... In some thread, producer added new data notifier.Notify(); 

效率对我来说很重要,场景是高频数据流,所以我想到了:

  • 等待的非阻塞性质。
  • 我假设Timer比重新创建Task.Delay更有效,如果它不是要通知的,则取消它。
  • 关注TaskCompletionSource的重新TaskCompletionSource

我的问题是:

  1. 我的代码是否正确解决了问题? 任何隐藏的陷阱?
  2. 我错过了这个用例的一些简单的解决方案/现有的块吗?

更新:

我得出的结论是,除了重新实现更精益的任务完成结构(就像在这里和这里 ),我没有更多的优化。 希望能帮助任何人看到类似的场景。

  1. 是的,您的实现是有意义的,但TaskCompletionSource重新创建应该在使用范围之外,否则“旧”取消标记可能会取消“新” TaskCompletionSource
  2. 我认为使用某种AsyncManualResetEvent结合Timer会更简单,更不容易出错。 Microsoft提供的Visual Studio SDK中有一个非常好的命名空间和异步工具。 您需要安装SDK ,然后引用Microsoft.VisualStudio.Threading程序集。 这是使用具有相同API的AsyncManualResetEvent的实现:
 public class PeriodicalNotifier : IDisposable { private readonly Timer _timer; private readonly AsyncManualResetEvent _asyncManualResetEvent; public PeriodicalNotifier(TimeSpan autoNotifyInterval) { _asyncManualResetEvent = new AsyncManualResetEvent(); _timer = new Timer(_ => Notify(), null, TimeSpan.Zero, autoNotifyInterval); } public async Task WaitForNotifictionAsync(CancellationToken cancellationToken) { await _asyncManualResetEvent.WaitAsync().WithCancellation(cancellationToken); _asyncManualResetEvent.Reset(); } public void Notify() { _asyncManualResetEvent.Set(); } public void Dispose() { _timer.Dispose(); } } 

您可以通过设置重置事件进行通知,使用WaitAsync异步等待,使用WithCancellation扩展方法启用取消,然后重置事件。 通过设置相同的重置事件来“合并”多个通知。

 Subject notifier = new Subject Observable.Interval(TimeSpan.FromMilliSeconds(100)) .Select(_ => value)).Switch() .Subscribe(value => DoSomething(value)); //Some other thread... notifier.OnNext(...); 

此Rx查询将每隔100毫秒继续发送值,直到新值出现。 然后我们每100毫秒通知该值。

如果我们每100毫秒接收一次比一次更快的值,那么我们基本上具有与输入相同的输出。