具有支持multithreading的限制的异步任务的队列
我需要实现一个库来请求vk.com API。 问题是API每秒只支持3个请求。 我想让API异步。
重要提示: API应支持从多个线程安全访问。
我的想法是实现一些名为throttler的类,它允许不超过3个请求/秒并延迟其他请求。
接口是下一个:
public interface IThrottler : IDisposable { Task Throttle(Func<Task> task); }
用法就像
var audio = await throttler.Throttle(() => api.MyAudio()); var messages = await throttler.Throttle(() => api.ReadMessages()); var audioLyrics = await throttler.Throttle(() => api.AudioLyrics(audioId)); /// Here should be delay because 3 requests executed var photo = await throttler.Throttle(() => api.MyPhoto());
如何实施throttler?
目前我将其实现为由后台线程处理的队列。
public Task Throttle(Func<Task> task) { /// TaskRequest has method Run() to run task /// TaskRequest uses TaskCompletionSource to provide new task /// which is resolved when queue processed til this element. var request = new TaskRequest(task); requestQueue.Enqueue(request); return request.ResultTask; }
这是缩短处理队列的后台线程循环的代码:
private void ProcessQueue(object state) { while (true) { IRequest request; while (requestQueue.TryDequeue(out request)) { /// Delay method calculates actual delay value and calls Thread.Sleep() Delay(); request.Run(); } } }
是否可以在没有后台线程的情况下实现它?
因此,我们将首先解决一个更简单的问题,即创建一个可同时处理多达N个任务的队列,而不是限制每秒启动的N个任务,并在此基础上构建:
public class TaskQueue { private SemaphoreSlim semaphore; public TaskQueue() { semaphore = new SemaphoreSlim(1); } public TaskQueue(int concurrentRequests) { semaphore = new SemaphoreSlim(concurrentRequests); } public async Task Enqueue (Func> taskGenerator) { await semaphore.WaitAsync(); try { return await taskGenerator(); } finally { semaphore.Release(); } } public async Task Enqueue(Func taskGenerator) { await semaphore.WaitAsync(); try { await taskGenerator(); } finally { semaphore.Release(); } } }
我们还将使用以下帮助器方法将TaskCompletionSource
的结果与`Task:
public static void Match(this TaskCompletionSource tcs, Task task) { task.ContinueWith(t => { switch (t.Status) { case TaskStatus.Canceled: tcs.SetCanceled(); break; case TaskStatus.Faulted: tcs.SetException(t.Exception.InnerExceptions); break; case TaskStatus.RanToCompletion: tcs.SetResult(t.Result); break; } }); } public static void Match (this TaskCompletionSource tcs, Task task) { Match(tcs, task.ContinueWith(t => default(T))); }
现在,对于我们的实际解决方案,我们可以做的是每次我们需要执行限制操作时,我们创建一个TaskCompletionSource
,然后进入我们的TaskQueue
并添加一个启动任务的项目,将TCS与其结果匹配, 不等待它 ,然后将任务队列延迟1秒。 然后,任务队列将不允许任务启动,直到在过去的第二秒中不再启动N个任务,而操作的结果本身与创建Task
相同:
public class Throttler { private TaskQueue queue; public Throttler(int requestsPerSecond) { queue = new TaskQueue(requestsPerSecond); } public Task Enqueue (Func> taskGenerator) { TaskCompletionSource tcs = new TaskCompletionSource (); var unused = queue.Enqueue(() => { tcs.Match(taskGenerator()); return Task.Delay(TimeSpan.FromSeconds(1)); }); return tcs.Task; } public Task Enqueue (Func taskGenerator) { TaskCompletionSource tcs = new TaskCompletionSource (); var unused = queue.Enqueue(() => { tcs.Match(taskGenerator()); return Task.Delay(TimeSpan.FromSeconds(1)); }); return tcs.Task; } }
这是一个使用秒表的解决方案:
public class Throttler : IThrottler { private readonly Stopwatch m_Stopwatch; private int m_NumberOfRequestsInLastSecond; private readonly int m_MaxNumberOfRequestsPerSecond; public Throttler(int max_number_of_requests_per_second) { m_MaxNumberOfRequestsPerSecond = max_number_of_requests_per_second; m_Stopwatch = Stopwatch.StartNew(); } public async Task Throttle (Func> task) { var elapsed = m_Stopwatch.Elapsed; if (elapsed > TimeSpan.FromSeconds(1)) { m_NumberOfRequestsInLastSecond = 1; m_Stopwatch.Restart(); return await task(); } if (m_NumberOfRequestsInLastSecond >= m_MaxNumberOfRequestsPerSecond) { TimeSpan time_to_wait = TimeSpan.FromSeconds(1) - elapsed; await Task.Delay(time_to_wait); m_NumberOfRequestsInLastSecond = 1; m_Stopwatch.Restart(); return await task(); } m_NumberOfRequestsInLastSecond++; return await task(); } }
以下是如何测试此代码:
class Program { static void Main(string[] args) { DoIt(); Console.ReadLine(); } static async Task DoIt() { Func> func = async () => { await Task.Delay(100); return 1; }; Throttler throttler = new Throttler(3); for (int i = 0; i < 10; i++) { var result = await throttler.Throttle(func); Console.WriteLine(DateTime.Now); } } }
您可以将其用作Generic
public TaskThrottle(int maxTasksToRunInParallel) { _semaphore = new SemaphoreSlim(maxTasksToRunInParallel); } public void TaskThrottler(IEnumerable> tasks, int timeoutInMilliseconds, CancellationToken cancellationToken = default(CancellationToken)) where T : class { // Get Tasks as List var taskList = tasks as IList> ?? tasks.ToList(); var postTasks = new List>(); // When the first task completed, it will flag taskList.ForEach(x => { postTasks.Add(x.ContinueWith(y => _semaphore.Release(), cancellationToken)); }); taskList.ForEach(x => { // Wait for open slot _semaphore.Wait(timeoutInMilliseconds, cancellationToken); cancellationToken.ThrowIfCancellationRequested(); x.Start(); }); Task.WaitAll(taskList.ToArray(), cancellationToken); }
编辑:此解决方案有效但只有在可以串行处理所有请求(在一个线程中)时才使用它。 否则使用解决方案作为答案。
好吧,多亏了.NET中的Best方法来管理单独(单个)线程上的任务队列
我的问题几乎是重复的,除了在执行前添加延迟,这实际上很简单。
这里的主要帮手是SemaphoreSlim类,它允许限制并行度。
所以,首先创建一个信号量:
// Semaphore allows run 1 thread concurrently. private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
并且,最终版本的油门看起来像
public async Task Throttle (Func> task) { await semaphore.WaitAsync(); try { await delaySource.Delay(); return await task(); } finally { semaphore.Release(); } }
延迟源也非常简单:
private class TaskDelaySource { private readonly int maxTasks; private readonly TimeSpan inInterval; private readonly Queue ticks = new Queue (); public TaskDelaySource(int maxTasks, TimeSpan inInterval) { this.maxTasks = maxTasks; this.inInterval = inInterval; } public async Task Delay() { // We will measure time of last maxTasks tasks. while (ticks.Count > maxTasks) ticks.Dequeue(); if (ticks.Any()) { var now = DateTime.UtcNow.Ticks; var lastTick = ticks.First(); // Calculate interval between last maxTasks task and current time var intervalSinceLastTask = TimeSpan.FromTicks(now - lastTick); if (intervalSinceLastTask < inInterval) await Task.Delay((int)(inInterval - intervalSinceLastTask).TotalMilliseconds); } ticks.Enqueue(DateTime.UtcNow.Ticks); } }