Producer-consumer model using TPL tasks in .NET 4.0

I have a fairly large XML file (about 1-2 GB).

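The requirement is to persist the XML data to a database. Currently this is done in three steps:

  1. Read the large file with as small a memory footprint as possible.
  2. Create entities from the XML data.
  3. Store the data from the created entities in the database using SqlBulkCopy.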

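For better performance, I want to build a producer-consumer model: the producer creates entities in batches (for example, 10K per batch) and adds each batch to a queue, and the consumers take batches of entities off the queue and persist them to the database with SqlBulkCopy.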

Thanks, Gokul

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Xml;
using System.Xml.Linq;

void Main()
{
    int iCount = 0;
    string fileName = @"C:\Data\CatalogIndex.xml";
    DateTime startTime = DateTime.Now;
    Console.WriteLine("Start Time: {0}", startTime);

    FileInfo fi = new FileInfo(fileName);
    Console.WriteLine("File Size:{0} MB", fi.Length / 1048576.0);

    /* I want to change this loop into a producer-consumer pattern so the data is processed in parallel */
    foreach (var element in StreamElements(fileName, "title"))
    {
        iCount++;
    }

    Console.WriteLine("Count: {0}", iCount);
    Console.WriteLine("End Time: {0}, Time Taken:{1}", DateTime.Now, DateTime.Now - startTime);
}

// Streams the matching elements one at a time, so the whole file is never loaded into memory.
private static IEnumerable<XElement> StreamElements(string fileName, string elementName)
{
    using (var rdr = XmlReader.Create(fileName))
    {
        rdr.MoveToContent();
        while (!rdr.EOF)
        {
            if ((rdr.NodeType == XmlNodeType.Element) && (rdr.Name == elementName))
            {
                // ReadFrom consumes the whole element and leaves the reader on the next node.
                var e = XElement.ReadFrom(rdr) as XElement;
                yield return e;
            }
            else
            {
                rdr.Read();
            }
        }
    } // the using block disposes (closes) the reader
}
```

Is this what you're trying to do?

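```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

void Main()
{
    const int inputCollectionBufferSize = 1024;
    const int bulkInsertBufferCapacity = 100;
    const int bulkInsertConcurrency = 4;

    // Bounded queue: the reader blocks once 1024 items are waiting, which caps memory use.
    BlockingCollection<object> inputCollection = new BlockingCollection<object>(inputCollectionBufferSize);

    Task loadTask = Task.Factory.StartNew(() =>
    {
        try
        {
            foreach (object nextItem in ReadAllElements(...))
            {
                // this will potentially block if there are already enough items
                inputCollection.Add(nextItem);
            }
        }
        finally
        {
            // mark this collection as done (also on failure, so the consumers don't wait forever)
            inputCollection.CompleteAdding();
        }
    });

    Action parseAction = () =>
    {
        List<object> bulkInsertBuffer = new List<object>(bulkInsertBufferCapacity);
        foreach (object nextItem in inputCollection.GetConsumingEnumerable())
        {
            bulkInsertBuffer.Add(nextItem);
            if (bulkInsertBuffer.Count == bulkInsertBufferCapacity)
            {
                // CommitBuffer runs on several tasks at once, so each call needs its own
                // connection (e.g. one SqlBulkCopy.WriteToServer call per buffer).
                CommitBuffer(bulkInsertBuffer);
                bulkInsertBuffer.Clear();
            }
        }

        // flush the last, partially filled buffer
        if (bulkInsertBuffer.Count > 0)
        {
            CommitBuffer(bulkInsertBuffer);
        }
    };

    List<Task> parseTasks = new List<Task>(bulkInsertConcurrency);
    for (int i = 0; i < bulkInsertConcurrency; i++)
    {
        parseTasks.Add(Task.Factory.StartNew(parseAction));
    }

    // wait before exiting
    loadTask.Wait();
    Task.WaitAll(parseTasks.ToArray());
}
```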
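The question itself proposes a slightly different shape: the producer builds batches of about 10K entities and queues whole batches, so each consumer only bulk-copies what it takes off the queue. Below is a minimal sketch of that variant, assuming .NET 4.0's `BlockingCollection<T>` and TPL. `BatchPipeline.Run` is a hypothetical helper, and the `persist` delegate in `Main` only counts items as a stand-in for the real SqlBulkCopy call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class BatchPipeline
{
    // Hypothetical helper: one producer groups `source` into lists of `batchSize`
    // and queues whole batches; `consumerCount` consumers call `persist` per batch.
    // `persist` runs on several threads at once, so it must be thread-safe.
    public static void Run<T>(IEnumerable<T> source, int batchSize, int consumerCount,
                              int maxQueuedBatches, Action<List<T>> persist)
    {
        using (var queue = new BlockingCollection<List<T>>(maxQueuedBatches))
        using (var cts = new CancellationTokenSource())
        {
            Task producer = Task.Factory.StartNew(() =>
            {
                try
                {
                    var batch = new List<T>(batchSize);
                    foreach (T item in source)
                    {
                        batch.Add(item);
                        if (batch.Count == batchSize)
                        {
                            // Blocks while maxQueuedBatches batches are waiting (bounds memory);
                            // throws OperationCanceledException if a consumer has failed.
                            queue.Add(batch, cts.Token);
                            batch = new List<T>(batchSize); // the queued list now belongs to a consumer
                        }
                    }
                    if (batch.Count > 0) queue.Add(batch, cts.Token); // final partial batch
                }
                finally
                {
                    queue.CompleteAdding(); // ends the consumers' loops, even on failure
                }
            });

            Task[] consumers = new Task[consumerCount];
            for (int i = 0; i < consumerCount; i++)
            {
                consumers[i] = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        foreach (List<T> batch in queue.GetConsumingEnumerable())
                            persist(batch);
                    }
                    catch
                    {
                        cts.Cancel(); // unblock the producer so nothing waits forever
                        throw;
                    }
                });
            }

            // Waits for every task, then throws an AggregateException if any of them failed.
            Task.WaitAll(consumers.Concat(new[] { producer }).ToArray());
        }
    }
}

class Program
{
    static void Main()
    {
        int total = 0;
        var sizes = new ConcurrentBag<int>();

        // Stand-in for SqlBulkCopy: just records how many items each batch holds.
        BatchPipeline.Run(Enumerable.Range(0, 25003), 10000, 4, 2, batch =>
        {
            Interlocked.Add(ref total, batch.Count);
            sizes.Add(batch.Count);
        });

        Console.WriteLine("Items: {0}, batches: {1}", total, sizes.Count); // Items: 25003, batches: 3
    }
}
```

With `maxQueuedBatches = 2`, memory holds at most two queued batches, the one the producer is filling, and one per consumer in flight. Queuing whole batches puts the batching work on the single producer; queuing single elements, as in the answer, lets each consumer size its own bulk-insert buffer instead.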