C#排队要由线程池处理的相关任务

我想将依赖任务排列在需要按顺序处理的多个流中(在每个流程中)。 流可以并行处理。

具体来说,假设我需要两个队列,并且我希望按顺序处理每个队列中的任务。 下面是示例伪代码,用于说明所需的行为:

Queue1_WorkItem wi1a=...; enqueue wi1a; ... time passes ... Queue1_WorkItem wi1b=...; enqueue wi1b; // This must be processed after processing of item wi1a is complete ... time passes ... Queue2_WorkItem wi2a=...; enqueue wi2a; // This can be processed concurrently with the wi1a/wi1b ... time passes ... Queue1_WorkItem wi1c=...; enqueue wi1c; // This must be processed after processing of item wi1b is complete 

这是一个图表,带有箭头,说明工作项之间的依赖关系:

在此处输入图像描述

问题是如何使用C#4.0 / .NET 4.0执行此操作? 现在我有两个工作线程,每个队列一个,我为每个队列使用BlockingCollection 。 我想改为利用.NET线程池并让工作线程同时处理项目(跨流程),但是在流程中连续处理。 换句话说,我希望能够表明,例如wi1b取决于wi1a的完成,而不必跟踪完成并记住wi1a,当wi1b到达时。 换句话说,我只想说,“我想为queue1提交一个工作项,它将与我已经为queue1提交的其他项目串行处理,但可能与提交给其他队列的工作项目并行”。

我希望这种描述有意义。 如果没有,请随时在评论中提问,我会相应地更新这个问题。

谢谢阅读。

更新:

总结到目前为止“有缺陷”的解决方案,这里是我不能使用的答案部分的解决方案以及我不能使用它们的原因:

TPL任务需要为ContinueWith()指定先行任务。 我不想在提交新任务时保持每个队列的先前任务的知识。

TDF ActionBlocks看起来很有希望,但似乎发布到ActionBlock的项目是并行处理的。 我需要连续处理特定队列的项目。

更新2:

RE:ActionBlocks

看起来将MaxDegreeOfParallelism选项设置为1会阻止并行处理提交到单个ActionBlock的工作项。 因此,似乎每个队列都有一个ActionBlock解决了我的问题,唯一的缺点是这需要从Microsoft安装和部署TDF库,我希望有一个纯.NET 4.0解决方案。 到目前为止,这是候选人接受的答案,除非有人能够找到一种方法来使用纯.NET 4.0解决方案来做到这一点,该解决方案不会退化为每个队列的工作线程(我已经在使用)。

我知道你有很多队列,不想捆绑线程。 每个队列可以有一个ActionBlock 。 ActionBlock可以自动执行您需要的大部分操作:它以串行方式处理工作项,并且仅在工作待处理时启动任务。 当没有待处理的工作时,不会阻止任务/线程。

最好的方法是使用Task Parallel Library (TPL)Continuations 。 延续不仅允许您创建任务流,还可以处理您的exception。 这是对TPL的一个很好的介绍 。 但是要给你一些想法……

您可以使用启动TPL任务

 Task task = Task.Factory.StartNew(() => { // Do some work here... }); 

现在,当先前任务完成(错误或成功)时,您可以使用ContinueWith方法启动第二个任务

 Task task1 = Task.Factory.StartNew(() => Console.WriteLine("Antecedant Task")); Task task2 = task1.ContinueWith(antTask => Console.WriteLine("Continuation...")); 

因此,只要task1完成,失败或被取消, task2 ‘就会启动’并开始运行。 请注意,如果task1在到达第二行代码之前已完成,则task2将被安排立即执行。 传递给第二个lambda的antTask参数是对前一个任务的引用。 有关详细示例,请参阅此链接 …

您还可以传递先行任务的延续结果

 Task.Factory.StartNew(() => 1) .ContinueWith(antTask => antTask.Result * 4) .ContinueWith(antTask => antTask.Result * 4) .ContinueWith(antTask =>Console.WriteLine(antTask.Result * 4)); // Prints 64. 

注意。 请务必在提供的第一个链接中阅读exception处理,因为这会导致TPL的新手误入歧途。

最后要特别注意你想要的是儿童任务。 子任务是创建为AttachedToParent任务。 在这种情况下,在所有子任务完成之前,延续不会运行

 TaskCreationOptions atp = TaskCreationOptions.AttachedToParent; Task.Factory.StartNew(() => { Task.Factory.StartNew(() => { SomeMethod() }, atp); Task.Factory.StartNew(() => { SomeOtherMethod() }, atp); }).ContinueWith( cont => { Console.WriteLine("Finished!") }); 

我希望这有帮助。

编辑:您是否看过ConcurrentCollections ,特别是BlockngCollection 所以在你的情况下你可能会使用类似的东西

 public class TaskQueue : IDisposable { BlockingCollection taskX = new BlockingCollection(); public TaskQueue(int taskCount) { // Create and start new Task for each consumer. for (int i = 0; i < taskCount; i++) Task.Factory.StartNew(Consumer); } public void Dispose() { taskX.CompleteAdding(); } public void EnqueueTask (Action action) { taskX.Add(Action); } void Consumer() { // This seq. that we are enumerating will BLOCK when no elements // are avalible and will end when CompleteAdding is called. foreach (Action action in taskX.GetConsumingEnumerable()) action(); // Perform your task. } } 

可以使用基于TPL的.NET 4.0解决方案,同时隐藏它需要在某处存储父任务的事实。 例如:

 class QueuePool { private readonly Task[] _queues; public QueuePool(int queueCount) { _queues = new Task[queueCount]; } public void Enqueue(int queueIndex, Action action) { lock (_queues) { var parent = _queue[queueIndex]; if (parent == null) _queues[queueIndex] = Task.Factory.StartNew(action); else _queues[queueIndex] = parent.ContinueWith(_ => action()); } } } 

这是为所有队列使用单个锁,以说明这个想法。 但是,在生产代码中,我会使用每个队列锁定来减少争用。

看起来你已经拥有的设计是好的和有效的。 您的工作线程(每个队列一个)长时间运行,因此如果您想使用Task,请指定TaskCreationOptions.LongRunning以便获得专用的工作线程。

但是这里并不需要使用ThreadPool。 它不能为长期工作提供许多好处。