任务并行库中的任务如何影响ActivityID?

在使用任务并行库之前,我经常使用CorrelationManager.ActivityId来跟踪multithreading的跟踪/错误报告。

ActivityId存储在线程本地存储中,因此每个线程都有自己的副本。 这个想法是当你启动一个线程(活动)时,你分配一个新的ActivityId。 ActivityId将使用任何其他跟踪信息写入日志,从而可以单独列出单个“活动”的跟踪信息。 这对于WCF非常有用,因为ActivityId可以转移到服务组件。

这是我正在谈论的一个例子:

static void Main(string[] args) { ThreadPool.QueueUserWorkItem(new WaitCallback((o) => { DoWork(); })); } static void DoWork() { try { Trace.CorrelationManager.ActivityId = Guid.NewGuid(); //The functions below contain tracing which logs the ActivityID. CallFunction1(); CallFunction2(); CallFunction3(); } catch (Exception ex) { Trace.Write(Trace.CorrelationManager.ActivityId + " " + ex.ToString()); } } 

现在,通过TPL,我的理解是多个任务共享线程。 这是否意味着ActivityId很容易在任务中间重新初始化(通过另一项任务)? 是否有新的机制来处理活动追踪?

我运行了一些实验,结果certificate我的问题中的假设是错误的 – 使用TPL创建的多个任务不会同时在同一个线程上运行。

ThreadLocalStorage可以安全地与.NET 4.0中的TPL一起使用,因为一个线程一次只能由一个任务使用。

任务可以同时共享线程的假设是基于我在DotNetRocks上听到的关于c#5.0的采访 (对不起,我不记得它是哪个节目) – 所以我的问题可能(或可能不会)很快就会变得相关。

我的实验启动了许多任务,并记录了运行了多少任务,花了多长时间以及消耗了多少线程。 如果有人想重复它,代码如下。

 class Program { static void Main(string[] args) { int totalThreads = 100; TaskCreationOptions taskCreationOpt = TaskCreationOptions.None; Task task = null; Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); Task[] allTasks = new Task[totalThreads]; for (int i = 0; i < totalThreads; i++) { task = Task.Factory.StartNew(() => { DoLongRunningWork(); }, taskCreationOpt); allTasks[i] = task; } Task.WaitAll(allTasks); stopwatch.Stop(); Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); Console.ReadKey(); } private static List threadIds = new List(); private static object locker = new object(); private static void DoLongRunningWork() { lock (locker) { //Keep a record of the managed thread used. if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId)) threadIds.Add(Thread.CurrentThread.ManagedThreadId); } Guid g1 = Guid.NewGuid(); Trace.CorrelationManager.ActivityId = g1; Thread.Sleep(3000); Guid g2 = Trace.CorrelationManager.ActivityId; Debug.Assert(g1.Equals(g2)); } } 

输出(当然这取决于机器)是:

 Completed 100 tasks in 23097 milliseconds Used 23 threads 

将taskCreationOpt更改为TaskCreationOptions.LongRunning会产生不同的结果:

 Completed 100 tasks in 3458 milliseconds Used 100 threads 

请原谅我发布这个作为答案,因为它不是你的问题的真正答案,但是,它与你的问题有关,因为它处理CorrelationManager行为和线程/任务/等。 我一直在寻找使用CorrelationManager的LogicalOperationStack (和StartLogicalOperation/StopLogicalOperation方法)在multithreading场景中提供额外的上下文。

我拿了你的例子并稍微修改它以增加使用Parallel.For并行执行工作的能力。 另外,我使用StartLogicalOperation/StopLogicalOperation来括号(内部) DoLongRunningWork 。 从概念上讲, DoLongRunningWork每次执行时都会执行以下操作:

 DoLongRunningWork StartLogicalOperation Thread.Sleep(3000) StopLogicalOperation 

我发现,如果我将这些逻辑操作添加到您的代码中(或多或少),所有逻辑操作都保持同步(始终是堆栈上预期的操作数,并且堆栈上的操作值始终为预期)。

在我自己的一些测试中,我发现并非总是这样。 逻辑操作堆栈正在“损坏”。 我能想到的最好的解释是当“子”线程退出时将CallContext信息“合并”回“父”线程上下文导致“旧”子线程上下文信息(逻辑操作)为“inheritance“由另一个兄弟姐妹线程。

问题也可能与Parallel.For显然使用主线程(至少在示例代码中,如编写)作为“工作线程”之一(或者在并行域中应该调用它们)之间的事实有关。 每当执行DoLongRunningWork时,就会启动一个新的逻辑操作(在开始时)并停止(在结束时)(即,将其推送到LogicalOperationStack并从中弹出)。 如果主线程已经有效的逻辑操作,并且DoLongRunningWork在主线程上执行,则启动新的逻辑操作,因此主线程的LogicalOperationStack现在具有两个操作。 DoLongRunningWork的任何后续执行(只要DoLongRunningWork的这个“迭代”在主线程上执行)将(显然)inheritance主线程的LogicalOperationStack(现在它有两个操作,而不仅仅是一个预期的操作)。

我花了很长时间才弄清楚为什么LogicalOperationStack的行为在我的示例中与我的示例的修改版本不同。 最后,我看到在我的代码中,我在一个逻辑操作中将整个程序括起来,而在我的测试程序的修改版本中,我没有。 这意味着在我的测试程序中,每次执行“工作”(类似于DoLongRunningWork)时,已经存在逻辑操作。 在我的测试程序的修改版本中,我没有在逻辑操作中将整个程序括起来。

所以,当我修改你的测试程序以在逻辑操作中包含整个程序时如果我使用Parallel.For,我遇到了完全相同的问题。

使用上面的概念模型,这将成功运行:

 Parallel.For DoLongRunningWork StartLogicalOperation Sleep(3000) StopLogicalOperation 

虽然这最终会因为LogicalOperationStack显然不同步而断言:

 StartLogicalOperation Parallel.For DoLongRunningWork StartLogicalOperation Sleep(3000) StopLogicalOperation StopLogicalOperation 

这是我的示例程序。 它类似于你的,因为它有一个DoLongRunningWork方法来操作ActivityId以及LogicalOperationStack。 我也有两种方式踢DoLongRunningWork。 一种风味使用任务一使用Parallel.For。 还可以执行每种风格,使得整个并行操作被包含在逻辑操作中或不包含在逻辑操作中。 因此,总共有4种方法来执行并行操作。 要尝试每个,只需取消注释所需的“使用…”方法,重新编译并运行。 UseTasksUseTasks(true)UseParallelFor都应该运行完成。 UseParallelFor(true)将在某些时候断言,因为LogicalOperationStack没有预期的条目数。

 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; namespace CorrelationManagerParallelTest { class Program { static void Main(string[] args) { //UseParallelFor(true) will assert because LogicalOperationStack will not have expected //number of entries, all others will run to completion. UseTasks(); //Equivalent to original test program with only the parallelized //operation bracketed in logical operation. ////UseTasks(true); //Bracket entire UseTasks method in logical operation ////UseParallelFor(); //Equivalent to original test program, but use Parallel.For //rather than Tasks. Bracket only the parallelized //operation in logical operation. ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation } private static List threadIds = new List(); private static object locker = new object(); private static int mainThreadId = Thread.CurrentThread.ManagedThreadId; private static int mainThreadUsedInDelegate = 0; // baseCount is the expected number of entries in the LogicalOperationStack // at the time that DoLongRunningWork starts. If the entire operation is bracketed // externally by Start/StopLogicalOperation, then baseCount will be 1. Otherwise, // it will be 0. private static void DoLongRunningWork(int baseCount) { lock (locker) { //Keep a record of the managed thread used. if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId)) threadIds.Add(Thread.CurrentThread.ManagedThreadId); if (Thread.CurrentThread.ManagedThreadId == mainThreadId) { mainThreadUsedInDelegate++; } } Guid lo1 = Guid.NewGuid(); Trace.CorrelationManager.StartLogicalOperation(lo1); Guid g1 = Guid.NewGuid(); Trace.CorrelationManager.ActivityId = g1; Thread.Sleep(3000); Guid g2 = Trace.CorrelationManager.ActivityId; Debug.Assert(g1.Equals(g2)); //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation //in effect when the Parallel.For operation was started. Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1)); Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1)); Trace.CorrelationManager.StopLogicalOperation(); } private static void UseTasks(bool encloseInLogicalOperation = false) { int totalThreads = 100; TaskCreationOptions taskCreationOpt = TaskCreationOptions.None; Task task = null; Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); if (encloseInLogicalOperation) { Trace.CorrelationManager.StartLogicalOperation(); } Task[] allTasks = new Task[totalThreads]; for (int i = 0; i < totalThreads; i++) { task = Task.Factory.StartNew(() => { DoLongRunningWork(encloseInLogicalOperation ? 1 : 0); }, taskCreationOpt); allTasks[i] = task; } Task.WaitAll(allTasks); if (encloseInLogicalOperation) { Trace.CorrelationManager.StopLogicalOperation(); } stopwatch.Stop(); Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate)); Console.ReadKey(); } private static void UseParallelFor(bool encloseInLogicalOperation = false) { int totalThreads = 100; Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); if (encloseInLogicalOperation) { Trace.CorrelationManager.StartLogicalOperation(); } Parallel.For(0, totalThreads, i => { DoLongRunningWork(encloseInLogicalOperation ? 1 : 0); }); if (encloseInLogicalOperation) { Trace.CorrelationManager.StopLogicalOperation(); } stopwatch.Stop(); Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate)); Console.ReadKey(); } } } 

如果LogicalOperationStack可以与Parallel.For(和/或其他线程/任务构造)一起使用或者如何使用它的整个问题可能值得提出自己的问题。 也许我会发一个问题。 与此同时,我想知道你是否对此有任何想法(或者,我想知道你是否考虑过使用LogicalOperationStack,因为ActivityId似乎是安全的)。

[编辑]

有关将LogicalOperationStack和/或CallContext.LogicalSetData与各种Thread / ThreadPool / Task / Parallel结构一起使用的详细信息,请参阅我对此问题的回答。

另请参阅我在这里关于LogicalOperationStack和并行扩展的问题: CorrelationManager.LogicalOperationStack是否与Parallel.For,Tasks,Threads等兼容

最后,请参阅Microsoft的Parallel Extensions论坛上的问题: http : //social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

在我的测试中,当使用Parallel.For或Parallel.Invoke时,看起来Trace.CorrelationManager.LogicalOperationStack会被破坏。如果你在主线程中启动逻辑操作,然后在委托中启动/停止逻辑操作。 在我的测试中(参见上面两个链接中的任何一个),当DoLongRunningWork正在执行时,LogicalOperationStack应该总是有2个条目(如果我在使用各种技术踢DoLongRunningWork之前在主线程中启动逻辑操作)。 因此,“损坏”是指LogicalOperationStack最终会有超过2个条目。

据我所知,这可能是因为Parallel.For和Parallel.Invoke使用主线程作为执行DoLongRunningWork操作的“工作”线程之一。

使用存储在CallContext.LogicalSetData中的堆栈来模仿LogicalOperationStack的行为(类似于通过CallContext.SetData存储的log4net的LogicalThreadContext.Stacks)会产生更糟糕的结果。 如果我使用这样的堆栈来维护上下文,它几乎在我主线程中有“逻辑操作”并且每次迭代中都有逻辑操作的所有场景中都会被破坏(即没有预期的条目数) /执行DoLongRunningWork委托。