如何在Rx.NET中以正确的方式约束并发

请观察以下代码段:

var result = await GetSource(1000).SelectMany(s => getResultAsync(s).ToObservable()).ToList(); 

此代码的问题在于getResultAsync以不受约束的方式并发运行。 在某些情况下,这可能不是我们想要的。 假设我想将其并发限制为最多10个并发调用。 什么是Rx.NET方法呢?

我附上一个简单的控制台应用程序,演示了所描述问题的主题和我的蹩脚解决方案。

还有一些额外的代码,比如Stats类和人工随机睡眠。 它们用于确保我真正获得并发执行并且可以可靠地计算在此过程中达到的最大并发性。

RunUnconstrained方法演示了天真的,无约束的运行。 RunConstrained方法显示了我的解决方案,这不是很优雅。 理想情况下,我想通过简单地将专用Rx运算符应用于Monad来简化约束并发性。 当然,不会牺牲性能。

 using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Reactive.Linq; using System.Reactive.Threading.Tasks; using System.Threading; using System.Threading.Tasks; namespace RxConstrainedConcurrency { class Program { public class Stats { public int MaxConcurrentCount; public int CurConcurrentCount; public readonly object MaxConcurrentCountGuard = new object(); } static void Main() { RunUnconstrained().GetAwaiter().GetResult(); RunConstrained().GetAwaiter().GetResult(); } static async Task RunUnconstrained() { await Run(AsyncOp); } static async Task RunConstrained() { using (var sem = new SemaphoreSlim(10)) { await Run(async (s, pause, stats) => { // ReSharper disable AccessToDisposedClosure await sem.WaitAsync(); try { return await AsyncOp(s, pause, stats); } finally { sem.Release(); } // ReSharper restore AccessToDisposedClosure }); } } static async Task Run(Func<string, int, Stats, Task> getResultAsync) { var stats = new Stats(); var rnd = new Random(0x1234); var result = await GetSource(1000).SelectMany(s => getResultAsync(s, rnd.Next(30), stats).ToObservable()).ToList(); Debug.Assert(stats.CurConcurrentCount == 0); Debug.Assert(result.Count == 1000); Debug.Assert(!result.Contains(0)); Debug.WriteLine("Max concurrency = " + stats.MaxConcurrentCount); } static IObservable GetSource(int count) { return Enumerable.Range(1, count).Select(i => i.ToString()).ToObservable(); } static Task AsyncOp(string s, int pause, Stats stats) { return Task.Run(() => { int cur = Interlocked.Increment(ref stats.CurConcurrentCount); if (stats.MaxConcurrentCount < cur) { lock (stats.MaxConcurrentCountGuard) { if (stats.MaxConcurrentCount < cur) { stats.MaxConcurrentCount = cur; } } } try { Thread.Sleep(pause); return int.Parse(s); } finally { Interlocked.Decrement(ref stats.CurConcurrentCount); } }); } } } 

您可以使用Merge的重载在Rx中执行此操作,该重载限制内部可观察对象的并发预订数。

这种forms的Merge应用于流的流。

通常,使用SelectMany从事件调用异步任务会执行两个任务:它将每个事件投影到一个可观察的流中,其单个事件就是结果,并将所有结果流展平在一起。

要使用Merge我们必须使用常规Select将每个事件投影到异步任务的调用中(从而创建流的流),并使用Merge来展平结果。 它将通过仅在任何时间点订阅所提供的固定数量的内部流来以受约束的方式执行此操作。

我们必须小心,只有在订阅包装内部流时才调用每个异步任务调用。 使用ToObservable()将异步任务转换为observable实际上会立即调用异步任务,而不是订阅,因此我们必须将评估推迟到使用Observable.Defer进行订阅。

以下是将所有这些步骤放在一起的示例:

 void Main() { var xs = Observable.Range(0, 10); // source events // "Double" here is our async operation to be constrained, // in this case to 3 concurrent invocations xs.Select(x => Observable.Defer(() => Double(x).ToObservable())).Merge(3) .Subscribe(Console.WriteLine, () => Console.WriteLine("Max: " + MaxConcurrent)); } private static int Concurrent; private static int MaxConcurrent; private static readonly object gate = new Object(); public async Task Double(int x) { var concurrent = Interlocked.Increment(ref Concurrent); lock(gate) { MaxConcurrent = Math.Max(concurrent, MaxConcurrent); } await Task.Delay(TimeSpan.FromSeconds(1)); Interlocked.Decrement(ref Concurrent); return x * 2; } 

这里的最大并发输出将是“3”。 删除合并以“不受约束”,而你将得到“10”。

获得读取更好的Defer效果的另一种(等效)方法是使用FromAsync而不是Defer + ToObservable

 xs.Select(x => Observable.FromAsync(() => Double(x))).Merge(3)