GRPC异步响应流C#

如何从处理程序外部为RPC生成流响应值? (具体来说,来自IObservable)我目前正在执行以下操作,但这是创建跨线程问题,因为在RPC处理程序之间共享AnRxObservable

 public override Task GetTicker(RequestProto request, ServerCallContext context) { var subscription = AnRxObservable.Subscribe(value => { responseStream.WriteAsync(new ResponseProto { Value = value }); }); // Wait for the RPC to be canceled (my extension method // that returns a task that completes when the CancellationToken // is cancelled) await context.CancellationToken.WhenCancelled(); // Dispose of the buffered stream bufferedStream.Dispose(); // Dispose subscriber (tells rx that we aren't subscribed anymore) subscription.Dispose(); return Task.FromResult(1); } 

这段代码感觉不对……但是我看不到从RPC处理程序外部创建的共享源流式传输RPC响应的任何其他方式。

一般来说,当您尝试从推模型(IObservable)转换为拉模型(枚举响应以写入和写入它们)时,您需要一个消息的中间缓冲区 – 例如blockingQueue。 然后,处理程序主体可以是异步循环,它尝试获取队列的下一条消息(最好以异步方式)并将其写入responseStream。

此外,请注意,gRPC API仅允许您在任何给定时间进行1次飞行中响应 – 而且您的代码段不符合此要求。 因此,您需要在开始另一次写入之前等待WriteAsync()(这是您需要中间队列的另一个原因)。

这个链接可能有助于解释push vs pull范例: 何时使用IEnumerable vs IObservable?