如何标记TPL数据流周期完成?

在TPL数据流中给出以下设置。

var directory = new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles"); var dirBroadcast=new BroadcastBlock(dir=>dir); var dirfinder = new TransformManyBlock((dir) => { return directory.GetDirectories(); }); var tileFilder = new TransformManyBlock((dir) => { return directory.GetFiles(); }); dirBroadcast.LinkTo(dirfinder); dirBroadcast.LinkTo(tileFilder); dirfinder.LinkTo(dirBroadcast); var block = new XYZTileCombinerBlock(3, (file) => { var coordinate = file.FullName.Split('\\').Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray(); return XYZTileCombinerBlock.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]); }, (quad) => XYZTileCombinerBlock.QuadKeyToTileXY(quad, (z, x, y) => new FileInfo(Path.Combine(directory.FullName,string.Format("{0}/{1}/{2}.png", z, x, y)))), () => new TransformBlock((s) => { Trace.TraceInformation("Combining {0}", s); return s; })); tileFilder.LinkTo(block); using (new TraceTimer("Time")) { dirBroadcast.Post(directory); block.LinkTo(new ActionBlock((s) => { Trace.TraceInformation("Done combining : {0}", s.Name); })); block.Complete(); block.Completion.Wait(); } 

我想知道我是如何因为这个循环而标记完成的。 一个目录被发布到dirBroadcast广播公司,该广播公司发布到dirfinder可能会向广播公司发回新的dirfinder,所以我不能简单地将其标记为完整,因为它会阻止从dirfinder添加的任何目录。 我应该重新设计它以跟踪目录的数量,或者在TPL中是否有任何相关内容。

如果您的代码的目的是使用某种并行性遍历目录结构,那么我建议不要使用TPL Dataflow并使用Microsoft的Reactive Framework。 我认为它变得更加简单。

这是我怎么做的。

首先定义一个递归函数来构建目录列表:

 Func> recurse = null; recurse = di => Observable .Return(di) .Concat(di.GetDirectories() .ToObservable() .SelectMany(di2 => recurse(di2))) .ObserveOn(Scheduler.Default); 

这将执行目录的递归,并使用默认的Rx调度程序,使得observable并行运行。

因此,通过使用输入DirectoryInfo调用recurse ,我得到一个输入目录及其所有后代的可观察列表。

现在我可以构建一个相当直接的查询来获得我想要的结果:

 var query = from di in recurse(new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles")) from fi in di.GetFiles().ToObservable() let zxy = fi .FullName .Split('\\') .Reverse() .Take(3) .Reverse() .Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))) .ToArray() let suffix = String.Format("{0}/{1}/{2}.png", zxy[0], zxy[1], zxy[2]) select new FileInfo(Path.Combine(di.FullName, suffix)); 

现在我可以像这样操作查询:

 query .Subscribe(s => { Trace.TraceInformation("Done combining : {0}", s.Name); }); 

现在我可能已经错过了一些自定义代码,但如果这是一种你想采取的方法,我相信你可以很容易地解决任何逻辑问题。

当代码用完子目录和文件时,此代码自动处理完成。

要将Rx添加到项目中,请在NuGet中查找“Rx-Main”。

我没有看到任何方法可以做到这一点,因为每个块( dirBroadcasttileFilder )依赖于另一个块而无法自行完成。

我建议你在没有TPL Dataflow的情况下重新设计你的目录遍历,这不适合这类问题。 在我看来,更好的方法就是递归扫描目录并用文件流填充block

 private static void FillBlock(DirectoryInfo directoryInfo, XYZTileCombinerBlock block) { foreach (var fileInfo in directoryInfo.GetFiles()) { block.Post(fileInfo); } foreach (var subDirectory in directoryInfo.GetDirectories()) { FillBlock(subDirectory, block); } } 

 FillBlock(directory, block); block.Complete(); await block.Completion; 

我确信这并不总是可行,但在许多情况下(包括目录枚举),您可以使用正在运行的计数器和Interlocked函数来完成一个循环的一对多数据流:

 public static ISourceBlock GetDirectoryEnumeratorBlock(string path, int maxParallel = 5) { var outputBuffer = new BufferBlock(); var count = 1; var broadcastBlock = new BroadcastBlock(s => s); var getDirectoriesBlock = new TransformManyBlock(d => { var files = Directory.EnumerateDirectories(d).ToList(); Interlocked.Add(ref count, files.Count - 1); //Adds the subdir count, minus 1 for the current directory. if (count == 0) //if count reaches 0 then all directories have been enumerated. broadcastBlock.Complete(); return files; }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = maxParallel }); broadcastBlock.LinkTo(outputBuffer, new DataflowLinkOptions() { PropagateCompletion = true }); broadcastBlock.LinkTo(getDirectoriesBlock, new DataflowLinkOptions() { PropagateCompletion = true }); getDirectoriesBlock.LinkTo(broadcastBlock); getDirectoriesBlock.Post(path); return outputBuffer; } 

我使用这个略微修改枚举文件,但它运作良好。 小心最大并行度,这可以快速饱和网络文件系统!

只是为了展示我的真实答案,TPL和Rx的结合。

  Func> recurse = null; recurse = di => Observable .Return(di) .Concat(di.GetDirectories() .Where(d => int.Parse(d.Name) <= br_tile[0] && int.Parse(d.Name) >= tl_tile[0]) .ToObservable() .SelectMany(di2 => recurse(di2))) .ObserveOn(Scheduler.Default); var query = from di in recurse(new DirectoryInfo(Path.Combine(directory.FullName, baselvl.ToString()))) from fi in di.GetFiles().Where(f => int.Parse(Path.GetFileNameWithoutExtension(f.Name)) >= br_tile[1] && int.Parse(Path.GetFileNameWithoutExtension(f.Name)) <= tl_tile[1]).ToObservable() select fi; query.Subscribe(block.AsObserver()); Console.WriteLine("Done subscribing"); block.Complete(); block.Completion.Wait(); Console.WriteLine("Done TPL Block"); 

block是我的var block = new XYZTileCombinerBlock