当与ConcurrentDictionary 一起使用时,C#LINQ OrderBy线程是否安全?
我的工作假设是,当与System.Collections.Concurrent集合(包括ConcurrentDictionary )一起使用时,LINQ是线程安全的。
(其他Overflowpost似乎同意: 链接 )
但是,对LINQ OrderBy扩展方法的实现的检查表明,对于实现ICollection的并发集合的子集(例如ConcurrentDictionary ),它似乎不是线程安全的。
OrderedEnumerable GetEnumerator ( 此处为source )构造一个Buffer结构的实例( 此处为source ),它尝试将集合转换为ICollection ( ConcurrentDictionary实现),然后执行一个collection.CopyTo,其数组初始化为集合的大小。
因此,如果在OrderBy操作期间ConcurrentDictionary (在这种情况下为具体ICollection )的大小增加,在初始化数组和复制到它之间,此操作将抛出。
以下测试代码显示此exception:
(注意:我很欣赏在一个线程安全的集合上执行一个OrderBy ,这个集合在你下面发生变化并没有那么有意义,但我不认为它应该抛出)
using System; using System.Collections.Concurrent; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace Program { class Program { static void Main(string[] args) { try { int loop = 0; while (true) //Run many loops until exception thrown { Console.WriteLine($"Loop: {++loop}"); _DoConcurrentDictionaryWork().Wait(); } } catch (Exception ex) { Console.WriteLine(ex); } } private static async Task _DoConcurrentDictionaryWork() { var concurrentDictionary = new ConcurrentDictionary(); var keyGenerator = new Random(); var tokenSource = new CancellationTokenSource(); var orderByTaskLoop = Task.Run(() => { var token = tokenSource.Token; while (token.IsCancellationRequested == false) { //Keep ordering concurrent dictionary on a loop var orderedPairs = concurrentDictionary.OrderBy(x => x.Key).ToArray(); //THROWS EXCEPTION HERE //...do some more work with ordered snapshot... } }); var updateDictTaskLoop = Task.Run(() => { var token = tokenSource.Token; while (token.IsCancellationRequested == false) { //keep mutating dictionary on a loop var key = keyGenerator.Next(0, 1000); concurrentDictionary[key] = new object(); } }); //Wait for 1 second await Task.Delay(TimeSpan.FromSeconds(1)); //Cancel and dispose token tokenSource.Cancel(); tokenSource.Dispose(); //Wait for orderBy and update loops to finish (now token cancelled) await Task.WhenAll(orderByTaskLoop, updateDictTaskLoop); } } }
OrderBy抛出exception会导致一些可能的结论:
1)我对LINQ作为并发集合的线程安全的假设是不正确的,并且只能在LINQ查询期间对集合执行LINQ(无论它们是并发还是不兼容)是安全的
2)LINQ OrderBy的实现存在一个错误,并且实现尝试将源集合转换为ICollection并尝试执行集合副本是不正确的(并且它应该直接进入迭代IEnumerable的默认行为) )。
3)我误解了这里发生了什么……
非常感谢!
在任何地方都没有说明OrderBy
(或其他LINQ方法)应该始终使用源IEnumerable
GetEnumerator
,或者它应该在并发集合上是线程安全的。 所有承诺的都是这种方法
根据键按升序对序列的元素进行排序。
在某些全局意义上, ConcurrentDictionary
也不是线程安全的。 对于在其上执行的其他操作,它是线程安全的。 甚至更多,文档说
ConcurrentDictionary的所有公共成员和受保护成员都是线程安全的,可以从多个线程同时使用。 但是 ,通过ConcurrentDictionary实现的其中一个接口访问的成员( 包括扩展 方法 )不保证是线程安全的,并且可能需要由调用者同步。
所以,你的理解是正确的( OrderBy
会看到传递给它的IEnumerable
真的是ICollection
,然后会得到那个集合的长度,分配那个大小的缓冲区,然后调用ICollection.CopyTo
,这当然不是任何线程安全的收集的类型),但它不是OrderBy
的错误,因为OrderBy
和ConcurrentDictionary
都没有承诺你所假设的。
如果要在ConcurrentDictionary
上以线程安全的方式执行OrderBy
,则需要依赖承诺为线程安全的方法。 例如:
// note: this is NOT IEnumerable.ToArray() // but public ToArray() method of ConcurrentDictionary itself // it is guaranteed to be thread safe with respect to other operations // on this dictionary var snapshot = concurrentDictionary.ToArray(); // we are working on snapshot so no one other thread can modify it // of course at this point real contents of dictionary might not be // the same as our snapshot var sorted = snapshot.OrderBy(c => c.Key);
如果你不想分配额外的数组(使用ToArray
),你可以使用Select(c => c)
并且在这种情况下它会起作用,但是我们再次处于没有实际意义的区域并依赖于某些东西可以安全使用在情况下它没有被承诺( Select
也不会总是枚举你的集合。如果集合是数组或列表 – 它将快捷方式并使用索引器代替)。 所以你可以像这样创建扩展方法:
public static class Extensions { public static IEnumerable ForceEnumerate (this ICollection collection) { foreach (var item in collection) yield return item; } }
如果你想要安全并且不想分配数组,请使用它:
concurrentDictionary.ForceEnumerate().OrderBy(c => c.Key).ToArray();
在这种情况下,我们强制枚举ConcurrentDictionary
(我们知道文档是安全的),然后将其传递给OrderBy
因为它知道它不会对纯IEnumerable
造成任何伤害。 请注意,正如mjwills在注释中正确指出的那样,这与ToArray
不完全相同,因为ToArray
会生成快照(锁定集合以防止在构建数组时进行修改)而Select
\ yield
不会获取任何锁定(因此可能会添加项目或删除项目)当枚举正在进行时。 虽然我怀疑在执行相关描述之类的事情时很重要 – 在OrderBy
完成后的两种情况下 – 您都不知道您的有序结果是否反映了当前的收集状态。