C#5 ReadAsync和迭代器

我试图将下面的类转换为懒惰返回一个文件。

public class ObservableFile2 : IObservable { private readonly IObservable subject; public ObservableFile2(string fileName) { subject = Observable.Using ( () => new StreamReader(new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read)), streamReader => ObserveLines(streamReader) ); } private IObservable ObserveLines(StreamReader streamReader) { return ReadLines(streamReader).ToObservable(); } private IEnumerable ReadLines(StreamReader streamReader) { while (!streamReader.EndOfStream) { yield return streamReader.ReadLine(); } } public IDisposable Subscribe(IObserver observer) { return subject.Subscribe(observer); } } 

我现在正试图将其转换为使用

 StreamReader.ReadLineAsync() 

或者更好的是将数据分块,即

 await SourceStream.ReadAsync(buffer, 0, chunkSize). 

我似乎没有掌握如何包装和解包任务

欢迎提供援助。

谢谢

不是 Rx大师,所以可能有比我的答案更好的方法。

我相信这应该可以使用async -enabled Create

 public static class ObservableFile2 { public static IObservable Create(string fileName) { return Observable.Create(async (subject, token) => { try { using (var streamReader = new StreamReader(new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read)) { while (true) { token.ThrowIfCancellationRequested(); var line = await streamReader.ReadLineAsync(); if (line == null) { subject.OnCompleted(); return; } subject.OnNext(line); } } } catch (Exception ex) { subject.OnError(ex); } }); } } 

我不知道你的解决方案是否需要async-features,因为你没有提到它 – 我只能看到你想要“懒洋洋地”使用文件 – 我的猜测是你要在一个时间,如果是这样,这个人应该做的伎俩:

 public static IEnumerable EnumerateLines(string fileName) { using ( var streamReader = new StreamReader(new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))) { while (true) { if (streamReader.EndOfStream) yield break; Console.WriteLine("read another line..."); yield return streamReader.ReadLine(); } } } 

请注意,这仅取决于StreamReader.ReadLine的实现细节。 你可以试着通过使用Lazy来进行非严格的评估 – 但是你会因为你不知道什么时候消耗掉这些值来解决文件句柄的麻烦(即使Haskell有这个问题:)) – 我的建议:不要试图懒惰文件……这可以让你轻松陷入困境