如何使用响应式扩展同时读取交错文件

我是反应式扩展的新手,我想使用它(在c#中)来读取包含多个交错流的文件。 基本上该文件的格式为ABCDABCDABCD... 我更喜欢按顺序读取文件并分离流(即AAA..BBB..等)并并行处理每个流,每个流使用不同的线程。

必须有某种forms的缓冲,以确保每个流可以尽可能地保持忙碌(当然在限制范围内)。 并非所有流都必须同时启动,在这种情况下,必须跳过延迟流的许多元素。 在这种情况下,缓冲可能会弥补差距。

文件中的元素很小(4个字节),因此非常健谈。 因此,我也在寻找一种有效处理这种方法的方法。

我开始创建一个枚举来读取文件。 这可以用于提供包含流ID的结构,或者可以基于顺序(元素数模数量的流)来分离流。 后者可能更有效率。

这个问题已经“全部依赖”,特别是当你谈论性能和效率时,但提供了一个有点人为的例子。 也就是说,与真实文件相比,您的示例文件非常简单。 但是,我会尝试就它有用的机会提供一些建议。

这是一种将流转换为Enumerable 。 流将应用缓冲,这将一次发回一个结果。 这可以提高效率(发回数据块),但在某些时候你需要一次处理一个,它也可能在这里。 不要过早地优化。

 IEnumerable ReadBytes(Stream stream) { using (StreamReader reader = new StreamReader(stream)) { while (!reader.EndOfStream) yield return (char)reader.Read(); } } 

现在,让我们说这是’输出’可观察量的处理代码。 首先,我设置输出可观察量,然后我适当地订阅它们。 请注意,我在这里使用数组,所以我的输出可观察索引是数组索引。 如果流索引不能转换为从零开始的索引,也可以使用字典。

 var outputs = Enumerable.Repeat(0, 3).Select(_ => new Subject()).ToArray(); outputs[0].Delay(TimeSpan.FromSeconds(2)).Subscribe(x => Console.WriteLine("hi: {0}", x)); outputs[1].Delay(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine("ho: {0}", x)); outputs[2].Subscribe(x => Console.WriteLine("he: {0}", x)); 

注意使用Subject来发送我的元素。 这取决于元素的类型,但char在给定的示例中起作用。 另请注意,我延迟元素只是为了certificate一切正常。 它们现在是独立的流,您可以随心所欲地做任何事情。

好的,给定一个文件流:

 var file = @"C:\test.txt"; var buffer = 32; var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, buffer); 

我现在可以订阅并使用模数索引发送到正确的输出流:

 ReadBytes(stream) .ToObservable(Scheduler.ThreadPool) .Select((x,i) => new { Key = (i % 3), Value = x }) // you can change it up here .Subscribe(x => outputs[x.Key].OnNext(x.Value)); 

这里可能有更高效的方法,具体取决于您如何计算目标流,但这个想法保持不变。

输入文件只包含一行: ABCABCABCABCABCABC

运行程序的输出是:

 he: C he: C he: C he: C he: C he: C 

一秒后:

 ho: B ho: B ho: B ho: B ho: B ho: B 

然后是另一秒:

 hi: A hi: A hi: A hi: A hi: A hi: A 

以下是我的解决方案,它基于yamen的答案。 它似乎工作正常,这意味着顺序交错输入被分成多个顺序流并行处理(multithreading)。

但是,我不确定这是否是一个正确的实现(在编程风格,rx契约等方面)。

 const int MAX_BUFFERED_ELEMENTS = 1024; // number of streams in the file var numberOfStreams = 8; // semaphore to limit buffered elements var semaphore = new SemaphoreSlim(MAX_BUFFERED_ELEMENTS); var cts = new CancellationTokenSource(); // should be used to cancel (left out of this sample) // create subjects that are the base of each output stream var subjects = Enumerable.Repeat(0, numberOfStreams).Select(_ => new Subject()).ToArray(); // create the source stream (reader is IEnumerable) var observable = reader.ToObservable(Scheduler.ThreadPool).Publish(); // forward elements from source to the output subjects int stream = 0; observable.Subscribe(x => { semaphores.Wait(cts.Token); // wait if buffer is full _subjects[stream].OnNext(x); // forward to output stream if (++stream >= numberOfStreams) stream = 0; }); // stream = stream++ % numberOfStreams // build output streams subjects.Select( (s,i) => s.ObserveOn(Scheduler.ThreadPool) // process on separate threads .Do(_ => semaphore.Release()) // signal that element is consumed .Subscribe(x => Console.WriteLine("stream: {0}\t element: {1}", i, x)) // debug 'processing' ); // start processing! observable.Connect();