如何聚合来自异步生成器的数据并将其写入文件?

我正在学习C#中的异步/等待模式。 目前我正在尝试解决这样的问题:

  • 有一个生产者(硬件设备)每秒生成1000个数据包。 我需要将此数据记录到文件中。

  • 设备只有一个ReadAsync()方法一次报告一个数据包。

  • 我需要缓冲数据包并按照它们生成的顺序将它们写入文件,每秒只执行一次。

  • 如果写入过程没有在下一批数据包准备好写入时及时完成,则写操作应该失败。

到目前为止,我写了类似下面的内容。 它有效,但我不确定这是否是解决问题的最佳方法。 有任何意见或建议吗? 在消费者需要汇总从生产者处收到的数据时,采用这种生产者/消费者问题的最佳做法是什么?

 static async Task TestLogger(Device device, int seconds) { const int bufLength = 1000; bool firstIteration = true; Task writerTask = null; using (var writer = new StreamWriter("test.log"))) { do { var buffer = new byte[bufLength][]; for (int i = 0; i  { foreach (var b in buffer) writer.WriteLine(ToHexString(b)); }); firstIteration = false; } while (--seconds > 0); } } 

您可以使用以下想法,前提是flush的条件是数据包的数量(最多1000个)。 我没有测试它。 它利用了Stephen Cleary的AsyncProducerConsumerQueue在这个问题中的特色。

 AsyncProducerConsumerQueue _queue; Stream _stream; // producer async Task ReceiveAsync(CancellationToken token) { while (true) { var list = new List(); while (true) { token.ThrowIfCancellationRequested(token); var packet = await _device.ReadAsync(token); list.Add(packet); if (list.Count == 1000) break; } // push next batch await _queue.EnqueueAsync(list.ToArray(), token); } } // consumer async Task LogAsync(CancellationToken token) { Task previousFlush = Task.FromResult(0); CancellationTokenSource cts = null; while (true) { token.ThrowIfCancellationRequested(token); // get next batch var nextBatch = await _queue.DequeueAsync(token); if (!previousFlush.IsCompleted) { cts.Cancel(); // cancel the previous flush if not ready throw new Exception("failed to flush on time."); } await previousFlush; // it's completed, observe for any errors // start flushing cts = CancellationTokenSource.CreateLinkedTokenSource(token); previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Count, cts.Token); } } 

如果您不想让记录器失败,而是希望取消刷新并继续下一批,则可以对此代码进行最小的更改。

回应@ l3arnon评论:

  1. 数据包不是字节,而是byte []。 2.您还没有使用过OP的ToHexString。 3. AsyncProducerConsumerQueue的稳健性和测试性都不如.Net的TPL数据流。 4.在抛出导致该行冗余的exception之后,等待previousFlush出现错误。 简而言之:我认为可能的附加值并不能certificate这个非常复杂的解决方案。
  1. “数据包不是一个字节,它是字节[]” – 数据包一个字节,这从OP的代码中很明显: buffer[i] = await device.ReadAsync() 。 然后,一批数据包是byte[]
  2. “你还没有使用OP的ToHexString。” – 目标是展示如何使用Stream.WriteAsync 本身接受取消令牌,而不是WriteLineAsync ,它不允许取消。 将ToHexStringStream.WriteAsync一起使用并仍然利用取消支持是微不足道的:

     var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + Environment.NewLine); _stream.WriteAsync(hexBytes, 0, hexBytes.Length, token); 
  3. “AsyncProducerConsumerQueue的稳健性和测试性都不如.Net的TPL数据流” – 我不认为这是一个确定的事实。 但是,如果OP关心它,他可以使用常规的BlockingCollection ,它不会阻塞生产者线程。 在等待下一批时阻止使用者线程是可以的,因为写入是并行完成的。 与此相反,您的TPL Dataflow版本带有一个冗余 CPU和锁密集操作:使用logAction.Post(packet)将数据从生产者管道移动到写入器pipleline,逐字节。 我的代码不这样做。

  4. “在你抛出exception导致该行冗余之后,你等待previousFlush的错误。” – 这条线不是多余的。 也许,你错过了这一点:当previousFlush.IsFaultedpreviousFlush.IsCancelled也为true时, previousFlush.IsFaulted可能为true 。 因此, await previousFlush与之相关,以观察已完成任务上的任何错误(例如,写入失败),否则将丢失。

一个更好的方法恕我直言将有2个“工人”,一个制片人和一个消费者。 制作人从设备中读取并简单填写列表。 消费者每秒“醒来”并将批次写入文件。

 List _data = new List(); async Task Producer(Device device) { while (true) { _data.Add(await device.ReadAsync()); } } async Task Consumer(Device device) { using (var writer = new StreamWriter("test.log"))) { while (true) { Stopwatch watch = Stopwatch.StartNew(); var batch = _data; _data = new List(); foreach (var packet in batch) { writer.WriteLine(ToHexString(packet)); if (watch.Elapsed >= TimeSpan.FromSeconds(1)) { throw new Exception("Write Time Out!"); } } await Task.Delay(TimeSpan.FromSeconds(1) - watch.Elapsed); } } } 

while (true)应该可以由系统范围的取消令牌替换。

假设您可以按金额(1000)而不是时间(1秒)批量处理,最简单的解决方案可能是使用TPL DataflowBatchBlock ,它会按大小自动批量项目流:

 async Task TestLogger(Device device, int seconds) { var writer = new StreamWriter("test.log"); var batch = new BatchBlock(1000); var logAction = new ActionBlock( packet => { return writer.WriteLineAsync(ToHexString(packet)); }); ActionBlock transferAction; transferAction = new ActionBlock( bytes => { foreach (var packet in bytes) { if (transferAction.InputCount > 0) { return; // or throw new Exception("Write Time Out!"); } logAction.Post(packet); } } ); batch.LinkTo(transferAction); logAction.Completion.ContinueWith(_ => writer.Dispose()); while (true) { batch.Post(await device.ReadAsync()); } }