在 C#/.NET 中为生产者/消费者(producer/consumer)实现异步流

有一个lib将其结果输出到给定的Stream对象中。 我想在lib完成之前开始使用结果。 如果生产者提前跑得太远,Stream应该是阻塞的,以简化使用并避免过多的内存消耗; 线程安全,允许生产者和消费者的独立存在。

一旦lib完成,生产者线程应关闭流,从而通知消费者没有更多数据。

我正在考虑使用NetworkStream或PipeStream(匿名),但两者都可能因为通过内核发送数据而变慢。

任何推荐?

// Pseudo-code sketch of the intended usage; "..." marks parts left open by the author.
var stream = new AsyncBlockingBufferedStream();

// Runs on the producer thread.
void ProduceData()
{
    externalLib.GenerateData(stream);
    // Closing the stream is how the producer signals "no more data".
    stream.Close();
}

// Runs on the consumer thread.
void ConsumeData()
{
    int read;
    // The loop ends on the first Read that returns 0.
    while ((read = stream.Read(...)) != 0)
    {
        ...
    }
}

// Pseudo-code sketch of the intended usage; "..." marks parts left open by the author.
var stream = new AsyncBlockingBufferedStream();

// Runs on the producer thread.
void ProduceData()
{
    externalLib.GenerateData(stream);
    // Closing the stream is how the producer signals "no more data".
    stream.Close();
}

// Runs on the consumer thread.
void ConsumeData()
{
    int read;
    // The loop ends on the first Read that returns 0.
    while ((read = stream.Read(...)) != 0)
    {
        ...
    }
}

// Pseudo-code sketch of the intended usage; "..." marks parts left open by the author.
var stream = new AsyncBlockingBufferedStream();

// Runs on the producer thread.
void ProduceData()
{
    externalLib.GenerateData(stream);
    // Closing the stream is how the producer signals "no more data".
    stream.Close();
}

// Runs on the consumer thread.
void ConsumeData()
{
    int read;
    // The loop ends on the first Read that returns 0.
    while ((read = stream.Read(...)) != 0)
    {
        ...
    }
}

根据Chris Taylor先前的回答,这是我自己的修订版,具有更快的基于块的操作和更正的写入完成通知。 它现在标记为维基,所以你可以改变它。

 /// <summary>
 /// An in-memory pipe exposed as a <see cref="Stream"/>. Each
 /// <see cref="Write"/> call is queued as one block in a bounded
 /// <see cref="BlockingCollection{T}"/>; <see cref="Read"/> dequeues blocks
 /// and copies them to the caller.
 /// </summary>
 /// <remarks>
 /// Closing or disposing the stream only marks the queue as complete for
 /// adding. Blocks that are already queued stay readable, and once they are
 /// consumed <see cref="Read"/> returns 0. The queue itself is deliberately
 /// not disposed: the side that closes the stream is normally the writer,
 /// and disposing the queue at that point would make the reader's next
 /// dequeue throw <see cref="ObjectDisposedException"/>.
 /// NOTE(review): the read cursor fields are not synchronized, so this
 /// presumably expects a single reader thread - confirm against callers.
 /// </remarks>
 public class BlockingStream : Stream
 {
     private readonly BlockingCollection<byte[]> _blocks;

     // Block currently being handed out by Read, and the next unread index in it.
     private byte[] _currentBlock;
     private int _currentBlockIndex;

     /// <summary>
     /// Creates the stream.
     /// </summary>
     /// <param name="streamWriteCountCache">
     /// Maximum number of queued blocks (one per <see cref="Write"/> call)
     /// before <see cref="Write"/> blocks.
     /// </param>
     public BlockingStream(int streamWriteCountCache)
     {
         _blocks = new BlockingCollection<byte[]>(streamWriteCountCache);
     }

     /// <inheritdoc/>
     public override bool CanTimeout { get { return false; } }

     /// <inheritdoc/>
     public override bool CanRead { get { return true; } }

     /// <inheritdoc/>
     public override bool CanSeek { get { return false; } }

     /// <inheritdoc/>
     public override bool CanWrite { get { return true; } }

     /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
     public override long Length { get { throw new NotSupportedException(); } }

     /// <summary>Does nothing.</summary>
     public override void Flush() { }

     /// <summary>Gets the total number of bytes passed to <see cref="Write"/> so far.</summary>
     public long TotalBytesWritten { get; private set; }

     /// <summary>Gets the number of completed <see cref="Write"/> calls.</summary>
     public int WriteCount { get; private set; }

     /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
     public override long Position
     {
         get { throw new NotSupportedException(); }
         set { throw new NotSupportedException(); }
     }

     /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
     public override long Seek(long offset, SeekOrigin origin)
     {
         throw new NotSupportedException();
     }

     /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
     public override void SetLength(long value)
     {
         throw new NotSupportedException();
     }

     /// <summary>
     /// Copies queued data into <paramref name="buffer"/>, blocking until
     /// <paramref name="count"/> bytes were copied or the queue is complete
     /// and empty.
     /// </summary>
     /// <param name="buffer">Destination array.</param>
     /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing.</param>
     /// <param name="count">Maximum number of bytes to read.</param>
     /// <returns>
     /// The number of bytes copied; less than <paramref name="count"/> only
     /// when the queue has been completed and drained.
     /// </returns>
     public override int Read(byte[] buffer, int offset, int count)
     {
         ValidateBufferArgs(buffer, offset, count);

         // A zero-length read has nothing to wait for.
         if (count == 0)
         {
             return 0;
         }

         int bytesRead = 0;
         while (true)
         {
             if (_currentBlock != null)
             {
                 int copy = Math.Min(count - bytesRead, _currentBlock.Length - _currentBlockIndex);
                 Array.Copy(_currentBlock, _currentBlockIndex, buffer, offset + bytesRead, copy);
                 _currentBlockIndex += copy;
                 bytesRead += copy;

                 // Block exhausted: drop it so the next pass dequeues a new one.
                 if (_currentBlock.Length <= _currentBlockIndex)
                 {
                     _currentBlock = null;
                     _currentBlockIndex = 0;
                 }

                 if (bytesRead == count)
                 {
                     return bytesRead;
                 }
             }

             // TryTake returns false once adding is complete and the queue is empty.
             if (!_blocks.TryTake(out _currentBlock, Timeout.Infinite))
             {
                 return bytesRead;
             }
         }
     }

     /// <summary>
     /// Queues a private copy of the given range as one block, blocking
     /// while the queue is at capacity.
     /// </summary>
     /// <param name="buffer">Source array.</param>
     /// <param name="offset">Index of the first byte to write.</param>
     /// <param name="count">Number of bytes to write.</param>
     public override void Write(byte[] buffer, int offset, int count)
     {
         ValidateBufferArgs(buffer, offset, count);

         // Copy, because the caller may reuse its buffer after Write returns.
         var newBuf = new byte[count];
         Array.Copy(buffer, offset, newBuf, 0, count);
         _blocks.Add(newBuf);

         TotalBytesWritten += count;
         WriteCount++;
     }

     /// <summary>
     /// Marks writing as complete. The queue is intentionally left undisposed
     /// so that queued blocks can still be read afterwards.
     /// </summary>
     protected override void Dispose(bool disposing)
     {
         if (disposing)
         {
             CompleteWriting();
         }

         base.Dispose(disposing);
     }

     /// <summary>
     /// Marks writing as complete and closes the stream. Safe to call more than once.
     /// </summary>
     public override void Close()
     {
         CompleteWriting();
         base.Close();
     }

     /// <summary>
     /// Marks the queue as complete for adding, so that <see cref="Read"/>
     /// returns 0 once the remaining blocks are consumed.
     /// </summary>
     public void CompleteWriting()
     {
         _blocks.CompleteAdding();
     }

     /// <summary>Checks the usual (buffer, offset, count) triple.</summary>
     private static void ValidateBufferArgs(byte[] buffer, int offset, int count)
     {
         if (buffer == null)
         {
             throw new ArgumentNullException(nameof(buffer));
         }

         if (offset < 0)
         {
             throw new ArgumentOutOfRangeException(nameof(offset));
         }

         if (count < 0)
         {
             throw new ArgumentOutOfRangeException(nameof(count));
         }

         if (buffer.Length - offset < count)
         {
             throw new ArgumentException("buffer.Length - offset < count");
         }
     }
 }

我先把它放在这里:这是一个非常简约的实现,我还没有时间真正测试它的性能特征,您或许可以自己做一些性能测试。 我在查看您的问题时得到的想法是创建一个使用BlockingCollection作为存储介质的自定义Stream。

基本上,这将为您提供一个流,您可以从不同的线程读取/写入,如果消费者方面落后,将限制生产者。 我再次强调,这不是一个健壮的实现,只是一个快速的概念验证,还需要更多的错误检查、参数验证,以及妥善处理流的Close的方案。 目前,如果在底层BlockingCollection中仍有数据时关闭流,则无法再读取这些数据。 如果明天有时间,我会再完善一下,但也许你可以先给出一些反馈。

更新: Yurik已将此解决方案的实现作为维基提供,应针对该答案进行增强。

/// <summary>
/// A minimal producer/consumer <see cref="Stream"/> that stores individual
/// bytes in a bounded <see cref="BlockingCollection{T}"/>. Writers block
/// while the collection is full; readers block while it is empty.
/// </summary>
/// <remarks>
/// Closing or disposing the stream marks the collection as complete for
/// adding instead of cancelling pending operations, so bytes that are
/// already buffered can still be read; after that reads report end of
/// stream. The collection itself is deliberately not disposed, because the
/// side that closes the stream is normally the writer while the reader is
/// still draining it.
/// </remarks>
public class BlockingStream : Stream
{
    private readonly BlockingCollection<byte> _data;
    private int _readTimeout = Timeout.Infinite;
    private int _writeTimeout = Timeout.Infinite;

    /// <summary>
    /// Creates the stream.
    /// </summary>
    /// <param name="maxBytes">Maximum number of buffered bytes before writers block.</param>
    public BlockingStream(int maxBytes)
    {
        _data = new BlockingCollection<byte>(maxBytes);
    }

    /// <summary>Gets or sets the per-byte wait, in milliseconds, used by reads (-1 waits indefinitely).</summary>
    public override int ReadTimeout
    {
        get { return _readTimeout; }
        set { _readTimeout = value; }
    }

    /// <summary>Gets or sets the per-byte wait, in milliseconds, used by writes (-1 waits indefinitely).</summary>
    public override int WriteTimeout
    {
        get { return _writeTimeout; }
        set { _writeTimeout = value; }
    }

    /// <inheritdoc/>
    public override bool CanTimeout { get { return true; } }

    /// <inheritdoc/>
    public override bool CanRead { get { return true; } }

    /// <inheritdoc/>
    public override bool CanSeek { get { return false; } }

    /// <inheritdoc/>
    public override bool CanWrite { get { return true; } }

    /// <summary>Does nothing.</summary>
    public override void Flush()
    {
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Length { get { throw new NotSupportedException(); } }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Takes one byte, waiting up to <see cref="ReadTimeout"/>.
    /// </summary>
    /// <returns>
    /// The byte as a non-negative int, or -1 when no byte was obtained
    /// (timeout, or the stream was closed and is empty).
    /// </returns>
    public override int ReadByte()
    {
        byte b;
        return _data.TryTake(out b, ReadTimeout) ? b : -1;
    }

    /// <summary>
    /// Takes up to <paramref name="count"/> bytes, waiting up to
    /// <see cref="ReadTimeout"/> for each one, and stops at the first byte
    /// that cannot be obtained.
    /// </summary>
    /// <param name="buffer">Destination array.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>The number of bytes stored in <paramref name="buffer"/>.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        ValidateBufferArgs(buffer, offset, count);

        int bytesRead = 0;
        byte b;
        while (bytesRead < count && _data.TryTake(out b, ReadTimeout))
        {
            buffer[offset + bytesRead] = b;
            bytesRead++;
        }

        // Bytes copied so far are always reported, never discarded.
        return bytesRead;
    }

    /// <summary>
    /// Adds one byte, waiting up to <see cref="WriteTimeout"/> for free space.
    /// </summary>
    /// <param name="value">The byte to write.</param>
    /// <exception cref="IOException">No space became available within <see cref="WriteTimeout"/>.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
    public override void WriteByte(byte value)
    {
        AddByte(value);
    }

    /// <summary>
    /// Adds <paramref name="count"/> bytes one at a time, waiting up to
    /// <see cref="WriteTimeout"/> for each.
    /// </summary>
    /// <param name="buffer">Source array.</param>
    /// <param name="offset">Index of the first byte to write.</param>
    /// <param name="count">Number of bytes to write.</param>
    /// <exception cref="IOException">No space became available within <see cref="WriteTimeout"/>.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        ValidateBufferArgs(buffer, offset, count);

        for (int i = offset; i < offset + count; ++i)
        {
            AddByte(buffer[i]);
        }
    }

    /// <summary>
    /// Marks writing as complete and closes the stream. Buffered bytes stay
    /// readable. Safe to call more than once.
    /// </summary>
    public override void Close()
    {
        _data.CompleteAdding();
        base.Close();
    }

    /// <summary>
    /// Marks writing as complete; the collection is intentionally left
    /// undisposed so that a reader can still drain it.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _data.CompleteAdding();
        }

        base.Dispose(disposing);
    }

    /// <summary>Adds a single byte, turning a failed add into an exception instead of dropping the byte.</summary>
    private void AddByte(byte value)
    {
        bool added;
        try
        {
            added = _data.TryAdd(value, WriteTimeout);
        }
        catch (InvalidOperationException)
        {
            // Thrown by the collection once it has been marked complete for adding.
            throw new ObjectDisposedException(GetType().Name);
        }

        if (!added)
        {
            throw new IOException("The write operation timed out.");
        }
    }

    /// <summary>Checks the usual (buffer, offset, count) triple.</summary>
    private static void ValidateBufferArgs(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }

        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }

        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("buffer.Length - offset < count");
        }
    }
}

构造流时,传入在阻塞写入方之前流应缓冲的最大字节数。 下面是对该功能的一个小测试,也是目前唯一做过的测试......

  /// <summary>
  /// Small console demo: a producer task writes 100 random bytes into a
  /// 10-byte <c>BlockingStream</c> while a slower consumer task prints them.
  /// </summary>
  class Program
  {
      static BlockingStream _dataStream = new BlockingStream(10);
      static Random _rnd = new Random();

      [STAThread]
      static void Main(string[] args)
      {
          Task producer = new Task(() =>
          {
              // Give the consumer a head start so it is seen waiting for data.
              Thread.Sleep(1000);
              for (int i = 0; i < 100; ++i)
              {
                  // Random.Next's upper bound is exclusive; 256 covers every byte value.
                  _dataStream.WriteByte((byte)_rnd.Next(0, 256));
              }
          });

          Task consumer = new Task(() =>
          {
              int i = 0;
              while (true)
              {
                  Console.WriteLine("{0} \t-\t {1}", _dataStream.ReadByte(), i++);

                  // Slow the consumer down.
                  Thread.Sleep(500);
              }
          });

          producer.Start();
          consumer.Start();

          // Keep the process alive until a key is pressed.
          Console.ReadKey();
      }
  }

我使用Yurik的BlockingStream一段时间,直到发现我们的代码运行20分钟到一小时后性能急剧下降。 我认为性能下降是由于垃圾收集器,以及用这种方式快速传输大量数据时创建了过多缓冲区(我没有时间去证明这一点)。 我最终创建了一个环形缓冲区版本,与我们的代码一起使用时不会出现性能下降。

 /// <summary>
 /// A ring-buffer stream that you can read from and write to from
 /// different threads.
 /// </summary>
 /// <remarks>
 /// All buffer state is guarded by a monitor on the backing array. A writer
 /// that finds the ring full, or a reader that finds it empty, releases the
 /// monitor and waits on the matching event; the other side sets that event
 /// after moving bytes. Disposing cancels every pending wait, and every
 /// later read or write throws <see cref="ObjectDisposedException"/>, so
 /// bytes still buffered at that moment can no longer be read.
 /// </remarks>
 public class RingBufferedStream : Stream
 {
     private readonly byte[] store;

     private readonly ManualResetEventAsync writeAvailable = new ManualResetEventAsync(false);

     private readonly ManualResetEventAsync readAvailable = new ManualResetEventAsync(false);

     private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

     private int readPos;

     private int readAvailableByteCount;

     private int writePos;

     private int writeAvailableByteCount;

     private bool disposed;

     /// <summary>
     /// Initializes a new instance of the <see cref="RingBufferedStream"/>
     /// class.
     /// </summary>
     /// <param name="bufferSize">
     /// The maximum number of bytes to buffer.
     /// </param>
     public RingBufferedStream(int bufferSize)
     {
         this.store = new byte[bufferSize];
         this.writeAvailableByteCount = bufferSize;
         this.readAvailableByteCount = 0;
     }

     /// <inheritdoc/>
     public override bool CanRead => true;

     /// <inheritdoc/>
     public override bool CanSeek => false;

     /// <inheritdoc/>
     public override bool CanWrite => true;

     /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
     public override long Length
     {
         get
         {
             throw new NotSupportedException(
                 "Cannot get length on RingBufferedStream");
         }
     }

     /// <inheritdoc/>
     public override int ReadTimeout { get; set; } = Timeout.Infinite;

     /// <inheritdoc/>
     public override int WriteTimeout { get; set; } = Timeout.Infinite;

     /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
     public override long Position
     {
         get
         {
             throw new NotSupportedException(
                 "Cannot get position on RingBufferedStream");
         }

         set
         {
             throw new NotSupportedException(
                 "Cannot set position on RingBufferedStream");
         }
     }

     /// <summary>
     /// Gets the number of bytes currently buffered.
     /// </summary>
     public int BufferedByteCount => this.readAvailableByteCount;

     /// <inheritdoc/>
     public override void Flush()
     {
         // nothing to do
     }

     /// <summary>
     /// Set the length of the current stream. Always throws
     /// <see cref="NotSupportedException"/>.
     /// </summary>
     /// <param name="value">
     /// The desired length of the current stream in bytes.
     /// </param>
     public override void SetLength(long value)
     {
         throw new NotSupportedException(
             "Cannot set length on RingBufferedStream");
     }

     /// <summary>
     /// Sets the position in the current stream. Always throws
     /// <see cref="NotSupportedException"/>.
     /// </summary>
     /// <param name="offset">
     /// The byte offset to the <paramref name="origin"/> parameter.
     /// </param>
     /// <param name="origin">
     /// A value of type <see cref="SeekOrigin"/> indicating the reference
     /// point used to obtain the new position.
     /// </param>
     /// <returns>
     /// The new position within the current stream.
     /// </returns>
     public override long Seek(long offset, SeekOrigin origin)
     {
         throw new NotSupportedException("Cannot seek on RingBufferedStream");
     }

     /// <summary>
     /// Copies <paramref name="count"/> bytes into the ring, waiting for free
     /// space whenever the ring is full. If a wait times out or is canceled,
     /// the method returns without writing the remaining bytes.
     /// </summary>
     /// <param name="buffer">Source array.</param>
     /// <param name="offset">Index of the first byte to write.</param>
     /// <param name="count">Number of bytes to write.</param>
     public override void Write(byte[] buffer, int offset, int count)
     {
         this.ThrowIfDisposed();

         // Validate before touching the ring so a bad range cannot leave a partial write behind.
         ValidateBufferArgs(buffer, offset, count);

         Monitor.Enter(this.store);
         bool haveLock = true;
         try
         {
             while (count > 0)
             {
                 if (this.writeAvailableByteCount == 0)
                 {
                     // Reset while still holding the lock, so a reader's Set
                     // (also done under the lock) cannot be lost.
                     this.writeAvailable.Reset();
                     Monitor.Exit(this.store);
                     haveLock = false;
                     bool canceled;
                     if (!this.writeAvailable.Wait(
                         this.WriteTimeout,
                         this.cancellationTokenSource.Token,
                         out canceled) || canceled)
                     {
                         break;
                     }

                     Monitor.Enter(this.store);
                     haveLock = true;
                 }
                 else
                 {
                     // Largest contiguous chunk: limited by the end of the
                     // array, the free space and what the caller still has.
                     var toWrite = this.store.Length - this.writePos;
                     if (toWrite > this.writeAvailableByteCount)
                     {
                         toWrite = this.writeAvailableByteCount;
                     }

                     if (toWrite > count)
                     {
                         toWrite = count;
                     }

                     Array.Copy(
                         buffer, offset, this.store, this.writePos, toWrite);
                     offset += toWrite;
                     count -= toWrite;
                     this.writeAvailableByteCount -= toWrite;
                     this.readAvailableByteCount += toWrite;
                     this.writePos += toWrite;
                     if (this.writePos == this.store.Length)
                     {
                         this.writePos = 0;
                     }

                     this.readAvailable.Set();
                 }
             }
         }
         finally
         {
             if (haveLock)
             {
                 Monitor.Exit(this.store);
             }
         }
     }

     /// <summary>
     /// Stores one byte in the ring, waiting for free space if the ring is
     /// full. If the wait times out or is canceled, the byte is not written.
     /// </summary>
     /// <param name="value">The byte to write.</param>
     public override void WriteByte(byte value)
     {
         this.ThrowIfDisposed();

         Monitor.Enter(this.store);
         bool haveLock = true;
         try
         {
             while (true)
             {
                 if (this.writeAvailableByteCount == 0)
                 {
                     this.writeAvailable.Reset();
                     Monitor.Exit(this.store);
                     haveLock = false;
                     bool canceled;
                     if (!this.writeAvailable.Wait(
                         this.WriteTimeout,
                         this.cancellationTokenSource.Token,
                         out canceled) || canceled)
                     {
                         break;
                     }

                     Monitor.Enter(this.store);
                     haveLock = true;
                 }
                 else
                 {
                     this.store[this.writePos] = value;
                     --this.writeAvailableByteCount;
                     ++this.readAvailableByteCount;
                     ++this.writePos;
                     if (this.writePos == this.store.Length)
                     {
                         this.writePos = 0;
                     }

                     this.readAvailable.Set();
                     break;
                 }
             }
         }
         finally
         {
             if (haveLock)
             {
                 Monitor.Exit(this.store);
             }
         }
     }

     /// <summary>
     /// Copies buffered bytes into <paramref name="buffer"/>, waiting for
     /// more data whenever the ring is empty, until
     /// <paramref name="count"/> bytes were copied or a wait times out or is
     /// canceled.
     /// </summary>
     /// <param name="buffer">Destination array.</param>
     /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing.</param>
     /// <param name="count">Maximum number of bytes to read.</param>
     /// <returns>The number of bytes copied.</returns>
     public override int Read(byte[] buffer, int offset, int count)
     {
         this.ThrowIfDisposed();
         ValidateBufferArgs(buffer, offset, count);

         Monitor.Enter(this.store);
         int ret = 0;
         bool haveLock = true;
         try
         {
             while (count > 0)
             {
                 if (this.readAvailableByteCount == 0)
                 {
                     this.readAvailable.Reset();
                     Monitor.Exit(this.store);
                     haveLock = false;
                     bool canceled;
                     if (!this.readAvailable.Wait(
                         this.ReadTimeout,
                         this.cancellationTokenSource.Token,
                         out canceled) || canceled)
                     {
                         break;
                     }

                     Monitor.Enter(this.store);
                     haveLock = true;
                 }
                 else
                 {
                     // Largest contiguous chunk: limited by the end of the
                     // array, the buffered data and what the caller asked for.
                     var toRead = this.store.Length - this.readPos;
                     if (toRead > this.readAvailableByteCount)
                     {
                         toRead = this.readAvailableByteCount;
                     }

                     if (toRead > count)
                     {
                         toRead = count;
                     }

                     Array.Copy(
                         this.store, this.readPos, buffer, offset, toRead);
                     offset += toRead;
                     count -= toRead;
                     this.readAvailableByteCount -= toRead;
                     this.writeAvailableByteCount += toRead;
                     ret += toRead;
                     this.readPos += toRead;
                     if (this.readPos == this.store.Length)
                     {
                         this.readPos = 0;
                     }

                     this.writeAvailable.Set();
                 }
             }
         }
         finally
         {
             if (haveLock)
             {
                 Monitor.Exit(this.store);
             }
         }

         return ret;
     }

     /// <summary>
     /// Takes one byte from the ring, waiting for data if the ring is empty.
     /// </summary>
     /// <returns>
     /// The byte as a non-negative int, or -1 if the wait timed out or was
     /// canceled.
     /// </returns>
     public override int ReadByte()
     {
         this.ThrowIfDisposed();

         Monitor.Enter(this.store);
         int ret = -1;
         bool haveLock = true;
         try
         {
             while (true)
             {
                 if (this.readAvailableByteCount == 0)
                 {
                     this.readAvailable.Reset();
                     Monitor.Exit(this.store);
                     haveLock = false;
                     bool canceled;
                     if (!this.readAvailable.Wait(
                         this.ReadTimeout,
                         this.cancellationTokenSource.Token,
                         out canceled) || canceled)
                     {
                         break;
                     }

                     Monitor.Enter(this.store);
                     haveLock = true;
                 }
                 else
                 {
                     ret = this.store[this.readPos];
                     ++this.writeAvailableByteCount;
                     --this.readAvailableByteCount;
                     ++this.readPos;
                     if (this.readPos == this.store.Length)
                     {
                         this.readPos = 0;
                     }

                     this.writeAvailable.Set();
                     break;
                 }
             }
         }
         finally
         {
             if (haveLock)
             {
                 Monitor.Exit(this.store);
             }
         }

         return ret;
     }

     /// <summary>
     /// Marks the stream as disposed and cancels all pending waits.
     /// </summary>
     /// <param name="disposing">True when called from Dispose/Close.</param>
     protected override void Dispose(bool disposing)
     {
         if (disposing)
         {
             this.disposed = true;
             this.cancellationTokenSource.Cancel();
         }

         base.Dispose(disposing);
     }

     /// <summary>Checks the usual (buffer, offset, count) triple.</summary>
     private static void ValidateBufferArgs(byte[] buffer, int offset, int count)
     {
         if (buffer == null)
         {
             throw new ArgumentNullException(nameof(buffer));
         }

         if (offset < 0)
         {
             throw new ArgumentOutOfRangeException(nameof(offset));
         }

         if (count < 0)
         {
             throw new ArgumentOutOfRangeException(nameof(count));
         }

         if (buffer.Length - offset < count)
         {
             throw new ArgumentException("buffer.Length - offset < count");
         }
     }

     /// <summary>Throws <see cref="ObjectDisposedException"/> once the stream has been disposed.</summary>
     private void ThrowIfDisposed()
     {
         if (this.disposed)
         {
             throw new ObjectDisposedException("RingBufferedStream");
         }
     }
 }

该类使用我们的ManualResetEventAsync来帮助完成关闭。

 /// <summary>
 /// Asynchronous version of <see cref="ManualResetEvent"/>, built on a
 /// replaceable <see cref="TaskCompletionSource{TResult}"/>: the event is
 /// signaled while the current source's task is completed.
 /// </summary>
 public sealed class ManualResetEventAsync
 {
     /// <summary>
     /// The task completion source.
     /// </summary>
     private volatile TaskCompletionSource<bool> taskCompletionSource = CreateSource();

     /// <summary>
     /// Initializes a new instance of the <see cref="ManualResetEventAsync"/>
     /// class with a <see cref="bool"/> value indicating whether to set the
     /// initial state to signaled.
     /// </summary>
     /// <param name="initialState">
     /// True to set the initial state to signaled; false to set the initial
     /// state to non-signaled.
     /// </param>
     public ManualResetEventAsync(bool initialState)
     {
         if (initialState)
         {
             this.Set();
         }
     }

     /// <summary>
     /// Return a task that can be awaited or waited on; it completes when
     /// the event is signaled.
     /// </summary>
     /// <returns>
     /// The asynchronous waiter.
     /// </returns>
     public Task GetWaitTask()
     {
         return this.taskCompletionSource.Task;
     }

     /// <summary>
     /// Mark the event as signaled.
     /// </summary>
     public void Set()
     {
         // The source is created with RunContinuationsAsynchronously, so
         // completing it here does not run waiters' continuations on the
         // calling thread and needs no thread-pool hop or blocking wait.
         this.taskCompletionSource.TrySetResult(true);
     }

     /// <summary>
     /// Mark the event as not signaled.
     /// </summary>
     public void Reset()
     {
         while (true)
         {
             var tcs = this.taskCompletionSource;

             // Already non-signaled, or we won the race to swap in a fresh source.
 #pragma warning disable 420
             if (!tcs.Task.IsCompleted
                 || Interlocked.CompareExchange(
                     ref this.taskCompletionSource,
                     CreateSource(),
                     tcs) == tcs)
 #pragma warning restore 420
             {
                 return;
             }
         }
     }

     /// <summary>
     /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
     /// </summary>
     public void Wait()
     {
         this.GetWaitTask().Wait();
     }

     /// <summary>
     /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
     /// </summary>
     /// <param name="cancellationToken">
     /// A <see cref="CancellationToken"/> to observe while waiting for
     /// the task to complete.
     /// </param>
     /// <exception cref="OperationCanceledException">
     /// The <paramref name="cancellationToken"/> was canceled.
     /// </exception>
     public void Wait(CancellationToken cancellationToken)
     {
         this.GetWaitTask().Wait(cancellationToken);
     }

     /// <summary>
     /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
     /// </summary>
     /// <param name="cancellationToken">
     /// A <see cref="CancellationToken"/> to observe while waiting for
     /// the task to complete.
     /// </param>
     /// <param name="canceled">
     /// Set to true if the wait was canceled via the <paramref name="cancellationToken"/>.
     /// </param>
     public void Wait(CancellationToken cancellationToken, out bool canceled)
     {
         try
         {
             this.GetWaitTask().Wait(cancellationToken);
             canceled = false;
         }
         catch (Exception ex)
             when (ex is OperationCanceledException
                 || (ex is AggregateException
                     && ex.InnerOf<OperationCanceledException>() != null))
         {
             canceled = true;
         }
     }

     /// <summary>
     /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
     /// </summary>
     /// <param name="timeout">
     /// A <see cref="TimeSpan"/> that represents the number of
     /// milliseconds to wait, or a <see cref="TimeSpan"/> that
     /// represents -1 milliseconds to wait indefinitely.
     /// </param>
     /// <returns>
     /// true if the <see cref="ManualResetEventAsync"/> was signaled within
     /// the allotted time; otherwise, false.
     /// </returns>
     public bool Wait(TimeSpan timeout)
     {
         return this.GetWaitTask().Wait(timeout);
     }

     /// <summary>
     /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
     /// </summary>
     /// <param name="millisecondsTimeout">
     /// The number of milliseconds to wait, or
     /// <see cref="Timeout.Infinite"/> (-1) to wait
     /// indefinitely.
     /// </param>
     /// <returns>
     /// true if the <see cref="ManualResetEventAsync"/> was signaled within
     /// the allotted time; otherwise, false.
     /// </returns>
     public bool Wait(int millisecondsTimeout)
     {
         return this.GetWaitTask().Wait(millisecondsTimeout);
     }

     /// <summary>
     /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
     /// </summary>
     /// <param name="millisecondsTimeout">
     /// The number of milliseconds to wait, or
     /// <see cref="Timeout.Infinite"/> (-1) to wait
     /// indefinitely.
     /// </param>
     /// <param name="cancellationToken">
     /// A <see cref="CancellationToken"/> to observe while waiting for the
     /// <see cref="ManualResetEventAsync"/> to be signaled.
     /// </param>
     /// <returns>
     /// true if the <see cref="ManualResetEventAsync"/> was signaled within
     /// the allotted time; otherwise, false.
     /// </returns>
     /// <exception cref="OperationCanceledException">
     /// The <paramref name="cancellationToken"/> was canceled.
     /// </exception>
     public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken)
     {
         return this.GetWaitTask().Wait(millisecondsTimeout, cancellationToken);
     }

     /// <summary>
     /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
     /// </summary>
     /// <param name="millisecondsTimeout">
     /// The number of milliseconds to wait, or
     /// <see cref="Timeout.Infinite"/> (-1) to wait
     /// indefinitely.
     /// </param>
     /// <param name="cancellationToken">
     /// A <see cref="CancellationToken"/> to observe while waiting for the
     /// <see cref="ManualResetEventAsync"/> to be signaled.
     /// </param>
     /// <param name="canceled">
     /// Set to true if the wait was canceled via the <paramref name="cancellationToken"/>.
     /// </param>
     /// <returns>
     /// true if the <see cref="ManualResetEventAsync"/> was signaled within
     /// the allotted time; otherwise, false.
     /// </returns>
     public bool Wait(
         int millisecondsTimeout,
         CancellationToken cancellationToken,
         out bool canceled)
     {
         bool ret = false;
         try
         {
             ret = this.GetWaitTask().Wait(millisecondsTimeout, cancellationToken);
             canceled = false;
         }
         catch (Exception ex)
             when (ex is OperationCanceledException
                 || (ex is AggregateException
                     && ex.InnerOf<OperationCanceledException>() != null))
         {
             canceled = true;
         }

         return ret;
     }

     /// <summary>Creates a non-signaled source whose continuations never run inline on the thread that completes it.</summary>
     private static TaskCompletionSource<bool> CreateSource()
     {
         return new TaskCompletionSource<bool>(
             TaskCreationOptions.RunContinuationsAsynchronously);
     }
 }

并且,ManualResetEventAsync使用了InnerOf扩展方法…

 /// <summary>
 /// Extension functions.
 /// </summary>
 public static class Extensions
 {
     /// <summary>
     /// Finds the first exception of the requested type.
     /// </summary>
     /// <typeparam name="T">
     /// The type of exception to return
     /// </typeparam>
     /// <param name="ex">
     /// The exception to look in.
     /// </param>
     /// <returns>
     /// The exception or the first inner exception that matches the
     /// given type; null if not found.
     /// </returns>
     public static T InnerOf<T>(this Exception ex)
         where T : Exception
     {
         return (T)InnerOf(ex, typeof(T));
     }

     /// <summary>
     /// Finds the first exception of the requested type.
     /// </summary>
     /// <param name="ex">
     /// The exception to look in.
     /// </param>
     /// <param name="t">
     /// The type of exception to return
     /// </param>
     /// <returns>
     /// The exception or the first inner exception that matches the
     /// given type; null if not found.
     /// </returns>
     /// <exception cref="ArgumentNullException">
     /// <paramref name="t"/> is null while <paramref name="ex"/> is not.
     /// </exception>
     public static Exception InnerOf(this Exception ex, Type t)
     {
         if (ex == null)
         {
             return null;
         }

         if (t == null)
         {
             throw new ArgumentNullException(nameof(t));
         }

         if (t.IsInstanceOfType(ex))
         {
             return ex;
         }

         var ae = ex as AggregateException;
         if (ae != null)
         {
             foreach (var e in ae.InnerExceptions)
             {
                 var ret = InnerOf(e, t);
                 if (ret != null)
                 {
                     return ret;
                 }
             }

             // InnerException of an AggregateException is its first inner
             // exception, which the loop above has already searched.
             return null;
         }

         return InnerOf(ex.InnerException, t);
     }
 }