创建手动线程,也尝试使用parallel.foreach和async等待 – 但是获得重复

问题:获取重复的项目,即创建的线程数量超过数组大小…嗨,伙计们,我在循环中为数组的每个元素创建线程。 真正的用途是使用亚马逊ses发送一批消息。 消息存储在messageamazonRequestBatch中,循环遍历批处理并发送消息。


Thread thrdSendEmail; try { string amazonMessageID = string.Empty; List lstThread = new List(); foreach (int n in arrMessageid) { thrdSendEmail = new Thread(() => { try { amazonMessageID = SendSimpleEmail_Part2(messageAmazonRequestBatch.ElementAt(n).req); messageAmazonRequestBatch.ElementAt(n).msg.AmazonMessageID = amazonMessageID; logManager_MessageLogwithAmazonmsgID.LogMessage(",\t" + n , true); //logManager_MessageLogwithAmazonmsgID.LogMessage(",\t" + n + ",\t" + messageAmazonRequestBatch.ElementAt(n).msg.QueueMessageId + ",\t" + amazonMessageID, true); } catch (Exception ex) { logManager_RunSummary.LogMessage(ex.Message, true); } }); thrdSendEmail.Name = n.ToString(); lstThread.Add(thrdSendEmail); thrdSendEmail.Start(); //logManager_MessageLogwithAmazonmsgID.LogMessage(",\t" + n, true); } foreach (Thread t in lstThread) { t.Join(); //logManager_MessageLogwithAmazonmsgID.LogMessage(",\t" + t.Name, true); } } catch (Exception ex) { logManager_RunSummary.LogMessage(ex.Message, true); } 

我也尝试过parallel.foreach和asynch并等待选项…他们也提供了重复项。 我知道锁将解决问题,但在我的情况下,锁会降低性能10倍……这是我的性能下降了10倍…因为将sendemail登录锁定阻止直到我得到一个返回amazonmessageid来自亚马逊……

任何有关这方面的帮助将不胜感激。 我不是新手程序员,但是新线程…我的联系电子邮件是shabbirbohra@gmail.com


 private int SendEmailTask_Ver9_23Jan()//tried to create manual threads in parallel foreach and called SendSimpleEmail_Part3 but still duplicates { activeThreadCount++; threadCount++; IList messageBatch = null; lock (dbLocker) { if (activeThreadCount > maxNoofTaskCount)//targetThreadCount { return 0; } if (abort) { sendComplete = true; return 0; } try { messageBatch = messageRepository.ash_GetNextBatch_AirmailVer2(maxBatchSize, this.senderTrackingHost);//messageBatch = messageRepository.ash_GetNextBatch(maxBatchSize); } catch (Exception ex) { logManager_RunSummary.LogException(ex); messageBatch = new List(); } Console.WriteLine(this.currentStatus); } while (messageBatch != null && messageBatch.Count != 0) { IDictionary toUpdate = new Dictionary(); batchSize = messageBatch.Count; sendComplete = false; //foreach (Airmail.Core.Message message in messageBatch) logManager_CollectionLog.LogMessage("\tBatch\t-\t" + messageBatch.Count + "\t-\t" + System.Threading.Thread.CurrentThread.ManagedThreadId, true);//ASH-TEST 11Jan14 int intCounter = 0;//ash-teset 11han14 System.Collections.Concurrent.ConcurrentBag messageBatchConcurrent = new System.Collections.Concurrent.ConcurrentBag(messageBatch); //All public and protected members of ConcurrentBag are thread-safe and may be used concurrently from multiple threads. //foreach (Airmail.Core.Message message in messageBatchConcurrent) Parallel.ForEach(messageBatchConcurrent, message => { //messageBatchConcurrent.Where(x => x == message).Take(1);//ash12Jan14 lock (statLocker) { //messageBatchConcurrent.TryTake(out message); totalProcessed++; intCounter += 1;//ASH-TEST 10Jan14 message.ash_BatchLoopCounter = intCounter.ToString(); //message.ash_BatchSizeCount = messageBatchConcurrent.Count.ToString(); } if (message.ExpiryDate  (the body is just a function called for each item) } lock (statLocker) { StatisticKey key = new StatisticKey(Convert.ToInt32(message.ash_campaignHistoryID), Convert.ToInt32(message.ash_campaignTemplateID), message.Status);//ASH25,OCT13//Airmail 2.0 changes if (!statistics.ContainsKey(key)) statistics.Add(key, 0); statistics[key]--; } try { string amazonMessageID = string.Empty; if (message.Attachments == null || message.Attachments == "")//ASH25,OCT13//Airmail 2.0 changes { //test//if (intCounter > 1000) { Debugger.Break(); } SendEmailResponse response = null; if (message.ash_isSent == "YES") { return; } //if (message.ash_isSent == null) { response = SendSimpleEmail(ref message, message.QueueMessageId, message.ash_BatchLoopCounter + "-" + message.ash_BatchSizeCount, message.ash_isSent); }//ASH-TEST 11Jan14 /// Start - this is parallel.invoke testing on 23Jan14 try { //Parallel.Invoke( // delegate() // Param #2 - in-line delegate // { //mReq.msg.AmazonMessageID = SendSimpleEmail_Part2((SendEmailRequest)mReq.req); //logManager_MessageLog.LogMessage(",\t" + mReq.msg.QueueMessageId, true); // } //); //intthreadCount++; //logManager_MessageLog.LogMessage(",\t creating new thread", true); Thread thrdSendEmail = new Thread(() => { if (message.ash_isSent == null) { response = SendSimpleEmail_Part3(message, message.QueueMessageId, message.ash_BatchLoopCounter + "-" + message.ash_BatchSizeCount, message.ash_isSent); } }); lock (statLocker) { thrdSendEmail.Start(); } thrdSendEmail.Join(); //logManager_MessageLog.LogMessage(",\t finishing new thread", true); } // No exception is expected in this example, but if one is still thrown from a task, // it will be wrapped in AggregateException and propagated to the main thread. catch (AggregateException e) { Console.WriteLine("An action has thrown an exception. THIS WAS UNEXPECTED.\n{0}", e.InnerException.ToString()); } /// End - this is parallel.invoke testing on 23Jan14 //SendRawEmailResponse response = SendRawEmail(message); //cSH12Jan14-test//sqlLogSentMessage += "EXEC ash_Log_SentMessageids " + "@MessageID = " + message.QueueMessageId + ", " + "@Identifier = '" + message.Identifier.ToString() + "', " + "@AmazonMessageID = '" + message.AmazonMessageID + "', " + "@Status = " + ((int)message.Status).ToString() + ", " + "@ToEmailAddress = '" + message.To.Address + "', " + "@CreatedDate = '" + DateTime.UtcNow.ToString() + "'\n"; //logManager_MessageLog.LogMessage( ",\t" + message.ash_BatchLoopCounter + "-" + message.ash_BatchSizeCount + ",\t" + response.SendEmailResult.MessageId + ",\t" + message.QueueMessageId, true);//ASH-TEST 11Jan14 lock (statLocker) { if (response != null) amazonMessageID = response.SendEmailResult.MessageId; if (message.ash_isSent == "DUPLICATE") { return; } } //logManager_CollectionLog.LogMessage("\tSendSimpleEmail\t-\t" + message.ash_BatchSizeCount + "-" + message.ash_BatchLoopCounter + "\t-\t" + message.QueueMessageId, true);//ASH-TEST 10Jan14 } else { SendRawEmailResponse response = SendRawEmail(message); lock (statLocker) { if (response != null) amazonMessageID = response.SendRawEmailResult.MessageId; intCounter += 1;//ASH-TEST 10Jan14 logManager_MessageLog.LogMessage("\tSendRawEmail-1" + intCounter + "\t-\t" + amazonMessageID + "\t-\t" + message.QueueMessageId, true);//ASH-TEST 10Jan14 } } lock (statLocker) { message.AmazonMessageID = amazonMessageID; toUpdate.Add(message, message.UpdateStatus(amazonMessageID == string.Empty ? MessageStatus.Tested : MessageStatus.Sent, "", null, null, true)); messageCount++; } } catch (Exception ex) { if (ex.Message.ToLower().Contains("blacklist") || ex.Message.ToLower().Contains("rejected") || ex.Message.ToLower().Contains("not verified") || ex.Message.ToLower().Contains("illegal") //|| message.OldStatus == MessageStatus.Failed) || message.Status == MessageStatus.Failed) { toUpdate.Add(message, message.UpdateStatus(MessageStatus.Undeliverable, ex.Message, null, null, true)); } else { toUpdate.Add(message, message.UpdateStatus(MessageStatus.Failed, ex.Message, null, null, true)); } Console.WriteLine(ex.Message.ToLower());//ASH22Nov } lock (statLocker) { StatisticKey key = new StatisticKey(Convert.ToInt32(message.ash_campaignHistoryID), Convert.ToInt32(message.ash_campaignTemplateID), message.Status);//ASH25,OCT13//Airmail 2.0 changes if (!statistics.ContainsKey(key)) statistics.Add(key, 0); statistics[key]++; } }); lock (dbLocker) { //cSH12Jan14-test//messageRepository.ash_Log_SentMessageids(sqlLogSentMessage);//ASH12Jan14 try { Task UpdateMessages_Task = Task.Factory.StartNew(() => messageRepository.ash_UpdateMessages(toUpdate), TaskCreationOptions.AttachedToParent); UpdateMessages_Task.Wait();//ASH18Sep2013 - This task added for updating message asynchronously } catch (Exception ex) { logManager_RunSummary.LogException(ex); } if (activeThreadCount > maxNoofTaskCount)//targetThreadCount { return 0; } if (abort) { sendComplete = true; return 1; } try { if (messageBatch == null && messageBatch.Count == 0) { messageBatch = messageRepository.ash_GetNextBatch_AirmailVer2(maxBatchSize, this.senderTrackingHost);//messageBatch = messageRepository.ash_GetNextBatch(maxBatchSize); } else { messageBatch = null; } } catch (Exception ex) { logManager_RunSummary.LogException(ex); messageBatch = new List(); } Console.WriteLine(this.currentStatus); } } return 1; } 

这是一个“访问修改后的闭包”问题。 请查看更多详细信息。 它有很多例子。


 foreach (int n in arrMessageid) { int tempN = n; Thread thrdSendEmail = new Thread(() => { try { amazonMessageID = SendSimpleEmail_Part2(messageAmazonRequestBatch.ElementAt(tempN ).req); messageAmazonRequestBatch.ElementAt(tempN ).msg.AmazonMessageID = amazonMessageID; logManager_MessageLogwithAmazonmsgID.LogMessage(",\t" + tempN , true); //logManager_MessageLogwithAmazonmsgID.LogMessage(",\t" + tempN + ",\t" + messageAmazonRequestBatch.ElementAt(tempN ).msg.QueueMessageId + ",\t" + amazonMessageID, true); } catch (Exception ex) { logManager_RunSummary.LogMessage(ex.Message, true); } }); thrdSendEmail.Name = n.ToString(); lstThread.Add(thrdSendEmail); thrdSendEmail.Start(); //logManager_MessageLogwithAmazonmsgID.LogMessage(",\t" + n, true); }