C#中的并行树遍历

我需要快速遍历一棵树,我想并行完成。 我宁愿使用并行扩展而不是手动旋转一堆线程。

我当前的代码看起来像这样:

public void Traverse(Node root) { var nodeQueue = new Queue(); nodeQueue.Enqueue(root); while (nodeQueue.Count!=0) { var node = nodeQueue.Dequeue(); if (node.Property = someValue) DoSomething(node); foreach (var node in node.Children) { nodeQueue.Enqueue(node); } } } 

我真的希望Parallel.ForEach有一个Parallel.While模拟。 我遇到了Stephen Toub关于使用Parallel.ForEach实现并行的文章。 如果正确读取它仍然无法工作,因为我正在改变我试图迭代的队列。

我是否需要使用任务工厂和递归(这有风险吗?)? 还是有一些我忽略的简单解决方案?

编辑:@svick

该树有超过250,000个节点。 现在最大深度是14个节点,包括根。

根目录下有大约500个节点,之后的平衡具有相当随机的分布。 我很快就会得到更好的分布统计数据。

@Enigmativity:

是的,许多用户同时修改了树,但我通常会为树或子树提供共​​享读锁,或允许脏读。

对node.Children的调用可以被认为是primefaces的。

DoSomething实际上是几个代理之一,对于一些昂贵的操作,我可能会收集节点的快照列表并在遍历之外处理它们。

我意识到我应该看一般情况(遍历的子树而不是整个树。)为此,我在树的每个节点上运行遍历并查看总时间。

我为每个遍历算法使用了Parallel.ForEach(nodes,Traverse),其中节点包含所有~250k节点。 这模拟(某种程度上)许多用户同时请求许多不同的节点。

00256ms宽度优先顺序

00323ms广度优先连续工作(我将静态计数器增加为“工作”)

01495ms Kirks第一个回答

01143ms Svicks第二个答案

00000ms Recursive Single Threaded在60s后没有完成

00000ms电子书的答案在60年代后没有完成

@Enigma,我想我可能会以某种方式弄乱你的算法,因为它看起来应该快得多。

结果让我惊讶地说至少。 为了让自己相信编译器并没有神奇地优化遍历,我不得不在宽度优先顺序中添加一些工作。

对于头部的单次遍历,并行化第一级仅具有最佳性能。 但几乎没有,这个数字有所改善,因为我向第二级添加了更多节点(2000而不是500)。

最直接的方法是为每个子节点创建一个Task ,然后等待所有子节点:

 public void Traverse(Node root) { if (node.Property == someValue) DoSomething(node); var tasks = new List(); foreach (var node in node.Children) { // tmp is necessary because of the way closures close over loop variables var tmp = node; tasks.Add(Task.Factory.StartNew(() => Traverse(tmp))); } Task.WaitAll(tasks.ToArray()); } 

Task相当轻量级,因此创建大量的Task得相当好。 但是它们确实有一些开销,所以做一些更复杂的事情就像拥有一些共享队列的任务一样可能会更快。 如果这是你要去的方式,不要忘记空队列并不意味着所有的工作都完成了。 如果你采用这种方式, System.Collections.Concurrent命名空间中的类将会派上用场。

编辑:由于树的形状(根有大约500个孩子),并行处理第一级应该提供良好的性能:

 public void Traverse(Node root, bool parallel = true) { if (node.Property == someValue) DoSomething(node); if (parallel) { Parallel.ForEach(node.Children, node => { Traverse(node, false); }); } else { foreach (var node in node.Children) { Traverse(node, false); } } } 

由于遍历树的速度非常快,因此对Children调用是primefaces的,并且需要并行执行DoSomething的昂贵性质,这是我对解决方案的看法。

我开始认为我需要一个函数,它将一个节点作为参数,创建一个执行DoSomething的任务,递归调用自己为所有子节点创建任务,最后返回一个等待所有内部的任务任务要完成。

这里是:

 Func createTask = null; createTask = n => { var nt = Task.Factory.StartNew(() => { if (n.Property == someValue) DoSomething(n); }); var nts = (new [] { nt, }) .Concat(n.Children.Select(cn => createTask(cn))) .ToArray(); return Task.Factory.ContinueWhenAll(nts, ts => { }); }; 

调用它并等待遍历完成所需的全部是:

 createTask(root).Wait(); 

我通过创建一个节点树来测试这个节点,其中有500个孩子离开根,有14个级别,每个节点有1或2个后续子节点。 这给了我总共319,501个节点。

我创建了一个执行一些工作的DoSomething方法 – for (var i = 0; i < 100000 ; i++) { }; - 然后运行上面的代码并将其与串行处理同一棵树进行比较。

并行版本耗时5,151 ms。 顺序版本为13,746毫秒。

我还进行了测试,将节点数减少到3,196,并将DoSomething的处理时间增加了100倍。 如果TPL的任务很快完成,那么TPL会巧妙地恢复运行顺序,从而延长处理时间,使代码运行更具并行性。

现在并行版本耗时3,203ms。 顺序版本花了11,581ms。 并且,如果我只调用createTask(root)函数而不等待它完成它只花了126ms。 这意味着树的遍历非常快,因此在遍历期间锁定树并在处理过程中将其解锁是有意义的。

我希望这有帮助。

我可能会遗漏一些东西,但我认为根本不需要一段whilewhile只是确保您遍历每个节点。

相反,只需为树中的每个节点递归调用函数。

 public void Traverse(Node root) { if (root.Property = someValue) DoSomething(node); Parallel.ForEach(root.Children, node => Traverse(node)); } 

编辑:当然替代方案,如果你喜欢水平而不是垂直处理,而昂贵的操作是DoSomething,则首先要做Traverse

 public IEnumerable Traverse(Node root) { // return all the nodes on this level first, before recurring foreach (var node in root.Children) { if (node.Property == someValue) yield return node; } // next check children of each node foreach (var node in root.Children) { var children = Traverse(node); foreach (var child in children) { yield return child; } } } Parallel.ForEach(Traverse(n), n => DoSomething(n)); 

假设你有p个处理器,你可能会在一个带有p分区的root.Children上做一个Parallel.For 。 这些中的每一个都会对子树进行传统的单线程遍历,比较,而不是DoSomething ,将DoSomething的委托排入并发队列。 如果分布基本上是随机和平衡的,并且因为遍历仅进行遍历/入队,那么该部分花费时间的1 / p 。 此外,遍历可能会在所有DoSomethings执行之前耗尽自己,因此您可以让p个消费者( DoSomething的执行者)为您提供最大的并行执行,假设所有这些操作都是独立的。

通过随机分布的子树的多个根子项的这种天真分区,遍历本身将是快速的。 随着您的消费者大致按处理器分配,您还可以获得最大并行DoSomething操作。

也许使用List或Array而不是队列会有所帮助。 还可以使用另一个List / Array来填充要访问的下一个节点。 在您首先完成整个宽度之前,您不会处理列表。 像这样的东西:

 List todoList = new List(); todoList.Add(node); while (todoList.Count > 0) { // we'll be adding next nodes to process to this list so it needs to be thread-safe // or just sync access to a non-threadsafe list // if you know approx how many nodes you expect, you can pre-size the list ThreadSafeList nextList = new ThreadSafeList(); //todoList is readonly/static so can cache Count in simple variable int maxIndex = todoList.Count-1; // process todoList in parallel Parallel.For(0, maxIndex, i => { // if list reads are thread-safe then no need to sync, otherwise sync Node x = todoList[i]; //process x; // eg do somehting, get childrenNodesToWorkOnNext, etc. // add any child nodes that need to be processed next // eg nextList.add(childrenNodesToWorkOnNext); }); // done with parallel processing by here so use the next todo list todoList = nextList; )