想要使用任务并行库和进度报告来更新数据库

我开发了一个存储多个连接字符串的应用程序。 我只是迭代for循环并连接每个数据库并对每个数据库执行sql。 这样我用bulk sql语句更新多个数据库。

现在我需要使用现在的任务并行库来同时更新多个数据库,而不是一个接一个地循环更新。 我还想处理exception,并且还希望为多个数据库操作显示多个进度条。 应该有暂停和恢复function。

当我点击按钮时,将打开多个数据库连接,对于每个任务,我的表单中将添加一个新的进度条。 每个进度条将显示每个数据库操作进度。 当任何任务完成后,相应的进度条将从表单中删除。

任何人都可以使用TPL指导我如何使用示例代码。 这里我有一个代码更新一个进度条但我需要更新多个进度条。 int iterations = 100;

ProgressBar pb = new ProgressBar(); pb.Maximum = iterations; pb.Dock = DockStyle.Fill; Controls.Add(pb); Task.Create(delegate { Parallel.For(0, iterations, i => { Thread.SpinWait(50000000); // do work here BeginInvoke((Action)delegate { pb.Value++; }); }); }); 

更新问题

我是这样做的。 代码有效但所有进度条值依次增加。 我在winform应用程序中有一个表单和一个用户控件。 请看看我的代码并告诉我那里有什么问题。

主要代码

 public partial class Main : Form { public Main() { InitializeComponent(); this.DoubleBuffered = true; } private void btnStart_Click(object sender, EventArgs e) { Progress ucProgress = null; Dictionary dicList = new Dictionary(); dicList.Add("GB", "conn1"); dicList.Add("US", "conn2"); dicList.Add("DE", "conn3"); fpPanel.Controls.Clear(); Task.Factory.StartNew(() => { foreach (KeyValuePair entry in dicList) { ucProgress = new Progress(); ucProgress.Country = entry.Key; ucProgress.DBConnection = entry.Value; fpPanel.BeginInvoke((MethodInvoker)delegate { fpPanel.Controls.Add(ucProgress); ucProgress.Process(); }); //fpPanel.Controls.Add(ucProgress); System.Threading.Thread.SpinWait(5000000); } }); } private void Main_Resize(object sender, EventArgs e) { this.Invalidate(); } private void Main_Paint(object sender, PaintEventArgs e) { using (LinearGradientBrush brush = new LinearGradientBrush(this.ClientRectangle, Color.WhiteSmoke, Color.LightGray, 90F)) { e.Graphics.FillRectangle(brush, this.ClientRectangle); } } } 

用户控制代码

 public partial class Progress : UserControl { public Progress() { InitializeComponent(); lblMsg.Text = ""; pbStatus.Minimum = 0; pbStatus.Maximum = 100; } public string Country { get; set; } public string DBConnection { get; set; } public string Sql { get; set; } public void SetMessage(string strMsg) { lblMsg.Text = strMsg; } public void Process() { var uiScheduler = TaskScheduler.FromCurrentSynchronizationContext(); Task.Factory.StartNew(() => { lblMsg.BeginInvoke((MethodInvoker)delegate { lblMsg.Text = "Connecting country " + Country; }); pbStatus.BeginInvoke((MethodInvoker)delegate { pbStatus.Value = 30; }); System.Threading.Thread.SpinWait(50000000); //*********** lblMsg.BeginInvoke((MethodInvoker)delegate { lblMsg.Text = "executing sql for country " + Country; }); pbStatus.BeginInvoke((MethodInvoker)delegate { pbStatus.Value = 60; }); System.Threading.Thread.SpinWait(50000000); //*********** lblMsg.BeginInvoke((MethodInvoker)delegate { lblMsg.Text = "sql executed successfully for country " + Country; }); pbStatus.BeginInvoke((MethodInvoker)delegate { pbStatus.Value = 100; }); System.Threading.Thread.SpinWait(50000000); }); //System.Threading.Thread.SpinWait(50000000); // do work here } } 

也许它可以成为起点。 处理暂停/恢复取决于您的需求,可以进行调整。

 var cancellationTokenSource = new CancellationTokenSource(); var cancellation = cancellationTokenSource.Token; void UpdateDatabases(IEnumerable<...> databases, CancellationToken cancellation) { foreach(db in databases) { //create as many ProgressBar instances as databases you want to update //check if ProgressBar exist, then return it and reuse, otherwise create new ProgressBar pb = new ProgressBar(); pb.Maximum = iterations; pb.Dock = DockStyle.Fill; Controls.Add(pb); //start thread for every database/progress bar Task.Factory.StartNew(progressBar => { var start = (ProgressBar)progressBar).Value; //use last value in case of pause Parallel.For(start, iterations, new ParallelOptions(){CancellationToken = cancellation} (i, loopState) => { if (loopState.ShouldExitCurrentIteration) return; //perhaps check loopState.ShouldExitCurrentIteration inside worker method Thread.SpinWait(50000000); // do work here BeginInvoke((Action)delegate { ((ProgressBar)progressBar).Value++; }); }); }, pb, cancellation) .ContinueWith(task => { //to handle exceptions use task.Exception member var progressBar = (ProgressBar)task.AsyncState; if (!task.IsCancelled) { //hide progress bar here and reset pb.Value = 0 } }, TaskScheduler.FromCurrentSynchronizationContext() //update UI from UI thread ); } } //......... //Call UpdateDatabases(databases, cancellation) //To suspend, call cancellationTokenSource.Cancel(); //To resume - simply call UpdateDatabases again cancellationTokenSource = new CancellationTokenSource(); cancellation = cancellationTokenSource.Token; UpdateDatabases(databases, cancellation) 

更新

我已经查看了你的代码。 查看重新访问的代码并根据您的需求进行调整。 主要错误 – 搞乱closures并从非ui线程创建进度。 要启用并行处理,可以使用Parallel.ForEach(有关可能的重载,请参阅MSND)。 对我来说,这个设计看起来有点奇怪(你在Progress混合了数据和逻辑)。 从UI的角度来看,进度条按处理顺序显示但不是原始顺序也是奇怪的,因为它们在列表中(如果您决定按字母顺序对列表进行排序,则会出现问题)

我希望它有所帮助。

主要代码

  private void btnStart_Click(object sender, EventArgs e) { Progress ucProgress = null; Dictionary dicList = new Dictionary(); dicList.Add("GB", "conn1"); dicList.Add("US", "conn2"); dicList.Add("DE", "conn3"); fpPanel.Controls.Clear(); Func, object> createProgress = entry => { var tmp = new Progress {Country = entry.Key, DBConnection = entry.Value}; fpPanel.Controls.Add(tmp); return tmp; }; Task.Factory.StartNew(() => { //foreach (KeyValuePair entry in dicList) Parallel.ForEach(dicList, entry => { //create and add the Progress in UI thread var ucProgress = (Progress)fpPanel.Invoke(createProgress, entry); //execute ucProgress.Process(); in non-UI thread in parallel. //the .Process(); must update UI by using *Invoke ucProgress.Process(); System.Threading.Thread.SpinWait(5000000); }); }); } 

用户控制代码

 public void Process() { //uiScheduler - Not used //var uiScheduler = TaskScheduler.FromCurrentSynchronizationContext(); //The Task is not necessary because the Process() called from Parallel.ForEach //Task.Factory.StartNew(() => //{ //BeginInvoke or Invoke required lblMsg.BeginInvoke((MethodInvoker)delegate { lblMsg.Text = "Connecting country " + Country; }); pbStatus.BeginInvoke((MethodInvoker)delegate { pbStatus.Value = 30; }); System.Threading.Thread.SpinWait(50000000); //*********** lblMsg.BeginInvoke((MethodInvoker)delegate { lblMsg.Text = "executing sql for country " + Country; }); pbStatus.BeginInvoke((MethodInvoker)delegate { pbStatus.Value = 60; }); System.Threading.Thread.SpinWait(50000000); //*********** lblMsg.BeginInvoke((MethodInvoker)delegate { lblMsg.Text = "sql executed successfully for country " + Country; }); pbStatus.BeginInvoke((MethodInvoker)delegate { pbStatus.Value = 100; }); System.Threading.Thread.SpinWait(50000000); //}); //System.Threading.Thread.SpinWait(50000000); // do work here }