对HttpClient请求进行速率限制的简单方法

我正在使用System.Net.Http中的HTTPClient来对API发出请求。 API限制为每秒10个请求。

我的代码大致如下:

List tasks = new List(); items..Select(i => tasks.Add(ProcessItem(i)); try { await Task.WhenAll(taskList.ToArray()); } catch (Exception ex) { } 

ProcessItem方法做了一些事情,但总是使用以下方法调用API: await SendRequestAsync(..blah) 。 看起来像:

 private async Task SendRequestAsync(HttpRequestMessage request, CancellationToken token) { token.ThrowIfCancellationRequested(); var response = await HttpClient .SendAsync(request: request, cancellationToken: token).ConfigureAwait(continueOnCapturedContext: false); token.ThrowIfCancellationRequested(); return await Response.BuildResponse(response); } 

最初代码工作正常,但是当我开始使用Task.WhenAll时,我开始从API获得“超出速率限制”消息。 如何限制请求的速率?

值得注意的是,ProcessItem可以根据项目进行1-4次API调用。

API限制为每秒10个请求。

然后让你的代码执行一批10个请求,确保它们至少花费一秒钟:

 Items[] items = ...; int index = 0; while (index < items.Length) { var timer = Task.Delay(TimeSpan.FromSeconds(1.2)); // ".2" to make sure var tasks = items.Skip(index).Take(10).Select(i => ProcessItemsAsync(i)); var tasksAndTimer = tasks.Concat(new[] { timer }); await Task.WhenAll(tasksAndTimer); index += 10; } 

更新

我的ProcessItems方法根据项目进行1-4次API调用。

在这种情况下,批处理不是一个合适的解决方案。 您需要将异步方法限制为特定数字 ,这意味着SemaphoreSlim 。 棘手的部分是你希望随着时间的推移允许更多的电话。

我没有尝试过这段代码,但我想要的一般想法是有一个周期函数,可以释放信号量达10次。 所以,像这样:

 private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(10); private async Task ThrottledSendRequestAsync(HttpRequestMessage request, CancellationToken token) { await _semaphore.WaitAsync(token); return await SendRequestAsync(request, token); } private async Task PeriodicallyReleaseAsync(Task stop) { while (true) { var timer = Task.Delay(TimeSpan.FromSeconds(1.2)); if (await Task.WhenAny(timer, stop) == stop) return; // Release the semaphore at most 10 times. for (int i = 0; i != 10; ++i) { try { _semaphore.Release(); } catch (SemaphoreFullException) { break; } } } } 

用法:

 // Start the periodic task, with a signal that we can use to stop it. var stop = new TaskCompletionSource(); var periodicTask = PeriodicallyReleaseAsync(stop.Task); // Wait for all item processing. await Task.WhenAll(taskList); // Stop the periodic task. stop.SetResult(null); await periodicTask; 

更新的答案

我的ProcessItems方法根据项目进行1-4次API调用。 因此,批量大小为10时,我仍然超过了速率限制。

您需要在SendRequestAsync中实现滚动窗口。 包含每个请求的时间戳的队列是合适的数据结构。 您使用超过10秒的时间戳对条目进行出列。 碰巧的是,有一个实现作为SO上类似问题的答案。

原始答案

可能对其他人有用

处理此问题的一种简单方法是以10个为一组批量处理请求,同时运行这些请求,然后等待总共10秒(如果尚未完成)。 如果一批请求可以在10秒内完成,这将使您处于速率限制,但如果请求批次需要更长时间,则不是最佳。 看一下MoreLinq中的.Batch()扩展方法。 代码看起来很像

 foreach (var taskList in tasks.Batch(10)) { Stopwatch sw = Stopwatch.StartNew(); // From System.Diagnostics await Task.WhenAll(taskList.ToArray()); if (sw.Elapsed.TotalSeconds < 10.0) { // Calculate how long you still have to wait and sleep that long // You might want to wait 10.5 or 11 seconds just in case the rate // limiting on the other side isn't perfectly implemented } } 

答案与此类似。

不使用任务列表和WhenAll ,而是使用Parallel.ForEach并使用ParallelOptions将并发任务的数量限制为10,并确保每个任务至少需要1秒:

 Parallel.ForEach( items, new ParallelOptions { MaxDegreeOfParallelism = 10 }, async item => { ProcessItems(item); await Task.Delay(1000); } ); 

或者,如果您想确保每个项目尽可能接近1秒:

 Parallel.ForEach( searches, new ParallelOptions { MaxDegreeOfParallelism = 10 }, async item => { var watch = new Stopwatch(); watch.Start(); ProcessItems(item); watch.Stop(); if (watch.ElapsedMilliseconds < 1000) await Task.Delay((int)(1000 - watch.ElapsedMilliseconds)); } ); 

要么:

 Parallel.ForEach( searches, new ParallelOptions { MaxDegreeOfParallelism = 10 }, async item => { await Task.WhenAll( Task.Delay(1000), Task.Run(() => { ProcessItems(item); }) ); } );