Tag: tpl dataflow

在TPL Dataflow中,是否可以在创建块之后但在使用之前更改DataflowBlockOptions?

……让它生效吗? 我想推迟设置ExecutionDataflowBlockOptions.SingleProducerConstrained属性,直到我准备将网络链接在一起。 (因为,我想用它们的语义分开创建块,用它的语义将网络链接在一起。) 但据我所知,只能在创建块时设置ExecutionDataflowBlockOptions(例如,对于TransformBlock,TransformManyBlock等,您将其传递给构造函数,否则它不可见)。 但是……我没有注意到这些物业有公共制定者。 所以…我可以用ExecutionDataflowBlockOptions的占位符实例创建块并保持它以便以后我可以设置SingleProducerConstrained = true,如果我愿意,将块链接在一起(并且它将生效)? (顺便说一句,有没有什么方法可以判断SingleProducerConstrained除了测量吞吐量之外是否有任何影响?) 更新: @ i3amon在他的回答中正确指出这是无法完成的,因为数据流块会克隆您传入的DataflowBlockOptions并使用它。 但无论如何,我使用内部数据结构,我可以通过reflection和动态访问。 我把它放在下面的答案中。

如何在TPL数据流中安排流量控制?

我试图控制TPL Dataflow中的数据流。 我有一个非常快的制作人和一个非常慢的消费者。 (我的真实代码更复杂,但是,这是一个非常好的模型,它重现了问题。) 当我运行它时,代码开始饮用内存,就像它的样式一样 – 并且生产者的输出队列尽可能快地填满。 我真正希望看到的是制作人停止运行一段时间,直到消费者有机会要求它。 根据我对文档的阅读,这是应该发生的事情:也就是说,我认为生产者等待消费者有空间。 显然情况并非如此。 如何修复它以便队列不会发疯? using System; using System.Linq; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using System.Threading; namespace MemoryLeakTestCase { class Program { static void Main(string[] args) { var CreateData = new TransformManyBlock(ignore => { return Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => “Hello, World ” + i); }); var ParseFile […]

TPL数据流:将传入的集合展平为sequentiel项目

我正在使用TPL数据流构建应用程序。 其实我有以下问题。 我有一个transformblock var tfb1 = new TranformBlock<InMsg, IReadOnlyCollection> 。 因此tfb1接收消息并创建消息列表。 这个out-messages列表应链接到路由器数据块,它接收OutMsg作为输入(而不是IReadOnlyCollection )。 我如TransformBlock IReadOnlyCollection以便包含的消息可以用作例如TransformBlockforms的变换块的输入。 是否可以通过LinkTo() ? 谢谢

如何表示我的数据流完成?

我有一个类实现了一个使用TPL Dataflow由3个步骤组成的数据流。 在构造函数中,我将步骤创建为TransformBlocks并使用LinkTo将其链接起来,并将DataflowLinkOptions.PropagateCompletion设置为true。 该类公开了一个方法,通过在第一步调用SendAsync来启动工作流。 该方法返回工作流程最后一步的“完成”属性。 目前,工作流中的步骤似乎按预期执行,但最终步骤永远不会完成,除非我明确地在其上调用Complete。 但这样做会使工作流程短路并且没有执行任何步骤? 我究竟做错了什么? public class MessagePipeline { private TransformBlock step1; private TransformBlock step2; private TransformBlock step3; public MessagePipeline() { var linkOptions = new DataflowLinkOptions { PropagateCompletion = true }; step1 = new TransformBlock( x => { Console.WriteLine(“Step1…”); return x; }); step2 = new TransformBlock( x => { Console.WriteLine(“Step2…”); return x; […]

如何使用委托构造TransformManyBlock

我是C#TPL和DataFlow的新手,我正在努力研究如何实现TPL DataFlow TransformManyBlock 。 我正在将其他一些代码翻译成DataFlow。 我的(简化)原始代码是这样的: private IEnumerable ExtractFromByteStream(Byte[] byteStream) { yield return byteStream; // Plus operations on the array } 在另一种方法中,我会这样称呼它: foreach (byte[] myByteArray in ExtractFromByteStream(byteStream)) { // Do stuff with myByteArray } 我正在尝试创建一个TransformManyBlock来生成来自较大输入数组(实际上是二进制流)的多个小数组(实际上是数据包),因此in和out都是byte[]类型。 我尝试了下面的内容,但我知道我错了。 我想使用与之前相同的函数构造此块,并将TransformManyBlock包装在其周围。 我收到一个错误“电话不明确……” var streamTransformManyBlock = new TransformManyBlock(ExtractFromByteStream);

TPL Dataflow如何删除块之间的链接

我想知道。 如何删除块之间的链接? 换一种说法。 我想要与LinkTo相反。 我想写一个基于tlp数据流的记录器。 我编写了这个接口,并希望在需要时删除ILogListener的订阅。 public interface ILogManager { void RemoveListener(ILogListener listener); }

TPL Dataflow本地存储或类似的东西

我想要完成的是我有一个MaxDegreeOfParallelism = 4的动作块。 我想为每个并行路径创建一个会话对象的本地实例,所以我想要总共4个会话对象。 如果这是线程,我会创建类似于: ThreadLocal sessionPerThread = new ThreadLocal(() => new Session()); 我知道块不是线程所以我正在寻找类似但是块的东西。 有没有办法创造这个? 这个区块在服务中并且可以连续运行数月。 在此期间,大量线程用于块的每个并发槽,因此线程本地存储不合适。 我需要一些与逻辑块槽相关的东西。 此块也永远不会完成,它会运行服务的整个生命周期。 注意:以上建议的答案对我要求的内容无效。 我特别要求与本地线程不同的东西,上面的答案是使用本地线程。 这完全是一个不同的问题。

如何在multithreading应用程序中安全地使用SmtpClient.SendAsync

在我的应用程序中,我使用Dataflow库中的ActionBlock ,使用SmtpClient.SendAsync()方法发送电子邮件警报,该方法不阻止调用线程。( ActionBlock从BufferBlock获取数据,并使用bufferBlock.LinkTo(actionBlock)绑定块bufferBlock.LinkTo(actionBlock) )。 但是,如果.SendAsync()另一个.SendAsync()调用,则此方法将抛出InvalidOperationException 。 根据MSDN 文档 ,发送操作完成时会引发public event SendCompletedEventHandler SendCompleted 。 我如何确保ActionBlock生成的线程(或Tasks )之间的竞争不会导致抛出InvalidOperationException ? 到目前为止,有一个想法是将我的类(发送电子邮件)添加到SendAsync()调用和将被分配给SendCompleted事件的私有函数的私有锁。 当线程到达SendAsync()它获得锁定,并且当引发事件时,私有函数解锁锁定,允许其他线程获得锁定和进度。

对缓冲的Observable进行排序

我有一批令牌非常快,而且处理器相对较慢。 令牌有三种子类型,我希望它们按优先级处理。 因此,我希望令牌在生成后进行缓冲,并等待处理并按优先级对缓冲区进行排序。 这是我的课程: public enum Priority { High = 3, Medium = 2, Low = 1 } public class Base : IComparable { public int Id { get; set; } public int CompareTo(Base other) { return Id.CompareTo(other.Id); } } public class Foo : Base { } public class Bar : Base { } public […]

表观BufferBlock.Post/Receive/ReceiveAsync种族/错误

交叉发布到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9 我知道……我并没有真正使用TplDataflow来发挥它的最大潜力。 ATM我只是使用BufferBlock作为消息传递的安全队列,其中生产者和消费者以不同的速率运行。 我看到一些奇怪的行为让我感到难以理解如何继续。 private BufferBlock messageQueue = new BufferBlock(); public void Send(object message) { var accepted=messageQueue.Post(message); logger.Info(“Send message was called qlen = {0} accepted={1}”, messageQueue.Count,accepted); } public async Task GetMessageAsync() { try { var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30)); //despite messageQueue.Count>0 next line //occasionally does not execute logger.Info(“message received”); //……. } catch(TimeoutException) { //do something […]