Tag: parallel processing

C#任务是否在一个核心上运行?

C#任务是否在一个核心上运行? 我有一个项目,我需要决定要创建多少个任务。 我需要创建尽可能多的计算机。 这是处理器,核心或逻辑处理器的数量,我在三个选项之间感到困惑。

C#Parallel.Foreach相当于Python

我有96个txt文件需要处理。 现在我正在使用for循环并一次执行一个,这个过程非常慢。 生成的96个文件,不需要合并。 有没有办法使它们并行运行,ala Parallel.foreach在C#中? 当前代码: for src_name in glob.glob(source_dir+’/*.txt’): outfile = open (…) with open(…) as infile: for line in infile: –PROCESS– for –condition–: outfile.write(…) infile.close() outfile.close() 希望此进程并行运行source_dir中的所有文件。

Parallel.ForEach中的File.Copy

我正在尝试创建一个目录并在Parallel.ForEach复制一个文件(pdf)。 以下是一个简单的例子: private static void CreateFolderAndCopyFile(int index) { const string sourcePdfPath = “c:\\testdata\\test.pdf”; const string rootPath = “c:\\testdata”; string folderDirName = string.Format(“Data{0}”, string.Format(“{0:00000000}”, index)); string folderDirPath = rootPath + @”\” + folderDirName; Directory.CreateDirectory(folderDirPath); string desPdfPath = folderDirPath + @”\” + “test.pdf”; File.Copy(sourcePdfPath, desPdfPath, true); } 上述方法创建一个新文件夹并将pdf文件复制到新文件夹。 它创建了这个目录树: TESTDATA -Data00000000 -test.pdf -Data00000001 -test.pdf …. -Data0000000N -test.pdf […]

Task.StartNew()vs Parallel.ForEach:多个Web请求场景

我已经阅读了SO中的所有相关问题,但对于我的场景中触发多个Web服务调用的最佳方法有点困惑。 我有一个聚合器服务,它接受输入,解析并将其转换为多个Web请求,进行Web请求调用(不相关,因此可以并行触发)并合并发送回调用者的响应。 现在使用以下代码 – list.ForEach((object obj) => { tasks.Add(Task.Factory.StartNew((object state) => { this.ProcessRequest(obj); }, obj, CancellationToken.None, TaskCreationOptions.AttachedToParent, TaskScheduler.Default)); }); await Task.WhenAll(tasks); await Task.WhenAll(tasks)来自Scott Hanselman的post ,据说 斯蒂芬说:“从可扩展性的角度来看,更好的解决方案是利用异步I / O.当你通过网络呼叫时,没有理由(除了方便之外)在等待响应来阻止线程时背部” 现有代码似乎消耗了太multithreading,并且处理器时间在生产负载上达到100%,这让我思考。 另一个替代是使用Parallel.ForEach,它使用分区器,但也“阻塞”调用,这对我的场景来说很好。 考虑到这是所有“异步IO”工作而不是“CPU绑定”工作,并且Web请求不会长时间运行(最多返回3秒),我倾向于认为现有代码足够好。 但这会提供比Parallel.ForEach更好的吞吐量吗? Parallel.ForEach可能使用“最小”数量的任务,因为分区因此最佳使用线程(?)。 我用一些本地测试测试了Parallel.ForEach,但似乎没有更好。 目标是减少CPU时间并提高吞吐量,从而提高可扩展性。 是否有更好的方法来并行处理Web请求? 感谢任何投入。 编辑:代码示例中显示的ProcessRequest方法确实使用HttpClient及其异步方法来触发请求(PostAsync,GetAsync,PutAsync)。

Rx如何并行化长时间运行的任务?

我有以下代码片段,它枚举了一些xml的元素(从svn log –xml …进程的输出中读取),然后为每个xml元素运行一个长时间运行的方法。 var proc = Process.Start(svnProcInfo); var xml = XDocument.Load(proc.StandardOutput); var xElements = xml.Descendants(“path”) .ToObservable() //.SubscribeOn(ThreadPoolScheduler.Instance) .Select(descendant => return LongRunning(descendant)); xElements //.SubscribeOn(NewThreadScheduler.Default) .Subscribe(result => Console.WriteLine(result); Console.ReadKey(); LongRunning方法并不重要,但在其中我记录了它运行的线程。 我们假设它运行一整秒。 我的问题是,取消注释SubscribeOn()行没有任何效果。 对LongRunning的调用是顺序的,并且每隔一秒发生在同一个线程上(尽管与主(初始)线程不同)。 这是一个控制台应用程序。 我是Rx的新手。 我错过了什么? 编辑: 在尝试了Lee Campbell的回答之后,我注意到了另一个问题。 Console.Error.WriteLine(“Main thread ” + Thread.CurrentThread.ManagedThreadId); var xElements = xml.Descendants(“path”).ToObservable() //.ObserveOn(Scheduler.CurrentThread) .SelectMany(descendant => Observable.Start(()=>LongRunning(descendant),NewThreadScheduler.Default)) .Subscribe(result => Console.WriteLine( “Result […]

LinqToSql。 更新行时出现死锁。 的Parallel.For

我有个问题。 我正在尝试使用parallel更新数据库库。 这是代码: Parallel.For(rCnt, range.Rows.Count + 1, (jrCnt, loopState) => { var prcI = new Price(); // new /*bla bla bla bla – bla bla – bla bla – bla */ if ((!string.IsNullOrEmpty(prcI.name)) && (prcI.prc != 0)) // process add or update { prcI.company = nameprice; prcI.date = datatimeselect.Text; Accessor.AddProductUpdateProduct(prcI); // main func /*bla bla […]

模拟给出不同的结果与正常的环路Vs并行For

当我尝试使用普通for循环(这是正确的结果)时,我对我的一个简单模拟样本的不同结果感到有点惊讶。 请帮我找出原因。 我观察到并行执行与正常相比是如此之快。 using System; using System.Collections.Generic; using System.Threading.Tasks; namespace Simulation { class Program { static void Main(string[] args) { ParalelSimulation(); // result is .757056 NormalSimulation(); // result is .508021 which is correct Console.ReadLine(); } static void ParalelSimulation() { DateTime startTime = DateTime.Now; int trails = 1000000; int numberofpeople = 23; Random rnd = new […]

更改C#Parallel.For循环的增量值

我想转换一个for循环,它使用TPL将每次传递的迭代器增加2进入Parallel For循环。 数据不依赖于顺序或以任何方式受约束,但我只想处理源数组的每个其他元素中的数据(在下面的代码中是_Datalist),因此需要增加2。 我的For循环: for (int i = 1; i < _DataList.Length – 1; i += 2) { // Do work for _DataList[i] } 是否有可能告诉并行循环我想要增加2而不是1? 这是并行循环,但显然我每次迭代只增加1: Task.Factory.StartNew(() => Parallel.For(1, _DataList.Length, i => { // do work for _DataList[i] }) ); 我可以告诉内部循环体忽略i的奇数值,但这看起来很麻烦 – 有没有办法在循环初始化中做某种方式?

与BlockingCollection集成时,Parallel.ForEach停滞不前

我根据这个问题中的代码采用了我的并行/消费者实现 class ParallelConsumer : IDisposable { private readonly int _maxParallel; private readonly Action _action; private readonly TaskFactory _factory = new TaskFactory(); private CancellationTokenSource _tokenSource; private readonly BlockingCollection _entries = new BlockingCollection(); private Task _task; public ParallelConsumer(int maxParallel, Action action) { _maxParallel = maxParallel; _action = action; } public void Start() { try { _tokenSource = […]

取消Dispose方法中的任务

我有一个类可以产生可以无限期运行的各种任务。 处理此对象时,我想阻止这些任务运行。 这是正确的方法: public class MyClass : IDisposable { // Stuff public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool disposing) { if (disposing) { queueCancellationTokenSource.Cancel(); feedCancellationTokenSource.Cancel(); } } }