DataTable的线程安全性

我曾经读过这个答案ADO.NET DataTable / DataRow Thread Safety ,并且无法理解一些东西。 特别是我无法理解[2]文章。 我需要使用什么样的包装? 谁能举个例子?

另外我无法理解作者的意思是谈论级联锁和完全锁定。 请举例。

DataTable根本不是为并发使用而设计的(特别是在涉及任何forms的突变的情况下)。 在我看来,这里可取的“包装”是:

  • 不需要同时处理DataTable (当涉及变异时),或者:
  • 删除DataTable ,而不是使用直接支持您需要的数据结构(例如并发集合),或者更简单并且可以简单地同步(独占或读取/写入)的数据结构

基本上:改变问题。


来自评论:

代码如下:

 Parallel.ForEach(strings, str=> { DataRow row; lock(table){ row= table.NewRow(); } MyParser.Parse(str, out row); lock(table){ table.Rows.Add(row) } }); 

我只能希望out row在这里是一个拼写错误,因为这实际上不会导致它填充通过NewRow()创建的行,但是:如果你必须使用这种方法,你就不能使用NewRow ,因为挂起的行有点共享。 你最好的选择是:

 Parallel.ForEach(strings, str=> { object[] values = MyParser.Parse(str); lock(table) { table.Rows.Add(values); } }); 

上面的重要变化是lock覆盖了整个新的行进程。 请注意,在使用Parallel.ForEach时,您无法保证顺序,因此最终订单不需要完全匹配(如果数据包含时间组件,这不应该是一个问题)。

然而! 我仍然认为你正在以错误的方式接近这一点:因为并行性是相关的,它必须是非平凡的数据。 如果你有非平凡的数据,你真的不想在内存中缓冲它。 我强烈建议做类似以下的事情,它可以在单个线程上正常工作:

 using(var bcp = new SqlBulkCopy()) using(var reader = ObjectReader.Create(ParseFile(path))) { bcp.DestinationTable = "MyLog"; bcp.WriteToServer(reader); } ... static IEnumerable ParseFile(string path) { using(var reader = File.OpenText(path)) { string line; while((line = reader.ReadLine()) != null) { yield return new LogRow { // TODO: populate the row from line here }; } } } ... public sealed class LogRow { /* define your schema here */ } 

好处:

  • 没有缓冲 – 这是一个完全流式传输操作( yield return不会将内容放入列表或类似内容)
  • 因此,行可以立即开始流式传输而无需等待整个文件首先进行预处理
  • 没有内存饱和问题
  • 没有线程并发症/开销
  • 你可以保留原始订单(通常不是关键,但很好)
  • 您只受到读取原始文件的速度的限制,这在单个线程上通常比从多个线程更快 (单个IO设备上的争用只是开销)
  • 避免DataTable所有开销,这在这里是过度的 – 因为它非常灵活,它有很大的开销
  • read(从日志文件中)和write(到数据库)现在是并发的而不是顺序的

我在自己的工作中做了很多像^^^这样的事情,从经验来看,它通常至少比首先在内存中填充DataTable 快两倍


最后 – 这是一个IEnumerable实现的示例,它接受并发读取器和编写器,而不需要在内存中缓存所有内容 – 这将允许多个线程使用SqlBulkCopy的单个线程解析数据(调用Add和finally Close )通过IEnumerable API:

 using System; using System.Collections; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; ///  /// Acts as a container for concurrent read/write flushing (for example, parsing a /// file while concurrently uploading the contents); supports any number of concurrent /// writers and readers, but note that each item will only be returned once (and once /// fetched, is discarded). It is necessary to Close() the bucket after adding the last /// of the data, otherwise any iterators will never finish ///  class ThreadSafeBucket : IEnumerable { private readonly Queue queue = new Queue(); public void Add(T value) { lock (queue) { if (closed) // no more data once closed throw new InvalidOperationException("The bucket has been marked as closed"); queue.Enqueue(value); if (queue.Count == 1) { // someone may be waiting for data Monitor.PulseAll(queue); } } } public void Close() { lock (queue) { closed = true; Monitor.PulseAll(queue); } } private bool closed; public IEnumerator GetEnumerator() { while (true) { T value; lock (queue) { if (queue.Count == 0) { // no data; should we expect any? if (closed) yield break; // nothing more ever coming // else wait to be woken, and redo from start Monitor.Wait(queue); continue; } value = queue.Dequeue(); } // yield it **outside** of the lock yield return value; } } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } } static class Program { static void Main() { var bucket = new ThreadSafeBucket(); int expectedTotal = 0; ThreadPool.QueueUserWorkItem(delegate { int count = 0, sum = 0; foreach(var item in bucket) { count++; sum += item; if ((count % 100) == 0) Console.WriteLine("After {0}: {1}", count, sum); } Console.WriteLine("Total over {0}: {1}", count, sum); }); Parallel.For(0, 5000, new ParallelOptions { MaxDegreeOfParallelism = 3 }, i => { bucket.Add(i); Interlocked.Add(ref expectedTotal, i); } ); Console.WriteLine("all data added; closing bucket"); bucket.Close(); Thread.Sleep(100); Console.WriteLine("expecting total: {0}", Interlocked.CompareExchange(ref expectedTotal, 0, 0)); Console.ReadLine(); } }