.NET中管理单独(单个)线程上任务队列的最佳方式
我知道异步编程多年来已经发生了很多变化。 我有点尴尬,我让自己在34岁时就生锈了,但我依靠StackOverflow让我加快速度。
我想要做的是在一个单独的线程上管理“工作”队列,但是这样一次只能处理一个项目。 我想在这个线程上发布工作,它不需要将任何东西传递给调用者。 当然,我可以简单地启动一个新的Thread
对象并让它在一个共享的Queue
对象上循环,使用sleeps,interrupt,wait handle等等。但是我知道事情从那以后变得更好了。 我们有BlockingCollection
, Task
, async
/ await
,更不用说可能抽象了很多的NuGet包。
我知道“什么是最好的……”这些问题通常是不受欢迎的,所以我会通过说“目前推荐的是什么……”的方式来改写,最好使用内置的.NET机制。 但是如果第三方NuGet包简化了一堆东西,它也是如此。
我认为一个TaskScheduler
实例的固定最大并发性为1,但似乎现在可能没有那么笨重的方法了。
背景
具体来说,我在这种情况下尝试做的是在Web请求期间排队IP地理定位任务。 相同的IP可能会多次排队等待地理定位,但是任务将知道如何检测并尽快跳过,如果它已经解决了。 但请求处理程序只是将这些() => LocateAddress(context.Request.UserHostAddress)
调用抛出到队列中,让LocateAddress
方法处理重复的工作检测。 我正在使用的地理位置API不喜欢被请求轰炸,这就是为什么我想一次将它限制为单个并发任务。 但是,如果允许通过简单的参数更改轻松扩展到更多并发任务,那将会很好。
要创建异步单度并行工作队列,您可以简单地创建一个SemaphoreSlim
,初始化为1,然后在启动所请求的工作之前await
获取该信号量的enqueing方法。
public class TaskQueue { private SemaphoreSlim semaphore; public TaskQueue() { semaphore = new SemaphoreSlim(1); } public async Task Enqueue (Func> taskGenerator) { await semaphore.WaitAsync(); try { return await taskGenerator(); } finally { semaphore.Release(); } } public async Task Enqueue(Func taskGenerator) { await semaphore.WaitAsync(); try { await taskGenerator(); } finally { semaphore.Release(); } } }
当然,要有一个固定的并行度而不是简单地将信号量初始化为其他数字。
我认为你最好的选择是使用TPL Dataflow
的ActionBlock
:
var actionBlock = new ActionBlock(address => { if (!IsDuplicate(address)) { LocateAddress(address); } }); actionBlock.Post(context.Request.UserHostAddress);
TPL Dataflow
是健壮的,线程安全的, async
准备的,非常可配置的基于actor的框架(可用作nuget)
这是一个更复杂案例的简单示例。 我们假设你想要:
- 启用并发(仅限于可用内核)。
- 限制队列大小(这样你就不会耗尽内存)。
- 使
LocateAddress
和队列插入都是async
。 - 一小时后取消所有内容。
var actionBlock = new ActionBlock(async address => { if (!IsDuplicate(address)) { await LocateAddressAsync(address); } }, new ExecutionDataflowBlockOptions { BoundedCapacity = 10000, MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token }); await actionBlock.SendAsync(context.Request.UserHostAddress);
使用BlockingCollection
创建一个生产者/消费者模式,其中包含一个消费者(一次只能运行一件事)和一个或多个生产者。
首先在某处定义共享队列:
BlockingCollection queue = new BlockingCollection ();
在您的消费者Thread
或Task
您从中获取:
//This will block until there's an item available Action itemToRun = queue.Take()
然后从其他线程上的任意数量的生成器,只需添加到队列:
queue.Add(() => LocateAddress(context.Request.UserHostAddress));
实际上,您不需要在一个线程中运行任务,您需要它们串行运行(一个接一个)和FIFO。 TPL没有类,但这是我非常轻量级,非阻塞的测试实现。 https://github.com/Gentlee/SerialQueue
在那里也有@Servy实现,测试显示它比我的慢两倍并且它不保证FIFO。
例:
private readonly SerialQueue queue = new SerialQueue(); async Task SomeAsyncMethod() { var result = await queue.Enqueue(DoSomething); }