取消长时间运行任务后如何正确清理

我创建了一个类,其目的是抽象出对队列的并发访问的控制。

该类被设计为在单个线程上实例化,由多个线程写入,然后从后续单个线程读取。

我在类中生成了一个长时间运行的任务,它将执行阻塞循环并在项成功出列时触发事件。

我的问题是:我执行取消长时间运行的任务并随后清理/重置CancellationTokenSource对象的正确用法吗?

理想情况下,我希望能够在保持可用性以添加到队列的同时停止并重新启动活动对象。

我用Peter Bromberg的文章作为基础: C#4.0中的生产者/消费者队列和BlockingCollection

代码如下:

 using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace Test { public delegate void DeliverNextQueuedItemHandler(T item); public sealed class SOQueueManagerT { ConcurrentQueue _multiQueue; BlockingCollection _queue; CancellationTokenSource _canceller; Task _listener = null; public event DeliverNextQueuedItemHandler OnNextItem; public bool IsRunning { get; private set; } public int QueueSize { get { if (_queue != null) return _queue.Count; return -1; } } public CancellationTokenSource CancellationTokenSource { get { if (_canceller == null) _canceller = new CancellationTokenSource(); return _canceller; } } public SOQueueManagerT() { _multiQueue = new ConcurrentQueue(); _queue = new BlockingCollection(_multiQueue); IsRunning = false; } public void Start() { if (_listener == null) { IsRunning = true; _listener = Task.Factory.StartNew(() => { while (!CancellationTokenSource.Token.IsCancellationRequested) { T item; if (_queue.TryTake(out item, 100)) { if (OnNextItem != null) { OnNextItem(item); } } } }, CancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } } public void Stop() { if (_listener != null) { CancellationTokenSource.Cancel(); CleanUp(); } } public void Add(T item) { _queue.Add(item); } private void CleanUp() { _listener.Wait(2000); if (_listener.IsCompleted) { IsRunning = false; _listener = null; _canceller = null; } } } } 

更新这是我最后一起去的地方。 它并不完美,但到目前为止还在做这项工作。

 public sealed class TaskQueueManager { ConcurrentQueue _multiQueue; BlockingCollection _queue; CancellationTokenSource _canceller; Task _listener = null; public event DeliverNextQueuedItemHandler OnNextItem; public bool IsRunning { get { if (_listener == null) return false; else if (_listener.Status == TaskStatus.Running || _listener.Status == TaskStatus.Created || _listener.Status == TaskStatus.WaitingForActivation || _listener.Status == TaskStatus.WaitingToRun || _listener.IsCanceled) return true; else return false; } } public int QueueSize { get { if (_queue != null) return _queue.Count; return -1; } } public TaskQueueManager() { _multiQueue = new ConcurrentQueue(); _queue = new BlockingCollection(_multiQueue); } public void Start() { if (_listener == null) { _canceller = new CancellationTokenSource(); _listener = Task.Factory.StartNew(() => { while (!_canceller.Token.IsCancellationRequested) { T item; if (_queue.TryTake(out item, 100)) { if (OnNextItem != null) { try { OnNextItem(item); } catch (Exception e) { //log or call an event } } } } }, _canceller.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } } public void Stop() { if (_listener != null) { _canceller.Cancel(); if (_listener.IsCanceled && !_listener.IsCompleted) _listener.Wait(); _listener = null; _canceller = null; } } public void Add(T item) { if (item != null) { _queue.Add(item); } else { throw new ArgumentNullException("TaskQueueManager.Add item is null"); } } } 

精心编程是唯一能够削减它的东西。 即使您取消操作,您可能还有一个未在时间上完成的待处理操作。 它很可能是一个僵局的阻塞操作。 在这种情况下,您的程序实际上不会终止。

例如,如果我多次调用您的CleanUp方法或者没有先调用Start,那么我感觉它会崩溃。

在清理期间超时2秒,感觉比计划更随意,我实际上会确保事情正常关闭或崩溃/挂起(你永远不想让并发的东西处于未知状态)。

此外, IsRunning是显式设置的,不是从对象的状态推断出来的。

为了获得灵感,我希望你看一下我最近写的一个类似的类,它是一个生产者/消费者模式,它在后台线程中工作。 您可以在CodePlex上找到该源代码。 虽然,这是为解决一个非常具体的问题而设计的。

这里,取消是通过命令只有工作线程识别并因此开始关闭的特定类型来解决的。 这也确保我永远不会取消待处理的工作,只考虑整个工作单元。

为了改善这种情况,您可以为当前工作设置单独的计时器,如果取消,则中止或回滚未完成的工作。 现在,实现类似行为的事务将需要一些试验和错误,因为你需要查看每个可能的极端情况并问自己,如果程序在这里崩溃会发生什么? 理想情况下,所有这些代码路径都会导致可恢复或已知状态,您可以从中恢复工作。 但正如我认为你已经猜到的那样,那将需要仔细的编程和大量的测试。