A queue of asynchronous tasks with throttling that supports multithreading

I need to implement a library for making requests to the vk.com API. The problem is that the API only supports 3 requests per second. I would like the API to be asynchronous.

Important note: the API should support safe access from multiple threads.

My idea is to implement a class called throttler which allows no more than 3 requests per second and delays the remaining ones.

The interface is as follows:

    public interface IThrottler : IDisposable
    {
        Task<TResult> Throttle<TResult>(Func<Task<TResult>> task);
    }

Usage looks like this:

    var audio = await throttler.Throttle(() => api.MyAudio());
    var messages = await throttler.Throttle(() => api.ReadMessages());
    var audioLyrics = await throttler.Throttle(() => api.AudioLyrics(audioId));
    // There should be a delay here, because 3 requests have already executed
    var photo = await throttler.Throttle(() => api.MyPhoto());

How should the throttler be implemented?

Currently I have implemented it as a queue which is processed by a background thread.

    public Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
    {
        // TaskRequest has a Run() method to run the task.
        // TaskRequest uses TaskCompletionSource to provide a new task,
        // which is resolved when the queue has been processed up to this element.
        var request = new TaskRequest<TResult>(task);
        requestQueue.Enqueue(request);
        return request.ResultTask;
    }

Here is the shortened code of the background-thread loop that processes the queue:

    private void ProcessQueue(object state)
    {
        while (true)
        {
            IRequest request;
            while (requestQueue.TryDequeue(out request))
            {
                // Delay() calculates the actual delay value and calls Thread.Sleep().
                Delay();
                request.Run();
            }
        }
    }

Is it possible to implement this without a background thread?

So instead of directly limiting to N tasks started per second, we will first solve a simpler problem: creating a queue that can process up to N tasks concurrently, and then build on top of it:

    public class TaskQueue
    {
        private SemaphoreSlim semaphore;

        public TaskQueue()
        {
            semaphore = new SemaphoreSlim(1);
        }

        public TaskQueue(int concurrentRequests)
        {
            semaphore = new SemaphoreSlim(concurrentRequests);
        }

        public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
        {
            await semaphore.WaitAsync();
            try
            {
                return await taskGenerator();
            }
            finally
            {
                semaphore.Release();
            }
        }

        public async Task Enqueue(Func<Task> taskGenerator)
        {
            await semaphore.WaitAsync();
            try
            {
                await taskGenerator();
            }
            finally
            {
                semaphore.Release();
            }
        }
    }
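As a quick sanity check (a hypothetical driver, not part of the original answer), we can push more tasks through the queue than the concurrency limit and record how many run at once; the `TaskQueue` below is a condensed copy of the class above so the sketch runs on its own:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Condensed copy of the TaskQueue above, so the sketch is self-contained.
public class TaskQueue
{
    private readonly SemaphoreSlim semaphore;

    public TaskQueue(int concurrentRequests)
    {
        semaphore = new SemaphoreSlim(concurrentRequests);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try { return await taskGenerator(); }
        finally { semaphore.Release(); }
    }
}

public static class TaskQueueDemo
{
    public static int ObservedMax;   // highest concurrency seen
    private static int current;      // tasks currently running

    public static async Task Main()
    {
        var queue = new TaskQueue(3);

        // Start 10 tasks; the queue should never let more than 3 run at once.
        var tasks = Enumerable.Range(0, 10).Select(i => queue.Enqueue(async () =>
        {
            var running = Interlocked.Increment(ref current);
            // Record the peak concurrency (a racy max is fine for a demo).
            if (running > ObservedMax) ObservedMax = running;
            await Task.Delay(50);
            Interlocked.Decrement(ref current);
            return i;
        }));

        var results = await Task.WhenAll(tasks);
        Console.WriteLine($"peak concurrency: {ObservedMax}, sum: {results.Sum()}");
        // prints e.g. "peak concurrency: 3, sum: 45"
    }
}
```

The semaphore count is the only state the queue needs; nothing else has to be synchronized.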

We will also use the following helper methods to match the result of a `TaskCompletionSource` with a `Task`:

    public static void Match<T>(this TaskCompletionSource<T> tcs, Task<T> task)
    {
        task.ContinueWith(t =>
        {
            switch (t.Status)
            {
                case TaskStatus.Canceled:
                    tcs.SetCanceled();
                    break;
                case TaskStatus.Faulted:
                    tcs.SetException(t.Exception.InnerExceptions);
                    break;
                case TaskStatus.RanToCompletion:
                    tcs.SetResult(t.Result);
                    break;
            }
        });
    }

    public static void Match<T>(this TaskCompletionSource<T> tcs, Task task)
    {
        Match(tcs, task.ContinueWith(t => default(T)));
    }
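A small hypothetical check of what `Match` buys us (not part of the original answer): the `TaskCompletionSource` should adopt the result of a task that ran to completion and the exception of a faulted one:

```csharp
using System;
using System.Threading.Tasks;

public static class TaskExtensions
{
    // Same Match helper as above, reproduced so this sketch compiles on its own.
    public static void Match<T>(this TaskCompletionSource<T> tcs, Task<T> task)
    {
        task.ContinueWith(t =>
        {
            switch (t.Status)
            {
                case TaskStatus.Canceled: tcs.SetCanceled(); break;
                case TaskStatus.Faulted: tcs.SetException(t.Exception.InnerExceptions); break;
                case TaskStatus.RanToCompletion: tcs.SetResult(t.Result); break;
            }
        });
    }
}

public static class MatchDemo
{
    public static async Task Main()
    {
        // Success is forwarded to the TCS...
        var ok = new TaskCompletionSource<int>();
        ok.Match(Task.FromResult(42));
        Console.WriteLine(await ok.Task); // prints "42"

        // ...and so is failure, unwrapped from the AggregateException.
        var failed = new TaskCompletionSource<int>();
        failed.Match(Task.FromException<int>(new InvalidOperationException("boom")));
        try { await failed.Task; }
        catch (InvalidOperationException e) { Console.WriteLine(e.Message); } // prints "boom"
    }
}
```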

Now, for our actual solution: every time we need to perform a throttled operation, we create a TaskCompletionSource, then go into our TaskQueue and add an item that starts the task, matches the TCS to its result, does not await it, and then delays the task queue for 1 second. The task queue will then not allow an operation to start until N tasks are no longer being started within the past second, while the result of the operation itself is the same as the created Task:

    public class Throttler
    {
        private TaskQueue queue;

        public Throttler(int requestsPerSecond)
        {
            queue = new TaskQueue(requestsPerSecond);
        }

        public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
        {
            TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
            var unused = queue.Enqueue(() =>
            {
                tcs.Match(taskGenerator());
                return Task.Delay(TimeSpan.FromSeconds(1));
            });
            return tcs.Task;
        }

        public Task Enqueue(Func<Task> taskGenerator)
        {
            TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
            var unused = queue.Enqueue(() =>
            {
                tcs.Match(taskGenerator());
                return Task.Delay(TimeSpan.FromSeconds(1));
            });
            return tcs.Task;
        }
    }

Here is a solution that uses a stopwatch:

    public class Throttler : IThrottler
    {
        private readonly Stopwatch m_Stopwatch;
        private int m_NumberOfRequestsInLastSecond;
        private readonly int m_MaxNumberOfRequestsPerSecond;

        public Throttler(int max_number_of_requests_per_second)
        {
            m_MaxNumberOfRequestsPerSecond = max_number_of_requests_per_second;
            m_Stopwatch = Stopwatch.StartNew();
        }

        public async Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
        {
            var elapsed = m_Stopwatch.Elapsed;

            if (elapsed > TimeSpan.FromSeconds(1))
            {
                m_NumberOfRequestsInLastSecond = 1;
                m_Stopwatch.Restart();
                return await task();
            }

            if (m_NumberOfRequestsInLastSecond >= m_MaxNumberOfRequestsPerSecond)
            {
                TimeSpan time_to_wait = TimeSpan.FromSeconds(1) - elapsed;
                await Task.Delay(time_to_wait);

                m_NumberOfRequestsInLastSecond = 1;
                m_Stopwatch.Restart();
                return await task();
            }

            m_NumberOfRequestsInLastSecond++;
            return await task();
        }
    }

Here is how this code can be tested:

    class Program
    {
        static void Main(string[] args)
        {
            DoIt();
            Console.ReadLine();
        }

        static async Task DoIt()
        {
            Func<Task<int>> func = async () =>
            {
                await Task.Delay(100);
                return 1;
            };

            Throttler throttler = new Throttler(3);

            for (int i = 0; i < 10; i++)
            {
                var result = await throttler.Throttle(func);
                Console.WriteLine(DateTime.Now);
            }
        }
    }

You can use this as a generic helper:

    public TaskThrottle(int maxTasksToRunInParallel)
    {
        _semaphore = new SemaphoreSlim(maxTasksToRunInParallel);
    }

    public void TaskThrottler<T>(IEnumerable<Task<T>> tasks, int timeoutInMilliseconds,
        CancellationToken cancellationToken = default(CancellationToken)) where T : class
    {
        // Materialize the tasks as a list
        var taskList = tasks as List<Task<T>> ?? tasks.ToList();
        var postTasks = new List<Task<int>>();

        // When a task completes, release the semaphore to free a slot
        taskList.ForEach(x =>
            postTasks.Add(x.ContinueWith(y => _semaphore.Release(), cancellationToken)));

        taskList.ForEach(x =>
        {
            // Wait for an open slot
            _semaphore.Wait(timeoutInMilliseconds, cancellationToken);
            cancellationToken.ThrowIfCancellationRequested();
            x.Start();
        });

        Task.WaitAll(taskList.ToArray(), cancellationToken);
    }

Edit: this solution works, but use it only if it is acceptable to process all requests serially (on a single thread). Otherwise, use the solution given in the accepted answer.

Well, thanks to Best way in .NET to manage queue of tasks on a separate (single) thread, my question turned out to be almost a duplicate, except for the delay added before execution, which is actually quite simple.

The main helper here is the SemaphoreSlim class, which allows limiting the degree of parallelism.

So, first create a semaphore:

    // The semaphore allows only 1 thread to run at a time.
    private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
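As a minimal illustration of that property (hypothetical demo code, not from the answer): with an initial and a maximum count of 1, `SemaphoreSlim` behaves like an awaitable lock, so a second caller's `WaitAsync()` does not complete until the first caller releases:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreDemo
{
    public static async Task Main()
    {
        // Initial count 1, max count 1: an async-friendly mutual-exclusion lock.
        var semaphore = new SemaphoreSlim(1, 1);

        await semaphore.WaitAsync();           // first caller enters immediately
        var second = semaphore.WaitAsync();    // second caller must wait

        Console.WriteLine(second.IsCompleted); // prints "False": still blocked

        semaphore.Release();                   // first caller leaves
        await second;                          // now the second caller enters
        Console.WriteLine("second entered");
        semaphore.Release();
    }
}
```

Unlike `lock`, this works across `await` points, which is exactly what the `Throttle` method below needs.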

And the final version of the throttle looks like this:

    public async Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
    {
        await semaphore.WaitAsync();
        try
        {
            await delaySource.Delay();
            return await task();
        }
        finally
        {
            semaphore.Release();
        }
    }

The delay source is also quite simple:

    private class TaskDelaySource
    {
        private readonly int maxTasks;
        private readonly TimeSpan inInterval;
        private readonly Queue<long> ticks = new Queue<long>();

        public TaskDelaySource(int maxTasks, TimeSpan inInterval)
        {
            this.maxTasks = maxTasks;
            this.inInterval = inInterval;
        }

        public async Task Delay()
        {
            // We only measure the time of the last maxTasks tasks.
            while (ticks.Count > maxTasks)
                ticks.Dequeue();

            // Delay only once maxTasks starts have been recorded in the window;
            // fewer than that means the new task may start immediately.
            if (ticks.Count >= maxTasks)
            {
                var now = DateTime.UtcNow.Ticks;
                var lastTick = ticks.First();

                // Interval between the oldest of the last maxTasks starts and now
                var intervalSinceLastTask = TimeSpan.FromTicks(now - lastTick);
                if (intervalSinceLastTask < inInterval)
                    await Task.Delay((int)(inInterval - intervalSinceLastTask).TotalMilliseconds);
            }

            ticks.Enqueue(DateTime.UtcNow.Ticks);
        }
    }
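Putting the pieces together (a hypothetical, self-contained driver; the class names mirror the answer, and the sliding-window check delays only once maxTasks timestamps are recorded): four back-to-back requests through a 3-per-second throttler should show the fourth one starting roughly a second after the first:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class TaskDelaySource
{
    private readonly int maxTasks;
    private readonly TimeSpan inInterval;
    private readonly Queue<long> ticks = new Queue<long>();

    public TaskDelaySource(int maxTasks, TimeSpan inInterval)
    {
        this.maxTasks = maxTasks;
        this.inInterval = inInterval;
    }

    public async Task Delay()
    {
        // Keep only the timestamps of the last maxTasks starts.
        while (ticks.Count > maxTasks)
            ticks.Dequeue();

        // Delay only once the window already holds maxTasks starts.
        if (ticks.Count >= maxTasks)
        {
            var intervalSinceLastTask = TimeSpan.FromTicks(DateTime.UtcNow.Ticks - ticks.First());
            if (intervalSinceLastTask < inInterval)
                await Task.Delay((int)(inInterval - intervalSinceLastTask).TotalMilliseconds);
        }

        ticks.Enqueue(DateTime.UtcNow.Ticks);
    }
}

public class Throttler
{
    private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
    private readonly TaskDelaySource delaySource = new TaskDelaySource(3, TimeSpan.FromSeconds(1));

    public async Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
    {
        await semaphore.WaitAsync();
        try
        {
            await delaySource.Delay();
            return await task();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

public static class ThrottlerDemo
{
    public static async Task Main()
    {
        var throttler = new Throttler();
        var watch = Stopwatch.StartNew();

        // 4 immediate requests: the first 3 start right away, the 4th waits ~1 second.
        for (int i = 0; i < 4; i++)
        {
            await throttler.Throttle(() => Task.FromResult(i));
            Console.WriteLine($"request {i} at {watch.ElapsedMilliseconds} ms");
        }
    }
}
```

Note that the `ticks` queue needs no extra locking here, because `Delay()` is only ever called while the semaphore is held.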