Tag: dataflow

如何使用委托构造TransformManyBlock

我是C#TPL和DataFlow的新手,我正在努力研究如何实现TPL DataFlow TransformManyBlock 。 我正在将其他一些代码翻译成DataFlow。 我的(简化)原始代码是这样的: private IEnumerable ExtractFromByteStream(Byte[] byteStream) { yield return byteStream; // Plus operations on the array } 在另一种方法中,我会这样称呼它: foreach (byte[] myByteArray in ExtractFromByteStream(byteStream)) { // Do stuff with myByteArray } 我正在尝试创建一个TransformManyBlock来生成来自较大输入数组(实际上是二进制流)的多个小数组(实际上是数据包),因此in和out都是byte[]类型。 我尝试了下面的内容,但我知道我错了。 我想使用与之前相同的函数构造此块,并将TransformManyBlock包装在其周围。 我收到一个错误“电话不明确……” var streamTransformManyBlock = new TransformManyBlock(ExtractFromByteStream);

表观BufferBlock.Post/Receive/ReceiveAsync种族/错误

交叉发布到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9 我知道……我并没有真正使用TplDataflow来发挥它的最大潜力。 ATM我只是使用BufferBlock作为消息传递的安全队列,其中生产者和消费者以不同的速率运行。 我看到一些奇怪的行为让我感到难以理解如何继续。 private BufferBlock messageQueue = new BufferBlock(); public void Send(object message) { var accepted=messageQueue.Post(message); logger.Info(“Send message was called qlen = {0} accepted={1}”, messageQueue.Count,accepted); } public async Task GetMessageAsync() { try { var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30)); //despite messageQueue.Count>0 next line //occasionally does not execute logger.Info(“message received”); //……. } catch(TimeoutException) { //do something […]

为什么块按此顺序运行?

这是一个简短的代码示例,可以快速向您介绍我的问题: using System; using System.Linq; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; namespace DataflowTest { class Program { static void Main(string[] args) { var firstBlock = new TransformBlock(x => x, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }); var secondBlock = new TransformBlock(async x => { if (x == 12) { await Task.Delay(5000); return $”{DateTime.Now}: Message is {x} (This […]