Tag: task parallel library

Cold Tasks和TaskExtensions.Unwrap

我有一个缓存类,它使用冷(未启动)任务来避免多次运行昂贵的东西。 public class AsyncConcurrentDictionary : System.Collections.Concurrent.ConcurrentDictionary<TKey, Task> { internal Task GetOrAddAsync(TKey key, Task newTask) { var cachedTask = base.GetOrAdd(key, newTask); if (cachedTask == newTask && cachedTask.Status == TaskStatus.Created) // We won! our task is now the cached task, so run it cachedTask.Start(); return cachedTask; } } 这很有效,直到你的任务实际使用C#5的await ,ala实现 cache.GetOrAddAsync(“key”, new Task(async () => { var […]

异步unit testing中的Stubbing Task返回方法

假设我有以下类和它依赖的接口: public class MyController { private IRepository _repository; public MyController(IRepository repository) { _repository = repository; } public async Task MethodUnderTest(int someId) { var o = await _repository.FindById(someId); // update o await _repository.Commit(); } } public interface IRepository { Task Commit(); } 当我对这个方法进行unit testing时,我可以执行以下操作(使用xUnit和Rhino Mocks): [Fact] public async Task MyTest() { IRepository repositoryStub = MockRepository.GenerateStub(); MyController […]

在ITargetBlock 中重试策略

我需要在工作流程中引入重试策略。 假设有3个块以这种方式连接: var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 }; var buffer = new BufferBlock(); var processing = new TransformBlock(…, executionOptions); var send = new ActionBlock(…); buffer.LinkTo(processing); processing.LinkTo(send); 因此,有一个缓冲区累积数据,然后将其发送到变换块,该变换块一次不处理3个项目,然后将结果发送到动作块。 在处理过程中可能会出现变换块瞬态错误,如果错误是瞬态错误,我想重试该块。 我知道块通常不可重试(传递到块中的委托可以被重试)。 其中一个选项是包装传递给支持重试的委托。 我也知道有一个非常好的库TransientFaultHandling.Core ,它为瞬态故障提供重试机制。 这是一个很棒的图书馆,但不是我的情况。 如果我将传递给转换块的委托包装到RetryPolicy.ExecuteAsync方法中,则转换块内的消息将被锁定,并且在重试完成或失败之前,转换块将无法接收新消息。 想象一下,如果所有3条消息都输入到重试中(假设下一次重试尝试将在2分钟内完成)并且失败,则变换块将被卡住,直到至少有一条消息离开变换块。 我看到的唯一解决方案是扩展ITargetBlock (实际上, ITargetBlock也足够了),并手动重试(如此处所示): do { try { return await transform(input); } catch { if( numRetries processing.Post(message)); […]

使用带有条件延续的任务

我对如何使用条件连续的任务有点困惑。 如果我有一个任务,然后我想继续处理成功和错误的任务,然后等待那些完成。 void FunctionThrows() {throw new Exception(“faulted”);} static void MyTest() { var taskThrows = Task.Factory.StartNew(() => FunctionThrows()); var onSuccess = taskThrows.ContinueWith( prev => Console.WriteLine(“success”), TaskContinuationOptions.OnlyOnRanToCompleted); var onError = taskThrows.ContinueWith( prev => Console.WriteLine(prev.Exception), TaskContinuationOptions.OnlyOnFaulted); //so far, so good //this throws because onSuccess was cancelled before it was started Task.WaitAll(onSuccess, onError); } 这是执行任务成功/失败分支的首选方式吗? 另外,我应该如何加入所有这些任务,假设我已经创建了一长串的延续,每个都有自己的error handling。 //for example […]

散列/分片ActionBlocks

我需要并行处理某些项目,因此我使用的是TPL Dataflow 。 问题在于,共享相同密钥(类似于字典)的项目应按FIFO顺序处理,而不是彼此平行(它们可以与具有不同值的其他项目并行)。 正在完成的工作是CPU绑定最小的异步锁,所以我的解决方案是创建一个ActionBlock的数组,其大小为Environment.ProcessorCount ,没有并行性,并根据密钥的GetHashCode值发布到它们。 创建: _actionBlocks = new ActionBlock[Environment.ProcessorCount]; for (int i = 0; i < _actionBlocks.Length; i++) { _actionBlocks[i] = new ActionBlock(_ => ProcessItemAsync(_)); } 用法: bool ProcessItem(Key key, Item item) { var actionBlock = _actionBlocks[(uint)key.GetHashCode() % _actionBlocks.Length]; return actionBlock.Post(item); } 所以,我的问题是,这是我问题的最佳解决方案吗? 我是否会损害性能/可扩展性? 我错过了什么吗?

从Task.Run获取返回值

我在这里有以下代码 public static async Task Start(IProgress progress) { const int total = 10; for (var i = 0; i RunLongTask(i.ToString(CultureInfo.InvariantCulture))); if (progress != null) { var args = new ProcessTaskAsyncExProgress { ProgressPercentage = (int)(i / (double)total * 100.0), Text = “processing ” + i }; progress.Report(args); } } return “Done”; } private static string RunLongTask(string […]

将异步 – 等待C#代码转换为与调度程序相关的F#

我想知道这是否是一个太广泛的问题,但最近我让自己遇到了一段代码,我想确定如何从C#转换为适当的F#。 旅程从这里开始(1) (TPL-F#交互的原始问题),并继续这里(2) (我正在考虑转换为F#的一些示例代码)。 示例代码在这里重现的时间太长,但有趣的函数是ActivateAsync , RefreshHubs和AddHub 。 特别有趣的是 AddHub具有private async Task AddHub(string address)的签名。 RefreshHubs在一个循环中调用AddHub并收集一个tasks列表,然后通过await Task.WhenAll(tasks)它最终await Task.WhenAll(tasks) ,因此返回值与其private async Task RefreshHubs(object _)签名相匹配。 RefreshHubs由ActivateAsync调用,就像await RefreshHubs(null) ,最后有一个调用await base.ActivateAsync()匹配函数签名public override async Task ActivateAsync() 。 题: 将这些函数签名正确转换为F#仍然保持接口和function并尊重默认的自定义调度程序是什么? 我也不太确定这个“在F#中异步/等待”。 至于如何“机械地”做到这一点。 🙂 原因是在链接“here(1)”中似乎存在问题(我没有validation过),因为F#异步操作不遵循(Orleans)运行时设置的自定义协作调度程序。 此外, 这里指出TPL操作逃脱调度程序并转到任务池,因此禁止使用它们。 我能想到解决这个问题的一种方法是使用F#函数,如下所示 //Sorry for the inconvenience of shorterned code, for context see the link “here (1)”… override […]

在开始新任务之前检查任务是否已在运行

有一个在任务中执行的过程。 我不希望其中一个同时执行。 这是检查任务是否已在运行的正确方法吗? private Task task; public void StartTask() { if (task != null && (task.Status == TaskStatus.Running || task.Status == TaskStatus.WaitingToRun || task.Status == TaskStatus.WaitingForActivation)) { Logger.Log(“Task has attempted to start while already running”); } else { Logger.Log(“Task has began”); task = Task.Factory.StartNew(() => { // Stuff }); } }

为什么CancellationTokenRegistration存在,为什么它实现IDisposable

我一直看到在CancellationTokenRegistration结果中使用Cancellation.Register和using子句的代码: using (CancellationTokenRegistration ctr = token.Register(() => wc.CancelAsync())) { await wc.DownloadStringAsync(new Uri(“http://www.hamster.com”)); } 我明白你应该确保你Dispose一个IDisposable ,但为什么它甚至实现IDisposable ? 它必须释放什么资源? 它认为平等的唯一方法。 如果您不Dispose它会发生什么? 你泄漏了什么?

使用Task.StartNew方法时如何保证创建新线程

据我所知,我有一些误导性的信息。 我需要在后台运行一个单独的线程。 目前我这样做: var task = Task.Factory.StartNew (CheckFiles , cancelCheckFile.Token , TaskCreationOptions.LongRunning , TaskScheduler.Default);//Check for files on another thread private void CheckFiles() { while (!cancelCheckFile.Token.IsCancellationRequested) { //do stuff } } 这总是为我创建一个新线程。 但经过多次讨论后,即使将其标记为LongRunning也无法保证将创建新的线程。 在过去,我做过这样的事情: thQueueChecker = new Thread(new ThreadStart(CheckQueue)); thQueueChecker.IsBackground = true; thQueueChecker.Name = “CheckQueues” + DateTime.Now.Ticks.ToString(); thQueueChecker.Start(); private void CheckQueue() { while (!ProgramEnding) { //do […]