重型I / O操作中的Parallel.ForEach与Async Forloop
我想比较两种理论情景。 为了这个问题,我简化了案件。 但基本上它是典型的生产者消费者情景。 (我专注于消费者)。
我有一个大的Queue dataQueue
,我必须传输给多个客户端。
所以让我们从更简单的情况开始:
class SequentialBlockingCase { public static Queue DataQueue = new Queue(); private static List _destinations = new List(); /// /// Is the main function that is run in its own thread /// private static void Run() { while (true) { if (DataQueue.Count > 0) { string data = DataQueue.Dequeue(); foreach (var destination in _destinations) { SendDataToDestination(destination, data); } } else { Thread.Sleep(1); } } } private static void SendDataToDestination(string destination, string data) { //TODO: Send data using http post, instead simulate the send Thread.Sleep(200); } } }
现在这个设置工作正常。 它位于那里并轮询Queue
,当有数据要发送时,它将它发送到所有目的地。
问题:
- 如果其中一个目的地不可用或速度较慢,则会影响所有其他目的地。
- 在并行执行的情况下,它不使用multithreading。
- 每个传输到每个目的地的块。
所以这是我的第二次尝试:
class ParalleBlockingCase { public static Queue DataQueue = new Queue(); private static List _destinations = new List(); /// /// Is the main function that is run in its own thread /// private static void Run() { while (true) { if (DataQueue.Count > 0) { string data = DataQueue.Dequeue(); Parallel.ForEach(_destinations, destination => { SendDataToDestination(destination, data); }); } else { Thread.Sleep(1); } } } private static void SendDataToDestination(string destination, string data) { //TODO: Send data using http post Thread.Sleep(200); } }
如果1个目的地缓慢或不可用,则此修订至少不会影响其他目的地。
但是这个方法仍然是阻塞的,我不确定Parallel.ForEach
是否使用了线程池。 我的理解是它将创建X个线程/任务并一次执行4个(4个核心cpu)。 但在任务5开始之前,它必须完全完成芬兰任务1。
因此我的第三个选择:
class ParalleAsyncCase { public static Queue DataQueue = new Queue(); private static List _destinations = new List { }; /// /// Is the main function that is run in its own thread /// private static void Run() { while (true) { if (DataQueue.Count > 0) { string data = DataQueue.Dequeue(); List tasks = new List(); foreach (var destination in _destinations) { var task = SendDataToDestination(destination, data); task.Start(); tasks.Add(task); } //Wait for all tasks to complete Task.WaitAll(tasks.ToArray()); } else { Thread.Sleep(1); } } } private static async Task SendDataToDestination(string destination, string data) { //TODO: Send data using http post await Task.Delay(200); } }
现在从我理解这个选项,仍然会在Task.WaitAll(tasks.ToArray());
的主线程上Task.WaitAll(tasks.ToArray());
这很好,因为我不希望它以比执行任务更快的速度创建任务。
但是并行执行的任务应该使用ThreadPool
,并且所有X个任务应该立即开始执行,而不是阻塞或按顺序执行。 (线程池将在它们变为活动或正在awaiting
在它们之间交换)
现在我的问题。
选项3是否比选项2具有任何性能优势。
特别是在更高性能的服务器端方案中。 在我正在处理的特定软件中。 上面我的简单用例会有多个实例。 即几个消费者。
我对两种解决方案的理论差异和专业与缺点感兴趣,如果有的话,甚至可能是更好的第四种选择。
Parallel.ForEach
将使用线程池。 异步代码不会 ,因为它根本不需要任何线程 (链接到我的博客)。
正如Mrinal所指出的,如果你有CPU绑定代码,并行性是合适的; 如果您有I / O绑定代码,则异步是合适的。 在这种情况下,HTTP POST显然是I / O,因此理想的消费代码将是异步的。
如果有的话,甚至可能是更好的第四选择。
我建议让你的消费者完全异步。 为此,您需要使用与异步兼容的生产者/消费者队列。 在TPL Dataflow库中有一个相当高级的( BufferBlock
), 在我的AsyncEx库中有一个相当简单的( AsyncProducerConsumerQueue
) 。
使用其中任何一个,您可以创建完全异步的使用者:
List tasks = new List (); foreach (var destination in _destinations) { var task = SendDataToDestination(destination, data); tasks.Add(task); } await Task.WhenAll(tasks);
或者,更简化:
var tasks = _destinations .Select(destination => SendDataToDestination(destination, data)); await Task.WhenAll(tasks);
你的主要问题 – Parallel.ForEach vs Async Forloop
- 对于
computing operations
,在内存处理中始终使用Parallel API
作为从线程池调用的线程来执行某些工作,这是其调用的目的。 - 对于
IO bound operations
,始终是Async-Await
,因为没有调用线程,它使用硬件functionIO completion ports
在后台处理。
由于Async-Await是首选选项,因此我要在实现中指出一些事项:
- 它是
Synchronous
因为你没有等待主要操作Send data using http post
,正确的代码将await Http Post Async
不await Task.Delay
- 如果你正在调用像
Http post Async
这样的标准Async
实现,你不需要显式启动Task
,只有你有自定义Async
方法的情况才会这样。 -
Task.WaitAll
仅适用于没有Synchronization上下文或UI线程的Console应用程序,否则会导致死锁,需要使用Task.WhenAll
现在关于Parallel approach
- 虽然代码是正确的,并且
Parallel API
确实可以在Thread pool
工作,并且大部分它能够重用线程,从而进行优化,但是如果任务长时间运行,它可能最终会创建多个线程,以限制您可以使用构造函数选项new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }
,从而将最大数量限制为系统中的逻辑核心数
另一个重要的原因是Parallel API
对于IO
绑定调用来说是一个坏主意,因为每个线程都是用于UI
的昂贵资源,包括创建Thread environment block + User memory + Kernel Memory
并且在IO操作中它闲置无所事事,这是无论如何都不好