使用async / await与DataReader? (没有中间缓冲区!)

我的目标很简单,我想做异步I / O调用(使用异步等待) – 但是:

  • 不使用DataFlow依赖( 如在此答案中
  • 没有中间缓冲区( 不像这个答案
  • Projector函数应作为参数发送。 ( 不是这个答案

好。

目前这里是我的代码,它的工作是从db读取并将每一行投影到Func

 public IEnumerable  GetSomeData  (string sql, Func  projector) { using(SqlConnection _conn = new SqlConnection(@"Data Source=...")) { using(SqlCommand _cmd = new SqlCommand(sql, _conn)) { _conn.Open(); _cmd.CommandTimeout = 100000; using(IDataReader rdr = _cmd.ExecuteReader()) { while (rdr.Read()) yield return projector(rdr); } } } } 

那么,投影机是什么?

每个类都有一个获取recordIDataRecord )并创建实体的函数:

示例:

 public class MyClass { public static MyClass MyClassFactory(IDataRecord record) { return new MyClass { Name = record["Name"].ToString(), Datee = DateTime.Parse(record["Datee"].ToString()), val = decimal.Parse(record["val"].ToString()) }; } public string Name { get; set; } public DateTime Datee { get; set; } public decimal val { get; set; } } 

所以在这里, MyClassFactory将是Func

那我现在怎么运行呢?

  var sql = @"SELECT TOP 1000 [NAME],[datee] ,[val] FROM [WebERP].[dbo].[t]"; var a = GetSomeData  (sql, MyClass.MyClassFactory).Where(...); //notice the Func 

一切都好。

问题现在开始:

向该方法添加async会产生错误:(是的,我知道Ienumerable是一个同步接口因此问题)

public async Task<IEnumerable > GetSomeData (string sql, Func projector)

不能是迭代器块,因为’System.Threading.Tasks.Task>’不是迭代器接口类型

但是这个人在这里做了 – :

在此处输入图像描述

哪个会编译。

如何转换我的代码以支持完全异步IO调用?

(在条件下:没有DataFlow依赖,发送投影函数作为参数,没有中间缓冲区)

我想做异步I / O调用(使用异步等待) – 但是:

  • 不使用DataFlow依赖(如在此答案中)
  • 没有中间缓冲区(不像这个答案)
  • Projector函数应作为参数发送。 (不是这个答案)

您可能想查看Stephen Toub的“Tasks,Monads和LINQ” ,了解如何处理异步数据序列的一些好主意。

结合yieldawait不可能的,但我将在这里成为一个动词:引用的要求没有列出IEnumerable和LINQ。 所以,这是一个可能的解决方案,形状为两个协同程序(几乎未经测试)。

数据生成器例程(对应于具有yield IEnumarable ):

 public async Task GetSomeDataAsync( string sql, Func projector, ProducerConsumerHub hub) { using (SqlConnection _conn = new SqlConnection(@"Data Source=...")) { using (SqlCommand _cmd = new SqlCommand(sql, _conn)) { await _conn.OpenAsync(); _cmd.CommandTimeout = 100000; using (var rdr = await _cmd.ExecuteReaderAsync()) { while (await rdr.ReadAsync()) await hub.ProduceAsync(projector(rdr)); } } } } 

数据使用者例程(对应于foreach或LINQ表达式):

 public async Task ConsumeSomeDataAsync(string sql) { var hub = new ProducerConsumerHub(); var producerTask = GetSomeDataAsync(sql, rdr => rdr, hub); while (true) { var nextItemTask = hub.ConsumeAsync(); await Task.WhenAny(producerTask, nextItemTask); if (nextItemTask.IsCompleted) { // process the next data item Console.WriteLine(await nextItemTask); } if (producerTask.IsCompleted) { // process the end of sequence await producerTask; break; } } } 

协同执行助手(也可以实现为一对自定义等待者 ):

 public class ProducerConsumerHub { TaskCompletionSource _consumer = new TaskCompletionSource(); TaskCompletionSource _producer = new TaskCompletionSource(); // TODO: make thread-safe public async Task ProduceAsync(T data) { _producer.SetResult(data); await _consumer.Task; _consumer = new TaskCompletionSource(); } public async Task ConsumeAsync() { var data = await _producer.Task; _producer = new TaskCompletionSource(); _consumer.SetResult(Empty.Value); return data; } struct Empty { public static readonly Empty Value = default(Empty); } } 

这只是一个想法。 对于像这样的简单任务来说,这可能是一种过度杀伤,并且可以在某些方面进行改进(例如线程安全,竞争条件和处理序列结束而不接触producerTask )。 然而,它说明了异步数据检索和处理如何可能解耦。