如何从并发运行的任务更新进度条

我尝试将并行任务绑定到包含pprogressBars的listView。 我使用的是有限的调度程序,只允许指定的最大并行度。 到目前为止它大部分时间都可以正常工作,但有时两个任务会更新listView中的相同进度条。 以下是我的代码。

知道如何防止两个任务更新listView中的相同进度条吗? 或者如何从同时运行的任务更新进度条?

public class MyClass { public ObservableCollection StatusItems { get; set; } private Object thisLock = new Object(); public int Process() //handled { StatusItems = new ObservableCollection(); for (int i = 0; i < 4; i++) // initialize progress bar collection { StatusInfo sInfo = new StatusInfo(); sInfo.ThreadID = i; sInfo.Table = "No Table"; sInfo.Percentage = 0; sInfo.Status = AppInfo.AVAILABLE; sInfo.Minimum = 0; sInfo.Maximum = 100; sInfo.Visibility = Visibility.Visible; StatusItems.Add(sInfo); } Parent.StatusItems = StatusItems; // assign to viewmodel int numberOfBackGroundThread = 4; LimitedTaskScheduler scheduler = new LimitedTaskScheduler(numberOfBackGroundThread); TaskFactory factory = new TaskFactory(scheduler); var ui = LimitedTaskScheduler.FromCurrentSynchronizationContext(); Task[] tasks = new Task[rows.Count]; for (int i = 0; i < rows.Count; i++) { ...... tasks[i] = factory.StartNew(() => { int barIndex = -1; int threadID = Thread.CurrentThread.ManagedThreadId; cnt++; if (cnt > numberOfBackGroundThread - 1) { while (true) { for (int j = 0; j = 0) { break; } // break while loop } } else { barIndex = cnt; } StatusItems[barIndex].TabType = tabType; StatusItems[barIndex].ThreadID = threadID; int nStatus = IndividualProcess(barIndex); if (nStatus  { AppInfo.Finished = true; }); done.ContinueWith(completedTasks => { int nStatus = PostProcess(); }, ui); return returnStatus; } private int IndividualProcess(int barIndex) { for (int i=0; i< 100; i++) { perform work... SetProgressbar (i, StatusItems, barIndex, "in progress") } SetProgressbar (100, StatusItems, barIndex, "DONE") } public void SetProgressbar(int pPercent, ObservableCollection pInfo, int pInfoIndex, string pStatus) { try // Increment percentage for COPY or nested PURGE { if (Application.Current.Dispatcher.Thread != System.Threading.Thread.CurrentThread) { Application.Current.Dispatcher.BeginInvoke(new Action(() => { ((StatusInfo)pInfo[pInfoIndex]).Percentage = pPercent; ((StatusInfo)pInfo[pInfoIndex]).Status = pStatus; ((StatusInfo)pInfo[pInfoIndex]).PCT = pPercent.ToString() + "%"; })); } else // When the current thread is main UI thread. The label won't be updated until the EntityCopy() finishes. { ((StatusInfo)pInfo[pInfoIndex]).Percentage = pPercent; ((StatusInfo)pInfo[pInfoIndex]).Status = pStatus; ((StatusInfo)pInfo[pInfoIndex]).PCT = pPercent.ToString() + "%"; } } catch { throw; } } } public class LimitedTaskScheduler : TaskScheduler { // Fields // Whether the current thread is processing work items. [ThreadStatic] private static bool _currentThreadIsProcessingItems; // The list of tasks to be executed. private readonly LinkedList _tasks = new LinkedList(); // protected by lock(_tasks) /// The maximum concurrency level allowed by this scheduler. private readonly int _maxDegreeOfParallelism; /// Whether the scheduler is currently processing work items. private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) ///  /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the /// specified degree of parallelism. ///  /// The maximum degree of parallelism provided by this scheduler. public LimitedTaskScheduler(int maxDegreeOfParallelism) { if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism"); _maxDegreeOfParallelism = maxDegreeOfParallelism; } /// Queues a task to the scheduler. /// The task to be queued. protected sealed override void QueueTask(Task task) { // Add the task to the list of tasks to be processed. If there aren't enough // delegates currently queued or running to process tasks, schedule another. lock (_tasks) { _tasks.AddLast(task); if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) { ++_delegatesQueuedOrRunning; NotifyThreadPoolOfPendingWork(); } } } ///  /// Informs the ThreadPool that there's work to be executed for this scheduler. ///  private void NotifyThreadPoolOfPendingWork() { ThreadPool.UnsafeQueueUserWorkItem(_ => { // Note that the current thread is now processing work items. // This is necessary to enable inlining of tasks into this thread. _currentThreadIsProcessingItems = true; try { // Process all available items in the queue. while (true) { Task item; lock (_tasks) { // When there are no more items to be processed, // note that we're done processing, and get out. if (_tasks.Count == 0) { --_delegatesQueuedOrRunning; break; } // Get the next item from the queue item = _tasks.First.Value; _tasks.RemoveFirst(); } // Execute the task we pulled out of the queue base.TryExecuteTask(item); } } // We're done processing items on the current thread finally { _currentThreadIsProcessingItems = false; } }, null); } /// Attempts to execute the specified task on the current thread. /// The task to be executed. ///  /// Whether the task could be executed on the current thread. protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { // If this thread isn't already processing a task, we don't support inlining if (!_currentThreadIsProcessingItems) return false; // If the task was previously queued, remove it from the queue if (taskWasPreviouslyQueued) TryDequeue(task); // Try to run the task. return base.TryExecuteTask(task); } /// Attempts to remove a previously scheduled task from the scheduler. /// The task to be removed. /// Whether the task could be found and removed. protected sealed override bool TryDequeue(Task task) { lock (_tasks) return _tasks.Remove(task); } /// Gets the maximum concurrency level supported by this scheduler. public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } } /// Gets an enumerable of the tasks currently scheduled on this scheduler. /// An enumerable of the tasks currently scheduled. protected sealed override IEnumerable GetScheduledTasks() { bool lockTaken = false; try { Monitor.TryEnter(_tasks, ref lockTaken); if (lockTaken) return _tasks.ToArray(); else throw new NotSupportedException(); } finally { if (lockTaken) Monitor.Exit(_tasks); } } } 

更新:任务计划程序可能不相关。 我把它放在这里以防万一有人能找到一些我从未想过做过或想过的新东西。

知道如何防止两个任务更新listView中的相同进度条吗? 或者如何从同时运行的任务更新进度条?

如果您有多个并行运行的任务,并且您确实希望向用户显示每个单独任务的进度,则需要为每个任务显示单独的进度条。

你如何做到这取决于UI结构。 例如,如果您有一个列表视图,其中每个项目都是一个任务,您可以为每个listview项目添加一个进度条作为子窗口:

带有进度条的Listview

这可能是一种矫枉过正,通常只需要一个进度条来跟踪整体进度就足够了。 即,如果您有100个任务,则当完成所有100个任务时,您的进度条将显示100%。

但请注意,任务#50(例如)可能在任务#45之前完成,因此您无法使用任务编号来更新进度。 要正确显示进度,您需要计算已完成的任务,并将该计数器用作进度指示器。

更新以解决评论:

我在listview中有4个进度条和500个任务。 由于调度程序有限,在任何给定时间只有4个任务同时运行。 我尝试将列表视图中的进度条分配给具有空闲状态的新传入任务,然后在任务完成时将进度条的状态设置为空闲,以便进度条可以被另一个新传入任务重用。 那有意义吗? 或者我要去deadend?

我不确定这是一个明智的用户界面设计决定,但如果你想这样做,你可以使用SemaphoreSlim将进度条分配为有限的资源。 例:

 using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using System.Windows.Forms; namespace TaskProgress { public partial class Form1 : Form { public Form1() { InitializeComponent(); } private async void Form1_Load(object sender, EventArgs e) { await DoWorkAsync(); } const int MAX_PARALLEL = 4; readonly object _lock = new Object(); readonly SemaphoreSlim _semaphore = new SemaphoreSlim(initialCount: MAX_PARALLEL); HashSet _pendingTasks; Queue _availableProgressBars; // do all Task async Task DoWorkAsync() { _availableProgressBars = new Queue(); _pendingTasks = new HashSet(); var progressBars = new ProgressBar[] { this.progressBar1, this.progressBar2, this.progressBar3, this.progressBar4 }; foreach (var item in progressBars) _availableProgressBars.Enqueue(item); for (int i = 0; i < 50; i++) // start 50 tasks QueueTaskAsync(DoTaskAsync()); await Task.WhenAll(WithLock(() => _pendingTasks.ToArray())); } // do a sigle Task readonly Random _random = new Random(Environment.TickCount); async Task DoTaskAsync() { await _semaphore.WaitAsync(); try { var progressBar = WithLock(() => _availableProgressBars.Dequeue()); try { progressBar.Maximum = 100; progressBar.Value = 0; IProgress progress = new Progress(value => progressBar.Value = value); await Task.Run(() => { // our simulated work takes no more than 10s var sleepMs = _random.Next(10000) / 100; for (int i = 0; i < 100; i++) { Thread.Sleep(sleepMs); // simulate work item progress.Report(i); } }); } finally { WithLock(() => _availableProgressBars.Enqueue(progressBar)); } } finally { _semaphore.Release(); } } // Add/remove a task to the list of pending tasks async void QueueTaskAsync(Task task) { WithLock(() => _pendingTasks.Add(task)); try { await task; } catch { if (!task.IsCanceled && !task.IsFaulted) throw; return; } WithLock(() => _pendingTasks.Remove(task)); } // execute func inside a lock T WithLock(Func func) { lock (_lock) return func(); } // execute action inside a lock void WithLock(Action action) { lock (_lock) action(); } } } 

此代码不使用自定义任务调度程序, SemaphoreSlim足以限制并发性。 另请注意,保护锁( WithLock )在这里是多余的,因为除了Task.Run lambda之外的所有内容都运行在UI线程上。 但是,我决定保留锁,因为您的应用程序可能有不同的线程模型。 在这种情况下,无论您在何处访问ProgressBar或其他UI,都应该使用BeginInvoke或类似方法在UI线程上执行此操作。

此外,请检查“在4.5中的异步:在异步API中启用进度和取消”,以获取有关Progress模式的更多详细信息。