对HttpClient请求进行速率限制的简单方法
我正在使用System.Net.Http中的HTTPClient来对API发出请求。 API限制为每秒10个请求。
我的代码大致如下:
List tasks = new List(); items..Select(i => tasks.Add(ProcessItem(i)); try { await Task.WhenAll(taskList.ToArray()); } catch (Exception ex) { }
ProcessItem方法做了一些事情,但总是使用以下方法调用API: await SendRequestAsync(..blah)
。 看起来像:
private async Task SendRequestAsync(HttpRequestMessage request, CancellationToken token) { token.ThrowIfCancellationRequested(); var response = await HttpClient .SendAsync(request: request, cancellationToken: token).ConfigureAwait(continueOnCapturedContext: false); token.ThrowIfCancellationRequested(); return await Response.BuildResponse(response); }
最初代码工作正常,但是当我开始使用Task.WhenAll时,我开始从API获得“超出速率限制”消息。 如何限制请求的速率?
值得注意的是,ProcessItem可以根据项目进行1-4次API调用。
API限制为每秒10个请求。
然后让你的代码执行一批10个请求,确保它们至少花费一秒钟:
Items[] items = ...; int index = 0; while (index < items.Length) { var timer = Task.Delay(TimeSpan.FromSeconds(1.2)); // ".2" to make sure var tasks = items.Skip(index).Take(10).Select(i => ProcessItemsAsync(i)); var tasksAndTimer = tasks.Concat(new[] { timer }); await Task.WhenAll(tasksAndTimer); index += 10; }
更新
我的ProcessItems方法根据项目进行1-4次API调用。
在这种情况下,批处理不是一个合适的解决方案。 您需要将异步方法限制为特定数字 ,这意味着SemaphoreSlim
。 棘手的部分是你希望随着时间的推移允许更多的电话。
我没有尝试过这段代码,但我想要的一般想法是有一个周期函数,可以释放信号量达10次。 所以,像这样:
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(10); private async Task ThrottledSendRequestAsync(HttpRequestMessage request, CancellationToken token) { await _semaphore.WaitAsync(token); return await SendRequestAsync(request, token); } private async Task PeriodicallyReleaseAsync(Task stop) { while (true) { var timer = Task.Delay(TimeSpan.FromSeconds(1.2)); if (await Task.WhenAny(timer, stop) == stop) return; // Release the semaphore at most 10 times. for (int i = 0; i != 10; ++i) { try { _semaphore.Release(); } catch (SemaphoreFullException) { break; } } } }
用法:
// Start the periodic task, with a signal that we can use to stop it. var stop = new TaskCompletionSource
更新的答案
我的ProcessItems方法根据项目进行1-4次API调用。 因此,批量大小为10时,我仍然超过了速率限制。
您需要在SendRequestAsync中实现滚动窗口。 包含每个请求的时间戳的队列是合适的数据结构。 您使用超过10秒的时间戳对条目进行出列。 碰巧的是,有一个实现作为SO上类似问题的答案。
原始答案
可能对其他人有用
处理此问题的一种简单方法是以10个为一组批量处理请求,同时运行这些请求,然后等待总共10秒(如果尚未完成)。 如果一批请求可以在10秒内完成,这将使您处于速率限制,但如果请求批次需要更长时间,则不是最佳。 看一下MoreLinq中的.Batch()扩展方法。 代码看起来很像
foreach (var taskList in tasks.Batch(10)) { Stopwatch sw = Stopwatch.StartNew(); // From System.Diagnostics await Task.WhenAll(taskList.ToArray()); if (sw.Elapsed.TotalSeconds < 10.0) { // Calculate how long you still have to wait and sleep that long // You might want to wait 10.5 or 11 seconds just in case the rate // limiting on the other side isn't perfectly implemented } }
答案与此类似。
不使用任务列表和WhenAll
,而是使用Parallel.ForEach并使用ParallelOptions将并发任务的数量限制为10,并确保每个任务至少需要1秒:
Parallel.ForEach( items, new ParallelOptions { MaxDegreeOfParallelism = 10 }, async item => { ProcessItems(item); await Task.Delay(1000); } );
或者,如果您想确保每个项目尽可能接近1秒:
Parallel.ForEach( searches, new ParallelOptions { MaxDegreeOfParallelism = 10 }, async item => { var watch = new Stopwatch(); watch.Start(); ProcessItems(item); watch.Stop(); if (watch.ElapsedMilliseconds < 1000) await Task.Delay((int)(1000 - watch.ElapsedMilliseconds)); } );
要么:
Parallel.ForEach( searches, new ParallelOptions { MaxDegreeOfParallelism = 10 }, async item => { await Task.WhenAll( Task.Delay(1000), Task.Run(() => { ProcessItems(item); }) ); } );