如何限制每秒HttpWebRequest对Web服务器的数量?

当使用HttpWebRequest向一个应用程序服务器发出并行请求时,我需要实现一个限制机制(每秒请求数)。 我的C#应用​​程序每秒必须向远程服务器发出不超过80个请求。 远程服务管理员强加的限制不是硬限制,而是我的平台和他们之间的“SLA”。

使用HttpWebRequest时,如何控制每秒的请求数?

我有同样的问题,找不到一个现成的解决方案,所以我做了一个,就在这里。 我们的想法是使用BlockingCollection添加需要处理的项目,并使用Reactive Extensions来订阅速率受限的处理器。

Throttle类是此速率限制器的重命名版本

 public static class BlockingCollectionExtensions { // TODO: devise a way to avoid problems if collection gets too big (produced faster than consumed) public static IObservable AsRateLimitedObservable(this BlockingCollection sequence, int items, TimeSpan timePeriod, CancellationToken producerToken) { Subject subject = new Subject(); // this is a dummyToken just so we can recreate the TokenSource // which we will pass the proxy class so it can cancel the task // on disposal CancellationToken dummyToken = new CancellationToken(); CancellationTokenSource tokenSource = CancellationTokenSource.CreateLinkedTokenSource(producerToken, dummyToken); var consumingTask = new Task(() => { using (var throttle = new Throttle(items, timePeriod)) { while (!sequence.IsCompleted) { try { T item = sequence.Take(producerToken); throttle.WaitToProceed(); try { subject.OnNext(item); } catch (Exception ex) { subject.OnError(ex); } } catch (OperationCanceledException) { break; } } subject.OnCompleted(); } }, TaskCreationOptions.LongRunning); return new TaskAwareObservable(subject, consumingTask, tokenSource); } private class TaskAwareObservable : IObservable, IDisposable { private readonly Task task; private readonly Subject subject; private readonly CancellationTokenSource taskCancellationTokenSource; public TaskAwareObservable(Subject subject, Task task, CancellationTokenSource tokenSource) { this.task = task; this.subject = subject; this.taskCancellationTokenSource = tokenSource; } public IDisposable Subscribe(IObserver observer) { var disposable = subject.Subscribe(observer); if (task.Status == TaskStatus.Created) task.Start(); return disposable; } public void Dispose() { // cancel consumption and wait task to finish taskCancellationTokenSource.Cancel(); task.Wait(); // dispose tokenSource and task taskCancellationTokenSource.Dispose(); task.Dispose(); // dispose subject subject.Dispose(); } } } 

unit testing:

 class BlockCollectionExtensionsTest { [Fact] public void AsRateLimitedObservable() { const int maxItems = 1; // fix this to 1 to ease testing TimeSpan during = TimeSpan.FromSeconds(1); // populate collection int[] items = new[] { 1, 2, 3, 4 }; BlockingCollection collection = new BlockingCollection(); foreach (var i in items) collection.Add(i); collection.CompleteAdding(); IObservable observable = collection.AsRateLimitedObservable(maxItems, during, CancellationToken.None); BlockingCollection processedItems = new BlockingCollection(); ManualResetEvent completed = new ManualResetEvent(false); DateTime last = DateTime.UtcNow; observable // this is so we'll receive exceptions .ObserveOn(new SynchronizationContext()) .Subscribe(item => { if (item == 1) last = DateTime.UtcNow; else { TimeSpan diff = (DateTime.UtcNow - last); last = DateTime.UtcNow; Assert.InRange(diff.TotalMilliseconds, during.TotalMilliseconds - 30, during.TotalMilliseconds + 30); } processedItems.Add(item); }, () => completed.Set() ); completed.WaitOne(); Assert.Equal(items, processedItems, new CollectionEqualityComparer()); } } 

Throttle()和Sample()扩展方法(On Observable)允许您将快速事件序列调节为“较慢”序列。

这是一篇博客文章,其中包含一个 Sample(Timespan) ,可确保最大速率。

我的原帖讨论了如何通过客户端行为扩展向WCF添加限制机制,但后来指出我误解了这个问题(doh!)。

总的来说,这种方法可以用一个确定我们是否违反速率限制的类来检查。 关于如何检查速率违规已经有很多讨论。

限制方法在N秒内调用M个请求

如果您违反了速率限制,请暂停一段时间并再次检查。 如果没有,那么继续进行HttpWebRequest调用。