Tag: parallel processing

并行http请求

我有一个应用程序,使用backgroundWorker向last.fm网站发出API请求。 最初我不知道我需要做多少请求。 响应包含总页数,因此我只会在第一次请求后得到它。 这是下面的代码。 private void backgroundWorker_DoWork(object sender, DoWorkEventArgs e) { int page = 1; int totalpages = 1; while (page <= totalpages) { if (backgroundWorker.CancellationPending) { e.Cancel = true; return; } //Here is the request part string Response = RecentTracksRequest(username, from, page); if (Response.Contains("lfm status=\"ok")) { totalpages = Convert.ToInt32(Regex.Match(Response, @"totalPages=.(\d+)").Groups[1].Value); MatchCollection match = Regex.Matches(Response, […]

如何并行读取队列中的消息?

情况 我们有一个消息队列。 我们希望并行处理消息并限制同时处理的消息的数量。 我们下面的试用代码会并行处理消息,但只有在前一个进程完成时才开始新的一批进程。 我们想在完成后重启任务。 换句话说:只要消息队列不为空,任务的最大数量应始终处于活动状态。 试用代码 static string queue = @”.\Private$\concurrenttest”; private static void Process(CancellationToken token) { Task.Factory.StartNew(async () => { while (true) { IEnumerable consumerTasks = ConsumerTasks(); await Task.WhenAll(consumerTasks); await PeekAsync(new MessageQueue(queue)); } }); } private static IEnumerable ConsumerTasks() { for (int i = 0; i { Console.WriteLine(“id: ” + message.id + “, […]

如何并行运行任务并选择满足C#中给定条件的第一个结果?

我希望并行运行三个任务。 我希望检查完成的第一个任务的结果,并检查以确定结果是否良好 。 如果是,我取消所有其他任务并返回此结果,如果没有,我将等待下一个完成的任务并检查是否良好 ,如果是,则执行相同操作。 (想想在OutputDataType的成员上做一些简单的检查)。 我继续这个,直到我获得一个结果良好的完成任务,或者所有任务返回的结果都不好 ,在这种情况下我返回null 。 先感谢您。 using System; using System.Threading; using System.Threading.Tasks; namespace myNamespace { class Program { static void Main() { InputDataType data = getMyData(); OutputDataType x = Foo(data); } static OutputDataType Foo(InputDataType data) { Task task1 = null, task2 = null, taks3 = null; Task[] TaskArray = { task1, […]

并行操作批处理

在TPL(任务 – 并行 – 库)中是否有内置支持用于批处理操作? 我最近玩了一个例程,使用查找表即音译在字符数组上进行字符替换: for (int i = 0; i < chars.Length; i++) { char replaceChar; if (lookup.TryGetValue(chars[i], out replaceChar)) { chars[i] = replaceChar; } } 我可以看到,这可能是平凡的并行化,所以跳入第一个刺,我知道会因为任务太细粒度而表现更差: Parallel.For(0, chars.Length, i => { char replaceChar; if (lookup.TryGetValue(chars[i], out replaceChar)) { chars[i] = replaceChar; } }); 然后我重新编写算法以使用批处理,以便可以将工作分块到不太细粒度的批处理中的不同线程上。 这样就可以按预期使用线程,并且接近线性加速。 我确信必须内置支持TPL中的批处理。 什么是语法,我该如何使用它? const int CharBatch = 100; […]

react native管道 ​​- 如何控制并行性?

我正在构建一个简单的处理管道,其中一个项目被作为输入提取,它由多个处理器以顺序方式操作,最后输出。 下图描述了整体架构: 它当前的工作方式 :Pipeline尽可能快地从提供者那里获取项目。 一旦获取了一个项目,它就会被传递给处理器。 处理完项目后,将通知输出。 虽然以顺序方式处理单个项目,但是可以并行处理多个项目(取决于从提供者获取它们的速度)。 从管道创建并返回的IObservable如下所示: return Observable.Create(async observer => { while (_provider.HasNext) { T item = await _provider.GetNextAsync(); observer.OnNext(item); } }).SelectMany(item => Observable.FromAsync(() => _processors.Aggregate( seed: Task.FromResult(item), func: (current, processor) => current.ContinueWith( // Append continuations. previous => processor.ProcessAsync(previous.Result)) .Unwrap()))); // We need to unwrap Task{T} from Task{Task{T}}; 缺少的部分 :我需要一个控制机制来控制在任何给定时间管道中可以有多少项(最大) 。 例如,如果max并行处理为3 ,那么将导致以下工作流程: […]

Parallels.ForEach与Foreach相同的时间

所有, 我使用Parallels.ForEach如下 private void fillEventDifferencesParallels(IProducerConsumerCollection events, Dictionary originalEvents) { Parallel.ForEach(events, evt => { IEvent originalEventInfo = originalEvents[evt.EventID]; evt.FillDifferences(originalEventInfo); }); } 好吧,所以我遇到的问题是我有28个这样的列表(测试样本,这应该能够扩展到200+),而FillDifferences方法非常耗时(每次调用大约4s)。 因此,在普通ForEach中运行的平均时间大约为100-130秒。 当我在Parallel中运行相同的东西时,它需要相同的时间和Spikes我的CPU(Intel I5,2 Core,每个Core 2个线程)导致应用程序在此查询运行时变得迟缓(这是在一个线程上运行这是由GUI线程产生的)。 所以我的问题是,我做错了什么导致这花费相同的时间? 我读到List不是线程安全的,所以我重写了这个以使用IProducerConsumerCollection。 是否有任何其他陷阱可能导致这种情况? FillDifferences方法调用一个静态类,该类使用reflection来找出原始对象和修改对象之间存在多少差异。 静态对象没有“全局”变量,只是调用方法的本地变量。 有些人想看看FillDifferences()方法调用了什么。 这是最终结束的地方: public List ShallowCompare(object orig, object changed, string currentName) { List differences = new List(); foreach (MemberInfo m in orig.GetType().GetMembers()) { List temp […]

在开始新任务之前检查任务是否已在运行

有一个在任务中执行的过程。 我不希望其中一个同时执行。 这是检查任务是否已在运行的正确方法吗? private Task task; public void StartTask() { if (task != null && (task.Status == TaskStatus.Running || task.Status == TaskStatus.WaitingToRun || task.Status == TaskStatus.WaitingForActivation)) { Logger.Log(“Task has attempted to start while already running”); } else { Logger.Log(“Task has began”); task = Task.Factory.StartNew(() => { // Stuff }); } }

嵌套的Parallel.ForEach循环在同一个列表中?

我需要并行化一个方法,该方法对列表中的元素进行详尽的成对比较。 串行实现很简单: foreach (var element1 in list) foreach (var element2 in list) foo(element1, element2); 在这种情况下,foo不会改变element1或element2的状态。 我知道简单地执行嵌套的Parallel.ForEach语句是不安全的: Parallel.ForEach(list, delegate(A element1) { Parallel.ForEach(list, delegate(A element2) { foo(element1, element2); }); }); 使用并行任务库实现此目的的理想方法是什么?

C#从巨大的url列表中下载数据

我有一个巨大的网页列表,显示一个状态,我需要检查。 一些url位于同一网站内,另一个url位于另一个网站上。 现在我正试图通过使用下面的代码以并行的方式做到这一点,但我觉得我造成了太多的开销。 while(ListOfUrls.Count > 0){ Parallel.ForEach(ListOfUrls, url => { WebClient webClient = new WebClient(); webClient.DownloadString(url); … run my checks here.. }); ListOfUrls = GetNewUrls….. } 这可以用更少的开销来完成,并且可以更多地控制我使用/重用的Web客户端和连接数量吗? 那么,最终工作可以更快完成吗?

您将如何简化输入和退出ReaderWriterLock?

这对我来说似乎很吵。 五行开销太多了。 m_Lock.EnterReadLock() Try Return m_List.Count Finally m_Lock.ExitReadLock() End Try 那你怎么这么简单呢?