无论如何平行收益率c#

我有多个枚举器枚举平面文件。 我最初在并行调用中有每个枚举器,每个Action都添加到BlockingCollection并且该集合返回一个ConsumingEnumerable();

 public interface IFlatFileQuery { IEnumerable Run(); } public class FlatFile1 : IFlatFileQuery { public IEnumerable Run() { // loop over a flat file and yield each result yield return Entity; } } public class Main { public IEnumerable DoLongTask(ICollection _flatFileQueries) { // do some other stuff that needs to be returned first: yield return Entity; // then enumerate and return the flat file data foreach (var entity in GetData(_flatFileQueries)) { yield return entity; } } private IEnumerable GetData(_flatFileQueries) { var buffer = new BlockingCollection(100); var actions = _flatFileQueries.Select(fundFileQuery => (Action)(() => { foreach (var entity in fundFileQuery.Run()) { buffer.TryAdd(entity, Timeout.Infinite); } })).ToArray(); Task.Factory.StartNew(() => { Parallel.Invoke(actions); buffer.CompleteAdding(); }); return buffer.GetConsumingEnumerable(); } } 

但经过一些测试后发现,下面的代码更改速度提高了大约20-25%。

 private IEnumerable GetData(_flatFileQueries) { return _flatFileQueries.AsParallel().SelectMany(ffq => ffq.Run()); } 

代码更改的问题在于它等待所有平面文件查询被枚举之后才返回整个批次,然后可以枚举和生成。

是否有可能以某种方式在上面的代码中产生更快的速度?

我应该补充一点,所有平面文件查询的组合结果最多只能是1000个实体。

编辑 :将其更改为以下内容对运行时没有影响。 (R#甚至建议回到原来的样子)

 private IEnumerable GetData(_flatFileQueries) { foreach (var entity in _flatFileQueries.AsParallel().SelectMany(ffq => ffq.Run())) { yield return entity; } } 

代码更改的问题在于它等待所有平面文件查询被枚举之后才返回整个批次,然后可以枚举和生成。

让我们通过一个简单的例子来certificate它是错误的。 首先,让我们创建一个TestQuery类,它将在给定时间后产生一个实体。 其次,让我们并行执行几个测试查询,并测量产生结果所需的时间。

 public class TestQuery : IFlatFileQuery { private readonly int _sleepTime; public IEnumerable Run() { Thread.Sleep(_sleepTime); return new[] { new Entity() }; } public TestQuery(int sleepTime) { _sleepTime = sleepTime; } } internal static class Program { private static void Main() { Stopwatch stopwatch = Stopwatch.StartNew(); var queries = new IFlatFileQuery[] { new TestQuery(2000), new TestQuery(3000), new TestQuery(1000) }; foreach (var entity in queries.AsParallel().SelectMany(ffq => ffq.Run())) Console.WriteLine("Yielded after {0:N0} seconds", stopwatch.Elapsed.TotalSeconds); Console.ReadKey(); } } 

此代码打印:

1秒后产生
2秒后产生
3秒后产生

你可以看到这个输出, AsParallel()会尽快产生每个结果,所以一切正常。 请注意,根据并行度(例如“2s,5s,6s”,并行度为1,可能会得到不同的时序,有效地使整个操作完全不平行)。 此输出来自4芯机器。

如果线程之间没有共同的瓶颈(例如共享锁定资源),则您的长处理可能会随核心数量而扩展。 您可能希望对算法进行概要分析,以查看是否存在可以使用dotTrace等工具进行改进的缓慢部分。

我不认为你的代码中有任何红旗。 没有令人发指的低效率。 我认为这归结为多个较小的差异。

PLINQ非常擅长处理数据流。 在内部,它比逐个添加项目到同步列表更有效。 我怀疑你对TryAdd的调用是一个瓶颈,因为每次调用都需要内部至少两个Interlocked操作。 这些可能会给处理器间内存总线带来巨大负担,因为所有线程都将竞争相同的缓存线。

PLINQ因为内部更便宜,它会做一些缓冲。 我敢肯定它不是一个一个地输出项目。 可能它会对它们进行批量处理,并通过多种方式分摊同步成本。

第二个问题是BlockingCollection的有限容量。 100不是很多。 这可能会导致很多等待。 等待是昂贵的,因为它需要调用内核和上下文切换。

在任何情况下,我都会选择对我有用的替代方案:

这对我有用:

  • 在Parallel.Foreach中的任务中,在ConcurrentQueue中将项目转换为要处理。
  • 该任务有一个继续,标志着该任务结束的标志。
  • 在同一个执行线程中,任务结束了一段时间后出队并产生

对我来说快速而出色的结果:

 Task.Factory.StartNew (() => { Parallel.ForEach (TextHelper.ReadLines(FileName), ProcessHelper.DefaultParallelOptions, (string currentLine) => { // Read line, validate and enqeue to an instance of FileLineData (custom class) }); }). ContinueWith ( ic => isCompleted = true ); while (!isCompleted || qlines.Count > 0) { if (qlines.TryDequeue (out returnLine)) { yield return returnLine; } }