Tag: tpl dataflow

System.Threading.Tasks.Dataflow和Microsoft.Tpl.Dataflow之间有什么区别

有2种不同的官方TPL Dataflow nuget包。 我很困惑,选择我应该使用哪一个。 据我所知,System.Threading.Tasks.Dataflow版本比其他版本稍微新一点,似乎System.Threading.Tasks.Dataflow是针对最新版本的.net。 谁能解释这些之间的差异?

.NET中的并行抓取

我工作的公司运行着几百个非常动态的网站。 它决定建立一个搜索引擎,我的任务是编写刮刀。 一些站点在旧硬件上运行,不能承受太多惩罚,而其他站点可以处理大量的并发用户。 我需要能够说对站点A使用5个并行请求,对站点B使用2个,对站点C使用1个并行请求。 我知道我可以使用线程,互斥量,信号量等来实现这一目标,但它会非常复杂。 任何更高级别的框架,如TPL,await / async,TPL Dataflow是否足够强大,能够以更简单的方式完成此应用程序?

TPL Dataflow:在保持秩序的同时设计并行性

我以前从未使用过TPL,所以我想知道是否可以用它完成:我的应用程序从很多帧创建一个gif图像动画文件。 我从一个Bitmap列表开始,它代表了gif文件的帧,需要为每个帧执行以下操作: 在框架上绘制一些文本/位图 裁剪框架 调整框架大小 将图像缩小为256色 显然,这个过程可以对列表中的所有帧并行完成,但是对于每个帧,步骤的顺序必须相同。 之后,我需要将所有帧写入gif文件。 因此,需要以与原始列表中相同的顺序接收所有帧。 最重要的是,此过程可以在第一帧准备就绪时开始,无需等到所有帧都被处理完毕。 这就是情况。 TPL Dataflow适合这个吗? 如果是的话,任何人都可以给我一个关于如何设计tpl块结构以反映上述过程的正确方向的提示吗? 与我发现的一些样品相比,这对我来说似乎相当复杂。

数据流与拆分工作到小作业,然后再次分组

我需要做这样的工作: 从数据库中获取Page对象 为每个页面获取所有图像并处理它们(IO绑定,例如,上传到CDN) 如果所有图像都成功完成,则将页面标记为在数据库中处理 由于我需要控制并行处理的页数,我决定使用TPL数据流: ____________________________ | Data pipe | | BufferBlock | | BoundedCapacity = 1 | |____________________________| | ____________________________ | Process images | | TransformBlock | | BoundedCapacity = 1 | | MaxDegreeOfParallelism = 8 | |____________________________| | ____________________________ | Save page | | ActionBlock | | BoundedCapacity = 1 | | MaxDegreeOfParallelism […]

下游的TPL数据流块如何获取源生成的数据?

我正在使用TPL Dataflow处理图像。 我收到处理请求,从流中读取图像,应用几个转换,然后将生成的图像写入另一个流: Request -> Stream -> Image -> Image … -> Stream 为此,我使用块: BufferBlock TransformBlock TransformBlock TransformBlock TransformBlock … writerBlock = new ActionBlock 问题是初始Request是包含创建结果Stream所需的一些数据以及此时我需要的一些其他信息。 我是否必须将原始Request (或其他一些上下文对象) writerBlock到所有其他块上的writerBlock ,如下所示: TransformBlock<Request,Tuple> TransformBlock<Tuple,Tuple> TransformBlock<Tuple,Tuple> … (这是丑陋的),或者有没有办法将第一个块链接到最后一个块(或者,推广到需要附加数据的块)?

tpl dataflow:固定缓冲区大小而不丢弃项目

在使用数据流后,我遇到了一个新问题。 我想限制所有块的输入队列。 我的产品块(ActionBlock)正在快速创建5000个元素并将它们发布到广播块。 因此,如果我将广播块的BoundedCapacity设置为100,则会抛出大量数据。 但我更希望生产块等待我的缓冲区输入队列中的新槽。 有没有办法摆脱这个问题?

如何标记TPL数据流周期完成?

在TPL数据流中给出以下设置。 var directory = new DirectoryInfo(@”C:\dev\kortforsyningen_dsm\tiles”); var dirBroadcast=new BroadcastBlock(dir=>dir); var dirfinder = new TransformManyBlock((dir) => { return directory.GetDirectories(); }); var tileFilder = new TransformManyBlock((dir) => { return directory.GetFiles(); }); dirBroadcast.LinkTo(dirfinder); dirBroadcast.LinkTo(tileFilder); dirfinder.LinkTo(dirBroadcast); var block = new XYZTileCombinerBlock(3, (file) => { var coordinate = file.FullName.Split(‘\\’).Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray(); return XYZTileCombinerBlock.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]); }, (quad) => XYZTileCombinerBlock.QuadKeyToTileXY(quad, (z, […]

TPL数据流加速?

我想知道以下代码是否可以优化以更快地执行。 在一个非常简单的数据流结构中,我目前似乎每秒大约有140万条简单消息。 我知道这个示例进程同步传递/转换消息,但是,我目前正在测试TPL Dataflow作为我自己的基于任务和并发集合的自定义解决方案的替代。 我知道术语“并发”已经建议我并行运行,但是出于当前的测试目的,我通过同步推送消息在我自己的解决方案上,每秒我得到大约510万条消息。 我在这里缺少什么,我读到TPL Dataflow被推为高吞吐量,低延迟的解决方案,但到目前为止我必须忽略性能调整。 有谁能指出我正确的方向吗? class TPLDataFlowExperiments { public TPLDataFlowExperiments() { var buf1 = new BufferBlock(); var transform = new TransformBlock(t => { return “”; }); var action = new ActionBlock(s => { //Thread.Sleep(100); //Console.WriteLine(s); }); buf1.LinkTo(transform); transform.LinkTo(action); //Propagate all Completions down the flow buf1.Completion.ContinueWith(t => { transform.Complete(); transform.Completion.ContinueWith(u => { action.Complete(); […]

TPL Dataflow仅在完成所有源数据块时保证完成

两个完成转换后,如何重新编写代码完成的代码? 我认为完成意味着它被标记为完成并且“出队列”是空的? public Test() { broadCastBlock = new BroadcastBlock(i => { return i; }); transformBlock1 = new TransformBlock(i => { Console.WriteLine(“1 input count: ” + transformBlock1.InputCount); Thread.Sleep(50); return (“1_” + i); }); transformBlock2 = new TransformBlock(i => { Console.WriteLine(“2 input count: ” + transformBlock1.InputCount); Thread.Sleep(20); return (“2_” + i); }); processorBlock = new ActionBlock(i => […]

在数据流网络中使用BufferBlock 的好处

我想知道使用链接到一个或多个ActionBlocks的BufferBlock是否有好处,除了限制(使用BoundedCapacity),而不是直接发布到ActionBlock(只要不需要限制)。