Tag: tpl dataflow

在TryReceiveAll之后使用OutputAvailableAsync的BufferBlock死锁

在处理这个问题 的答案时 ,我写了这个片段: var buffer = new BufferBlock(); var producer = Task.Run(async () => { while (true) { await Task.Delay(TimeSpan.FromMilliseconds(100)); buffer.Post(null); Console.WriteLine(“Post ” + buffer.Count); } }); var consumer = Task.Run(async () => { while (await buffer.OutputAvailableAsync()) { IList items; buffer.TryReceiveAll(out items); Console.WriteLine(“TryReceiveAll ” + buffer.Count); } }); await Task.WhenAll(consumer, producer); 生产者应该每100毫秒将项目发布到缓冲区,并且消费者应该清除缓冲区中的所有项目并异步地等待更多项目显示。 实际发生的是生产者清除所有项目一次,然后再也不会超出OutputAvailableAsync 。 如果我切换消费者逐个删除项目,它将作为例外工作: […]

EventSourceException:操作系统中没有可用的免费缓冲区

完整的例外文本是: EventSourceException:操作系统没有可用的免费缓冲区(例如事件速率太快)。 由于递归,我称这种方法大约一百万次。 它不会停止,我只是在VS2013的Output Debug windown中获取exception文本。 但它超级慢。 private static IEnumerable RecursiveFindServices(ISymbol sym, Solution sln) { List list = new List(); var callers = SymbolFinder.FindCallersAsync(sym, sln).Result; // this line may cause the EventSourceException (try not to call Async) foreach(var caller in callers) { string name = GetMethodName(caller); if (caller.CallingSymbol.ContainingType.Name.EndsWith(“Test”)) continue; if (recursiveList.Contains(name)) continue; recursiveList.Add(name); if (IsWebservice(caller)) […]

异步日志记录抛出NullReferenceException

我试图使用AsyncTargetingPack异步将一些信息记录到针对.NET 4.0的MVC 4控制器操作中的SQL Server。 我会直接跳到.NET 4.5,但我的应用程序存在于Azure中, 我们仍在等待更新 … 此代码按预期工作(一行写入我的数据库,没有抛出exception): public class SystemActionLogger : ISystemActionLogger { private readonly ActionBlock<Tuple> actionBlock; public SystemActionLogger(ISystemActionLogEntryRepository repository) { actionBlock = new ActionBlock<Tuple>( entry => TaskEx.Run(async () => { string data = await JsonConvert.SerializeObjectAsync(entry.Item2); await repository.PersistAsync(new SystemActionLogEntry(entry.Item1, data)); })); } public void Log(SystemAction systemAction, object data) { actionBlock.Post(new Tuple(systemAction, data)); } […]

TPL完成与完成

我需要从遗留数据库导入客户相关数据,并在此过程中执行多次转换。 这意味着单个条目需要执行其他“事件”(同步产品,创建发​​票等)。 我最初的解决方案是简单的并行方法。 它工作正常,但有时它有问题。 如果当前处理的客户需要等待相同类型的事件,他们的处理队列可能会卡住并最终超时,导致每个基础事件也失败(它们依赖于失败的事件)。 它不会一直发生,但它很烦人。 所以我有了另一个想法,分批工作。 我的意思不仅是限制同时处理的客户数量,还包括广播到队列的事件数量。 在寻找想法时,我找到了这个答案,它指向了TPL DataFlow 。 我制作了一个骨架来熟悉它。 我建立了一个简单的管道,但我对Complete()的使用和等待Completion()有点困惑。 步骤如下 制作一个数字列表(要导入的客户的ID) – 这是在导入逻辑之外,它只是在那里能够触发其余的逻辑 创建BatchBlock (以便能够同时限制要处理的客户数量) 基于id创建单个MyClass1项( TransformBlock ) 执行一些逻辑并生成MyClass2的集合( TransformManyBlock ) – 例如,睡眠1秒 对集合的每个项目执行一些逻辑( ActionBlock ) – 例如,hibernate1秒钟 这是完整的代码: public static class Program { private static void Main(string[] args) { var batchBlock = new BatchBlock(2); for (var i = 1; i […]

TPL数据流:有限容量并等待完成

下面我为了简单起见将一个真实场景复制为LINQPad脚本: var total = 1 * 1000 * 1000; var cts = new CancellationTokenSource(); var threads = Environment.ProcessorCount; int capacity = 10; var edbOptions = new ExecutionDataflowBlockOptions{BoundedCapacity = capacity, CancellationToken = cts.Token, MaxDegreeOfParallelism = threads}; var dbOptions = new DataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token}; var gdbOptions = new GroupingDataflowBlockOptions {BoundedCapacity = capacity, CancellationToken […]

在ITargetBlock 中重试策略

我需要在工作流程中引入重试策略。 假设有3个块以这种方式连接: var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 }; var buffer = new BufferBlock(); var processing = new TransformBlock(…, executionOptions); var send = new ActionBlock(…); buffer.LinkTo(processing); processing.LinkTo(send); 因此,有一个缓冲区累积数据,然后将其发送到变换块,该变换块一次不处理3个项目,然后将结果发送到动作块。 在处理过程中可能会出现变换块瞬态错误,如果错误是瞬态错误,我想重试该块。 我知道块通常不可重试(传递到块中的委托可以被重试)。 其中一个选项是包装传递给支持重试的委托。 我也知道有一个非常好的库TransientFaultHandling.Core ,它为瞬态故障提供重试机制。 这是一个很棒的图书馆,但不是我的情况。 如果我将传递给转换块的委托包装到RetryPolicy.ExecuteAsync方法中,则转换块内的消息将被锁定,并且在重试完成或失败之前,转换块将无法接收新消息。 想象一下,如果所有3条消息都输入到重试中(假设下一次重试尝试将在2分钟内完成)并且失败,则变换块将被卡住,直到至少有一条消息离开变换块。 我看到的唯一解决方案是扩展ITargetBlock (实际上, ITargetBlock也足够了),并手动重试(如此处所示): do { try { return await transform(input); } catch { if( numRetries processing.Post(message)); […]

散列/分片ActionBlocks

我需要并行处理某些项目,因此我使用的是TPL Dataflow 。 问题在于,共享相同密钥(类似于字典)的项目应按FIFO顺序处理,而不是彼此平行(它们可以与具有不同值的其他项目并行)。 正在完成的工作是CPU绑定最小的异步锁,所以我的解决方案是创建一个ActionBlock的数组,其大小为Environment.ProcessorCount ,没有并行性,并根据密钥的GetHashCode值发布到它们。 创建: _actionBlocks = new ActionBlock[Environment.ProcessorCount]; for (int i = 0; i < _actionBlocks.Length; i++) { _actionBlocks[i] = new ActionBlock(_ => ProcessItemAsync(_)); } 用法: bool ProcessItem(Key key, Item item) { var actionBlock = _actionBlocks[(uint)key.GetHashCode() % _actionBlocks.Length]; return actionBlock.Post(item); } 所以,我的问题是,这是我问题的最佳解决方案吗? 我是否会损害性能/可扩展性? 我错过了什么吗?

为什么块按此顺序运行?

这是一个简短的代码示例,可以快速向您介绍我的问题: using System; using System.Linq; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; namespace DataflowTest { class Program { static void Main(string[] args) { var firstBlock = new TransformBlock(x => x, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }); var secondBlock = new TransformBlock(async x => { if (x == 12) { await Task.Delay(5000); return $”{DateTime.Now}: Message is {x} (This […]

TPL Dataflow,如何将项目转发到许多链接目标块中的一个特定目标块?

我正在寻找一个TPL数据流块解决方案,它可以容纳多个项目,它可以链接到多个目标块,但是它能够将项目转发到仅通过filter/谓词的特定目标块。 任何时候都不应该将项目同时传递给多个目标块,始终只能与匹配filter的项目一起传递,否则该项目可以被丢弃。 我不喜欢BroadCastBlock,因为如果我理解正确,它不能保证传送(或者是吗?)并且过滤是在目标块侧完成的,这意味着BroadCastBlock基本上将每个项目的副本发送到所有linkedTo目标块。 如果我理解正确的话,它也不会在任何时候持有多个项目。 我不想使用Post / Async但维护LinkTo链。 是否有办法绕过完整的自定义数据流块? 或者我误解了BroadCastBlock的工作原理? 不幸的是,实际上没有太多文档可以详细介绍并涵盖用例。 任何想法都受到高度赞赏。