Tag: producer consumer

非阻塞和重复发生的生产者/消费者通知者实施

搜索了一段代码,它做了我想要的,我很满意。 读这个 , 这有很大帮助。 我有一个场景,我需要单个消费者在新数据可用时通知单个生产者,但也希望定期通知消费者,无论新数据是否可用。 如果通知消费者的时间超过了重复发生的时间,那就没有问题,但不应该频繁通知消费者。 当消费者已经通知并正在工作时,可能会发生“新数据”的多个通知。 (所以SemaphoreSlim不太适合)。 因此,比生产者通知速率慢的消费者不会排队后续通知,他们只是“重新发信号通知”相同的“数据可用”标志而没有影响。 我还希望消费者异步等待通知(不阻塞线程)。 我已经将下面的类包裹在一起,它包含了TaskCompletionSource并且还使用了一个内部Timer。 public class PeriodicalNotifier : IDisposable { // Need some dummy type since TaskCompletionSource has only the generic version internal struct VoidTypeStruct { } // Always reuse this allocation private static VoidTypeStruct dummyStruct; private TaskCompletionSource internalCompletionSource; private Timer reSendTimer; public PeriodicalNotifier(int autoNotifyIntervalMs) { internalCompletionSource = […]

为什么GC不收集未使用的对象?

我为我的实验用BlockingCollection实现了Producer / Consumer模式。 PerformanceCounter c = null; void Main() { var p =System.Diagnostics.Process.GetCurrentProcess(); c = new PerformanceCounter(“Process”, “Working Set – Private”, p.ProcessName); (c.RawValue/1024).Dump(“start”); var blocking = new BlockingCollection(); var t = Task.Factory.StartNew(()=>{ for (int i = 0; i Path.GetRandomFileName())) }); } blocking.CompleteAdding(); }); var t2 = Task.Factory.StartNew(()=>{ int x=0; foreach (var element in blocking.GetConsumingEnumerable()) { […]

Rx如何从pub / sub模式创建序列

我正在尝试使用Rx来评估从发布/子模式创建序列(即经典观察者模式,其中下一个元素由生产者发布)。 这基本上与.net事件相同,除了我们需要概括它以使得事件不是必需的,所以我无法利用Observable.FromEvent。 我玩过Observable.Create和Observable.Generate,发现自己最终不得不编写代码来处理pub / sub(即我必须编写生产者/消费者代码来存储已发布的项目,然后通过用它来调用IObserver.OnNext()),所以看起来我并没有真正利用Rx …… 我向下看正确的路径还是适合Rx? 谢谢

如何聚合来自异步生成器的数据并将其写入文件?

我正在学习C#中的异步/等待模式。 目前我正在尝试解决这样的问题: 有一个生产者(硬件设备)每秒生成1000个数据包。 我需要将此数据记录到文件中。 设备只有一个ReadAsync()方法一次报告一个数据包。 我需要缓冲数据包并按照它们生成的顺序将它们写入文件,每秒只执行一次。 如果写入过程没有在下一批数据包准备好写入时及时完成,则写操作应该失败。 到目前为止,我写了类似下面的内容。 它有效,但我不确定这是否是解决问题的最佳方法。 有任何意见或建议吗? 在消费者需要汇总从生产者处收到的数据时,采用这种生产者/消费者问题的最佳做法是什么? static async Task TestLogger(Device device, int seconds) { const int bufLength = 1000; bool firstIteration = true; Task writerTask = null; using (var writer = new StreamWriter(“test.log”))) { do { var buffer = new byte[bufLength][]; for (int i = 0; i { foreach (var […]

将BlockingCollection 用作单生成器,单用户FIFO查询是否合适?

我需要单生成器,单用户FIFO查询,因为 我需要按照收到的顺序处理邮件。 我需要这样做异步,因为调用者不应该在我处理消息时等待。 只有在完成上一个消息处理时,才应启动下一个消息处理。 有时“接收”消息的频率高于“处理”消息的频率。 但平均而言,我应该能够处理所有消息,有时候我必须“排队”它们。 所以它就像我认为的TCP / IP,你有一个生产者和一个消费者,有时你可以比你可以处理的更快地接收消息,所以你必须查询它们。 在哪里订单很重要,并且调用者对你用这些东西做什么完全不感兴趣。 这听起来很容易,我可能会使用通用Queue ,但我想使用BlockingCollection ,因为我不想用ManualResetEvent等编写任何代码。 BlockingCollection对我的任务有多适合,可能你还可以推荐别的东西?

C#生产者/消费者模式

我有简单的一个生产者/两个消费者代码如下,但输出显示只有C2消耗。 我的代码中有任何错误吗? class Program { static void Main(string[] args) { Object lockObj = new object(); Queue queue = new Queue(); Producer p = new Producer(queue, lockObj); Consumer c1 = new Consumer(queue, lockObj, “c1”); Consumer c2 = new Consumer(queue, lockObj, “c2”); Thread t1 = new Thread(c1.consume); Thread t2 = new Thread(c2.consume); t1.Start(); t2.Start(); Thread t = […]

在C#中实现生产者/消费者模式

如何使用事件和代理在C#中实现Producer / Consumer模式? 在使用这些设计模式时,我需要注意什么? 我需要注意哪些边缘情况?