Tag: parallel extensions

使用Parallel Linq Extensions结合两个序列,如何才能首先产生最快的结果?

假设我有两个序列返回整数1到5。 第一个返回1,2和3非常快,但4和5每个需要200毫秒。 public static IEnumerable FastFirst() { for (int i = 1; i 3) Thread.Sleep(200); yield return i; } } 第二个返回1,2和3,延迟时间为200ms,但快速返回4和5。 public static IEnumerable SlowFirst() { for (int i = 1; i < 6; i++) { if (i < 4) Thread.Sleep(200); yield return i; } } 联合这两个序列只给出数字1到5。 FastFirst().Union(SlowFirst()); 我不能保证两种方法中的哪一种在什么时候有延迟,所以执行的顺序不能保证为我提供解决方案。 因此,我想并行化联盟,以便最小化我的例子中的(人为的)延迟。 一个真实场景:我有一个返回一些实体的缓存,以及一个返回所有实体的数据源。 我希望能够从一个方法返回一个迭代器,该方法将请求内部并行化到缓存和数据源,以便缓存的结果尽可能快地生成。 注1:我意识到这仍然在浪费CPU周期; 我不是在问我如何防止序列迭代它们的慢元素,我怎么能尽可能快地将它们联合起来。 更新1:我已经定制了achitaka-san对接受多个生成器的响应,并使用ContinueWhenAll将BlockingCollection的CompleteAdding设置为一次。 […]

任务并行库 – 自定义任务计划程序

我要求将web服务请求发送到在线api,我认为Parallel Extensions非常适合我的需求。 有问题的Web服务旨在重复调用,但如果您每秒超过一定数量的呼叫,则会有一种机制向您收费。 我显然希望尽量减少我的收费,所以想知道是否有人见过可以应对以下要求的TaskScheduler: 限制每个时间跨度计划的任务数。 我想如果请求的数量超过这个限制那么它需要丢弃任务或可能阻止? (停止任务的后退日志) 检测相同的请求是否已经在要执行的调度程序中但尚未执行,如果是,则不对第二个任务进行排队,而是返回第一个任务。 人们是否觉得这些是任务调度员应该处理的责任,还是我在咆哮错误的树? 如果您有其他选择,我愿意接受建议。

在任务中抛出exception – “等待”与等待()

static async void Main(string[] args) { Task t = new Task(() => { throw new Exception(); }); try { t.Start(); t.Wait(); } catch (AggregateException e) { // When waiting on the task, an AggregateException is thrown. } try { t.Start(); await t; } catch (Exception e) { // When awating on the task, the exception […]

CorrelationManager.LogicalOperationStack是否与Parallel.For,Tasks,Threads等兼容

有关背景信息,请参阅此问题: 任务并行库中的任务如何影响ActivityID? 该问题询问Tasks如何影响Trace.CorrelationManager.ActivityId 。 @Greg Samson用测试程序回答了他自己的问题,该程序显示ActivityId在Tasks的上下文中是可靠的。 测试程序在Task委托的开头设置一个ActivityId,hibernate以模拟工作,然后检查最后的ActivityId以确保它是相同的值(即它没有被另一个线程修改)。 该程序成功运行。 在研究线程,任务和并行操作的其他“上下文”选项(最终为日志提供更好的上下文)时,我遇到了Trace.CorrelationManager.LogicalOperationStack的一个奇怪问题(无论如何我都很奇怪)。 我在下面的问题中复制了我的“答案”。 我认为它充分描述了我遇到的问题(Trace.CorrelationManager.LogicalOperationStack显然已经损坏 – 或者什么 – 当在Parallel.For的上下文中使用时,但只有当Parallel.For本身包含在逻辑操作中时) 。 这是我的问题: Trace.CorrelationManager.LogicalOperationStack应该可以与Parallel.For一起使用吗? 如果是这样,如果逻辑操作已经与Parallel.For启动了,它是否会产生影响? 是否有一种“正确”的方式使用LogicalOperationStack与Parallel.For? 我能不同地对这个示例程序进行编码以使其“有效”吗? 通过“工作”,我的意思是LogicalOperationStack总是具有预期的条目数,并且条目本身是预期的条目。 我已经使用Threads和ThreadPool线程做了一些额外的测试,但是我必须返回并重试那些测试,看看我是否遇到了类似的问题。 我会说,看起来任务/并行线程和ThreadPool线程确实从父线程“inheritance”了Trace.CorrelationManager.ActivityId和Trace.CorrelationManager.LogicalOperationStack值。 这是预期的,因为CorrelationManager使用CallContext的LogicalSetData方法(而不是SetData)存储这些值。 请再次参考此问题,以获取我在下面发布的“答案”的原始背景: 任务并行库中的任务如何影响ActivityID? 另请参阅Microsoft的Parallel Extensions论坛上的这个类似问题(目前尚未得到解答): http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9 [BEGIN PASTE] 请原谅我发布这个作为答案,因为它不是你的问题的真正答案,但是,它与你的问题有关,因为它处理CorrelationManager行为和线程/任务/等。 我一直在寻找使用CorrelationManager的LogicalOperationStack (和StartLogicalOperation/StopLogicalOperation方法)在multithreading场景中提供额外的上下文。 我拿了你的例子并稍微修改它以增加使用Parallel.For并行执行工作的能力。 另外,我使用StartLogicalOperation/StopLogicalOperation来括号(内部) DoLongRunningWork 。 从概念上讲, DoLongRunningWork每次执行时都会执行以下操作: DoLongRunningWork StartLogicalOperation Thread.Sleep(3000) StopLogicalOperation 我发现,如果我将这些逻辑操作添加到您的代码中(或多或少),所有逻辑操作都保持同步(始终是堆栈上预期的操作数,并且堆栈上的操作值始终为预期)。 在我自己的一些测试中,我发现并非总是这样。 逻辑操作堆栈正在“损坏”。 我能想到的最好的解释是当“子”线程退出时将CallContext信息“合并”回“父”线程上下文导致“旧”子线程上下文信息(逻辑操作)为“inheritance“由另一个兄弟姐妹线程。 问题也可能与Parallel.For显然使用主线程(至少在示例代码中,如编写)作为“工作线程”之一(或者在并行域中应该调用它们)之间的事实有关。 每当执行DoLongRunningWork时,就会启动一个新的逻辑操作(在开始时)并停止(在结束时)(即,将其推送到LogicalOperationStack并从中弹出)。 如果主线程已经有效的逻辑操作,并且DoLongRunningWork在主线程上执行,则启动新的逻辑操作,因此主线程的LogicalOperationStack现在具有两个操作。 DoLongRunningWork的任何后续执行(只要DoLongRunningWork的这个“迭代”在主线程上执行)将(显然)inheritance主线程的LogicalOperationStack(现在它有两个操作,而不仅仅是一个预期的操作)。 我花了很长时间才弄清楚为什么LogicalOperationStack的行为在我的示例中与我的示例的修改版本不同。 最后,我看到在我的代码中,我在一个逻辑操作中将整个程序括起来,而在我的测试程序的修改版本中,我没有。 […]

Parallel.For步长

有谁知道是否有任何重载允许我在Parallel.For循环中指定步长? c#或VB.Net中的样本会很棒。 谢谢,贡萨洛

列出线程安全

我使用以下代码 var processed = new List(); Parallel.ForEach(items, item => { processed.Add(SomeProcessingFunc(item)); }); 上面的代码线程安全吗? 处理后的列表是否有可能被破坏? 或者我应该在添加之前使用锁? var processed = new List(); Parallel.ForEach(items, item => { lock(items.SyncRoot) processed.Add(SomeProcessingFunc(item)); }); 谢谢。