使用TaskCompletionSource作为WaitHandle替代品是否可以接受?

我的代码处理与远程主机的TCP连接,使用ConcurrentQueue存储传出消息。 它打算在单个线程中运行。 连接的生命周期包含在RunAsync而单独的对象包含连接的“公共状态”:

 class PublicState { internal readonly ConcurrentQueue OutgoingMessageQueue = new ConcurrentQueue(); internal TaskCompletionSource OutgoingMessageTcs = null; internal readonly TaskCompletionSource ConnectedTcs = new TaskCompletionSource(); public void EnqueueMessages(IEnumerable messages) { foreach( Message m in messages ) this.OutgoingMessageQueue.Enqueue( m); if( this.OutgoingMessageTcs == null ) this.OutgoingMessageTcs = new TaskCompletionSource(); this.OutgoingMessageTcs.SetResult( null ); } } static async Task RunAsync(IPEndPoint endPoint, PublicState state) { using( TcpClient tcp = new TcpClient() ) { await tcp.ConnectAsync( endPoint.Address, endPoint.Port ).ConfigureAwait(false); Byte[] reusableBuffer = new Byte[ 4096 ]; using( NetworkStream ns = tcp.GetStream() ) { state.ConnectedTcs.SetResult( null ); Task nsReadTask = null; while( tcp.Connected ) { if( !state.writeQueue.IsEmpty ) { await WriteMessagesAsync( ... ).ConfigureAwait( false ); } if( ns.DataAvailable ) { await ReadMessagesAsync( ... ).ConfigureAwait( false ); } // Wait for new data to arrive from remote host or for new messages to send: if( state.OutgoingMessageTcs == null ) state.OutgoingMessageTcs = new TaskCompletionSource(); if( nsReadTask == null ) nsReadTask = ns.ReadAsync( reusableBuffer, 0, 0 ).ConfigureAwait( false ); Task c = await Task.WhenAny( state.OutgoingMessageTcs, nsReadTask ).ConfigureAwait( false ); if( c == state.OutgoingMessageTcs.Task ) state.OutgoingMessageTcs = null; else if( c == nsReadTask ) nsReadTask = null; } } } } 

像这样使用:

 public async Task Main(String[] args) { PublicState state = new PublicState(); Task clientTask = Client.RunAsync( new IPEndPoint(args[0]), state ); await state.ConnectedTcs.Task; // awaits until TCP connection is established state.EnqueueMessage( new Message("foo") ); state.EnqueueMessage( new Message("bar") ); state.EnqueueMessage( new Message("baz") ); await clientTask; // awaits until the TCP connection is closed } 

这段代码有效,但我不喜欢它:感觉我正在使用TaskCompletionSource ,它意味着代表一个实际的Task或某种后台操作,而我真的使用TaskCompletionSource作为一种廉价的TaskCompletionSource 。 我没有使用EventWaitHandle因为它是IDisposable (我不想冒泄漏本机资源的风险)并且它缺少WaitAsyncWaitOneAsync方法。 我可以使用SemaphoreSlim (这是等待的,但包装了一个EventWaitHandle),但我的代码并没有真正代表信号量的良好用途。

我是否可以使用TaskCompletionSource ,或者在将项目添加到OutgoingMessageQueue时,是否有更好的方法在RunAsync “取消等待”执行?

我认为它“错误”的另一个原因是TaskCompletionSource只能使用一次,然后需要更换。 我很想避免无关的分配。

如果我理解正确 – TPL BufferBlock可能就是您所需要的。 当前Enqueue模拟是Post ,您可以通过ReceiveAsync扩展方法接收下一个值。

所以使用BufferBlock你的代码就像这样:

 class PublicState { internal readonly BufferBlock OutgoingMessageQueue = new BufferBlock(); internal readonly TaskCompletionSource ConnectedTcs = new TaskCompletionSource(); public void EnqueueMessage(Message message) { this.OutgoingMessageQueue.Post(message); } } static async Task RunAsync(IPEndPoint endPoint, PublicState state) { using (TcpClient tcp = new TcpClient()) { await tcp.ConnectAsync(endPoint.Address, endPoint.Port).ConfigureAwait(false); Byte[] reusableBuffer = new Byte[4096]; using (NetworkStream ns = tcp.GetStream()) { state.ConnectedTcs.SetResult(null); Task nsReadTask = null; Task newMessageTask = null; while (tcp.Connected) { // Wait for new data to arrive from remote host or for new messages to send: if (nsReadTask == null) nsReadTask = ns.ReadAsync(reusableBuffer, 0, 0); if (newMessageTask == null) newMessageTask = state.OutgoingMessageQueue.ReceiveAsync(); var completed = await Task.WhenAny(nsReadTask, newMessageTask).ConfigureAwait(false); if (completed == newMessageTask) { var result = await newMessageTask; // do stuff newMessageTask = null; } else { var bytesRead = await nsReadTask; nsReadTask = null; } } } } } 

作为奖励,这个版本(我认为)是线程安全的,而你当前的版本不是,因为你使用OutgoingMessageTcs从潜在的多个线程( RunAsync线程和EnqueueMessages调用者的线程)进行非线程安全的事情。

如果由于某种原因你不喜欢BufferBlock – 你可以完全以同样的方式使用来自Nito.AsyncEx nuget包的Nito.AsyncEx 。 初始化变为:

 internal readonly AsyncCollection OutgoingMessageQueue = new AsyncCollection(new ConcurrentQueue()); 

并取:

 if (newMessageTask == null) newMessageTask = state.OutgoingMessageQueue.TakeAsync(); 

为了备份其他人提到的内容,它看起来像微软的文档提到,甚至鼓励开发一个Semaphore类,它在这里写在Task对象之上:

您还可以构建一个不依赖于等待句柄的异步信号量,而是完全使用任务。 为此,您可以使用诸如使用基于任务的异步模式中讨论的技术来在Task之上构建数据结构。

这确实让我想知道为什么这样的预先包装的类还不存在,但它肯定表明这很好。