任务并行库 – 自定义任务计划程序

我要求将web服务请求发送到在线api,我认为Parallel Extensions非常适合我的需求。

有问题的Web服务旨在重复调用,但如果您每秒超过一定数量的呼叫,则会有一种机制向您收费。 我显然希望尽量减少我的收费,所以想知道是否有人见过可以应对以下要求的TaskScheduler:

  1. 限制每个时间跨度计划的任务数。 我想如果请求的数量超过这个限制那么它需要丢弃任务或可能阻止? (停止任务的后退日志)
  2. 检测相同的请求是否已经在要执行的调度程序中但尚未执行,如果是,则不对第二个任务进行排队,而是返回第一个任务。

人们是否觉得这些是任务调度员应该处理的责任,还是我在咆哮错误的树? 如果您有其他选择,我愿意接受建议。

我同意其他人认为TPL Dataflow听起来像是一个很好的解决方案。

要限制处理,您可以创建一个实际上不以任何方式转换数据的TransformBlock ,如果它在之前的数据之后很快到达,它就会延迟它:

 static IPropagatorBlock CreateDelayBlock(TimeSpan delay) { DateTime lastItem = DateTime.MinValue; return new TransformBlock( async x => { var waitTime = lastItem + delay - DateTime.UtcNow; if (waitTime > TimeSpan.Zero) await Task.Delay(waitTime); lastItem = DateTime.UtcNow; return x; }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); } 

然后创建一个生成数据的方法(例如从0开始的整数):

 static async Task Producer(ITargetBlock target) { int i = 0; while (await target.SendAsync(i)) i++; } 

它是异步编写的,因此如果目标块现在无法处理项目,它将等待。

然后编写一个消费者方法:

 static void Consumer(int i) { Console.WriteLine(i); } 

最后,将它们连接在一起并启动它:

 var delayBlock = CreateDelayBlock(TimeSpan.FromMilliseconds(500)); var consumerBlock = new ActionBlock( (Action)Consumer, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }); delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true }); Task.WaitAll(Producer(delayBlock), consumerBlock.Completion); 

这里, delayBlock每500毫秒最多接受一个项目,而Consumer()方法可以并行运行多次。 要完成处理,请调用delayBlock.Complete()

如果你想为你的#2添加一些缓存,你可以创建另一个TransformBlock来做那里的工作并将它链接到其他块。

老实说,我会在更高的抽象层次上工作,并使用TPL Dataflow API。 唯一的问题是您需要编写一个自定义块,它将以您需要的速率限制请求,因为默认情况下,块是“贪婪的”并且将尽可能快地处理。 实现将是这样的:

  1. BufferBlock开始,这是您要发布的逻辑块。
  2. BufferBlock链接到一个自定义块,该块具有请求/秒和限制逻辑的知识。
  3. 将自定义块从2链接到ActionBlock

我没有时间在第二个时间内为#2编写自定义块,但是如果你还没有弄明白,我会稍后再回来查看并尝试填写一个实现。

我没有多少使用RX,但AFAICT Observable.Window方法可以正常工作。

http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.window(VS.103).aspx

它似乎比Throttle更合适,似乎扔掉了元素,我猜这不是你想要的

如果您需要按时间限制,您应该查看Quartz.net 。 它可以促进一致的轮询。 如果您关心所有请求,则应考虑使用某种排队机制。 MSMQ可能是正确的解决方案,但如果您想要更大并使用NServiceBus或RabbitMQ等ESB,则有许多特定的实现。

更新:

在这种情况下,如果您可以利用CTP,TPL Dataflow是您的首选解决方案。 受限制的BufferBlock是解决方案。

此示例来自Microsoft提供的文档 :

 // Hand-off through a bounded BufferBlock private static BufferBlock m_buffer = new BufferBlock( new DataflowBlockOptions { BoundedCapacity = 10 }); // Producer private static async void Producer() { while(true) { await m_buffer.SendAsync(Produce()); } } // Consumer private static async Task Consumer() { while(true) { Process(await m_buffer.ReceiveAsync()); } } // Start the Producer and Consumer private static async Task Run() { await Task.WhenAll(Producer(), Consumer()); } 

更新:

查看RX的Observable.Throttle 。