.NET中管理单独(单个)线程上任务队列的最佳方式

我知道异步编程多年来已经发生了很多变化。 我有点尴尬,我让自己在34岁时就生锈了,但我依靠StackOverflow让我加快速度。

我想要做的是在一个单独的线程上管理“工作”队列,但是这样一次只能处理一个项目。 我想在这个线程上发布工作,它不需要将任何东西传递给调用者。 当然,我可以简单地启动一个新的Thread对象并让它在一个共享的Queue对象上循环,使用sleeps,interrupt,wait handle等等。但是我知道事情从那以后变得更好了。 我们有BlockingCollectionTaskasync / await ,更不用说可能抽象了很多的NuGet包。

我知道“什么是最好的……”这些问题通常是不受欢迎的,所以我会通过说“目前推荐的是什么……”的方式来改写,最好使用内置的.NET机制。 但是如果第三方NuGet包简化了一堆东西,它也是如此。

我认为一个TaskScheduler实例的固定最大并发性为1,但似乎现在可能没有那么笨重的方法了。

背景

具体来说,我在这种情况下尝试做的是在Web请求期间排队IP地理定位任务。 相同的IP可能会多次排队等待地理定位,但是任务将知道如何检测并尽快跳过,如果它已经解决了。 但请求处理程序只是将这些() => LocateAddress(context.Request.UserHostAddress)调用抛出到队列中,让LocateAddress方法处理重复的工作检测。 我正在使用的地理位置API不喜欢被请求轰炸,这就是为什么我想一次将它限制为单个并发任务。 但是,如果允许通过简单的参数更改轻松扩展到更多并发任务,那将会很好。

要创建异步单度并行工作队列,您可以简单地创建一个SemaphoreSlim ,初始化为1,然后在启动所请求的工作之前await获取该信号量的enqueing方法。

 public class TaskQueue { private SemaphoreSlim semaphore; public TaskQueue() { semaphore = new SemaphoreSlim(1); } public async Task Enqueue(Func> taskGenerator) { await semaphore.WaitAsync(); try { return await taskGenerator(); } finally { semaphore.Release(); } } public async Task Enqueue(Func taskGenerator) { await semaphore.WaitAsync(); try { await taskGenerator(); } finally { semaphore.Release(); } } } 

当然,要有一个固定的并行度而不是简单地将信号量初始化为其他数字。

我认为你最好的选择是使用TPL DataflowActionBlock

 var actionBlock = new ActionBlock(address => { if (!IsDuplicate(address)) { LocateAddress(address); } }); actionBlock.Post(context.Request.UserHostAddress); 

TPL Dataflow是健壮的,线程安全的, async准备的,非常可配置的基于actor的框架(可用作nuget)

这是一个更复杂案例的简单示例。 我们假设你想要:

  • 启用并发(仅限于可用内核)。
  • 限制队列大小(这样你就不会耗尽内存)。
  • 使LocateAddress和队列插入都是async
  • 一小时后取消所有内容。
 var actionBlock = new ActionBlock(async address => { if (!IsDuplicate(address)) { await LocateAddressAsync(address); } }, new ExecutionDataflowBlockOptions { BoundedCapacity = 10000, MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token }); await actionBlock.SendAsync(context.Request.UserHostAddress); 

使用BlockingCollection创建一个生产者/消费者模式,其中包含一个消费者(一次只能运行一件事)和一个或多个生产者。

首先在某处定义共享队列:

 BlockingCollection queue = new BlockingCollection(); 

在您的消费者ThreadTask您从中获取:

 //This will block until there's an item available Action itemToRun = queue.Take() 

然后从其他线程上的任意数量的生成器,只需添加到队列:

 queue.Add(() => LocateAddress(context.Request.UserHostAddress)); 

实际上,您不需要在一个线程中运行任务,您需要它们串行运行(一个接一个)和FIFO。 TPL没有类,但这是我非常轻量级,非阻塞的测试实现。 https://github.com/Gentlee/SerialQueue

在那里也有@Servy实现,测试显示它比我的慢两倍并且它不保证FIFO。

例:

 private readonly SerialQueue queue = new SerialQueue(); async Task SomeAsyncMethod() { var result = await queue.Enqueue(DoSomething); }