如何使用ConcurrentQueue 进行线程处理

我试图弄清楚使用队列的最佳方法是什么。 我有一个返回DataTable的进程。 反过来,每个DataTable都与之前的DataTable合并。 有一个问题,在最终的BulkCopy(OutOfMemory)之前要保留的记录太多。

所以,我已经确定我应该立即处理每个传入的DataTable。 考虑ConcurrentQueue …但是我没有看到WriteQueuedData()方法如何知道将WriteQueuedData()列并将其写入数据库。

例如:

 public class TableTransporter { private ConcurrentQueue tableQueue = new ConcurrentQueue(); public TableTransporter() { tableQueue.OnItemQueued += new EventHandler(WriteQueuedData); // no events available } public void ExtractData() { DataTable table; // perform data extraction tableQueue.Enqueue(table); } private void WriteQueuedData(object sender, EventArgs e) { BulkCopy(e.Table); } } 

我的第一个问题是,除了我实际上没有任何事件要订阅的事实,如果我异步调用ExtractData()这将是我需要的全部内容吗? 第二,我是否缺少ConcurrentQueue函数的方式,需要某种forms的触发器与排队对象异步工作?

更新我刚刚从ConcurrentQueue派生了一个具有OnItemQueued事件处理程序的类。 然后:

 new public void Enqueue (DataTable Table) { base.Enqueue(Table); OnTableQueued(new TableQueuedEventArgs(Table)); } public void OnTableQueued(TableQueuedEventArgs table) { EventHandler handler = TableQueued; if (handler != null) { handler(this, table); } } 

有关此实施的任何担忧?

从我对这个问题的理解来看,你遗漏了一些东西。

并发队列是一种数据结构,旨在接受多个线程读取和写入队列,而无需显式锁定数据结构。 (所有爵士乐都在幕后处理,或者集合以不需要锁定的方式实现。)

考虑到这一点,看起来您尝试使用的模式是“生产/消费者”。 首先,您有一些任务可以产生工作(并将项目添加到队列中)。 第二个你有第二个任务从队列中消耗东西(和dequeing items)。

所以你真的需要两个线程:一个添加项目,另一个删除项目。 因为您正在使用并发集合,所以您可以让多个线程添加项目和多个线程删除项目。 但显然,你对并发队列的争论越多,就会越快成为瓶颈。

我认为ConcurrentQueue仅在极少数情况下有用。 它的主要优点是无锁。 但是,通常生产者线程必须以某种方式通知消费者线程有可用于处理的数据。 线程之间的这种信令需要锁定并否定使用ConcurrentQueue的好处。 同步线程的最快方法是使用Monitor.Pulse(),它只能在锁中运行。 所有其他同步工具甚至更慢。

当然,消费者可以不断检查队列中是否存在某些东西,这些东西没有锁定,但却极大地浪费了处理器资源。 如果消费者在检查之间等待,那就好一点。

写入队列时引发线程是一个非常糟糕的主意。 使用ConcurrentQueue来节省mabe 1微秒将通过执行eventhandler完全浪费,这可能需要1000倍的时间。

如果所有处理都是在事件处理程序或异步调用中完成的,那么问题仍然是为什么还需要一个队列? 最好将数据直接传递给处理程序,根本不使用队列。

请注意,ConcurrentQueue的实现相当复杂,以允许并发。 在大多数情况下,最好使用普通的Queue <>并锁定对队列的每次访问。 由于队列访问只需要几微秒,因此2个线程在同一微秒内访问队列的可能性极小,并且由于锁定几乎没有任何延迟。 使用带锁定的普通Queue <>通常会比ConcurrentQueue更快地执行代码执行。

这是我提出的完整解决方案:

 public class TableTransporter { private static int _indexer; private CustomQueue tableQueue = new CustomQueue(); private Func RunPostProcess; private string filename; public TableTransporter() { RunPostProcess = new Func(SerializeTable); tableQueue.TableQueued += new EventHandler(tableQueue_TableQueued); } void tableQueue_TableQueued(object sender, TableQueuedEventArgs e) { // do something with table // I can't figure out is how to pass custom object in 3rd parameter RunPostProcess.BeginInvoke(e.Table,new AsyncCallback(PostComplete), filename); } public void ExtractData() { // perform data extraction tableQueue.Enqueue(MakeTable()); Console.WriteLine("Table count [{0}]", tableQueue.Count); } private DataTable MakeTable() { return new DataTable(String.Format("Table{0}", _indexer++)); } private string SerializeTable(DataTable Table) { string file = Table.TableName + ".xml"; DataSet dataSet = new DataSet(Table.TableName); dataSet.Tables.Add(Table); Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file); string xmlstream = String.Empty; using (MemoryStream memstream = new MemoryStream()) { XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet)); XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8); xmlSerializer.Serialize(xmlWriter, dataSet); xmlstream = UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray()); using (var fileStream = new FileStream(file, FileMode.Create)) fileStream.Write(StringToUTF8ByteArray(xmlstream), 0, xmlstream.Length + 2); } filename = file; return file; } private void PostComplete(IAsyncResult iasResult) { string file = (string)iasResult.AsyncState; Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file); RunPostProcess.EndInvoke(iasResult); } public static String UTF8ByteArrayToString(Byte[] ArrBytes) { return new UTF8Encoding().GetString(ArrBytes); } public static Byte[] StringToUTF8ByteArray(String XmlString) { return new UTF8Encoding().GetBytes(XmlString); } } public sealed class CustomQueue : ConcurrentQueue { public event EventHandler TableQueued; public CustomQueue() { } public CustomQueue(IEnumerable TableCollection) : base(TableCollection) { } new public void Enqueue (DataTable Table) { base.Enqueue(Table); OnTableQueued(new TableQueuedEventArgs(Table)); } public void OnTableQueued(TableQueuedEventArgs table) { EventHandler handler = TableQueued; if (handler != null) { handler(this, table); } } } public class TableQueuedEventArgs : EventArgs { #region Fields #endregion #region Init public TableQueuedEventArgs(DataTable Table) {this.Table = Table;} #endregion #region Functions #endregion #region Properties public DataTable Table {get;set;} #endregion } 

作为概念的certificate,它似乎运作良好。 我最多看到4个工人线程。