有没有办法将任务并行库(TPL)与SQLDataReader一起使用?

我喜欢TPL中Parallel.For和Parallel.ForEach扩展方法的简单性。 我想知道是否有办法利用类似的东西,甚至是稍微高级的任务。

下面是SqlDataReader的典型用法,我想知道是否可能,如果是这样,如何用TPL中的东西替换下面的while循环。 因为读者无法提供固定数量的迭代,所以不能使用For扩展方法,这样就可以处理我将收集的任务。 我希望有人可能已经解决了这个问题,然后在ADO.net上做了一些事情。

using (SqlConnection conn = new SqlConnection("myConnString")) using (SqlCommand comm = new SqlCommand("myQuery", conn)) { conn.Open(); SqlDataReader reader = comm.ExecuteReader(); if (reader.HasRows) { while (reader.Read()) { // Do something with Reader } } } 

你快到了。 使用此签名包裹您在函数中发布的代码:

 IEnumerable MyQuery() 

然后用以下代码替换你的// Do something with Reader代码// Do something with Reader

 yield return reader; 

现在你有一些在单个线程中工作的东西。 不幸的是,当您阅读查询结果时,它每次都返回对同一对象的引用,并且对象只是为每次迭代而改变自身。 这意味着如果你尝试并行运行它会得到一些非常奇怪的结果,因为并行读取会改变不同线程中使用的对象。 您需要代码来获取记录的副本以发送到并行循环。

但是,在这一点上,我喜欢做的是跳过记录的额外副本并直接进入强类型类。 更重要的是,我喜欢使用通用方法来做到这一点:

 IEnumerable GetData(Func factory, string sql, Action addParameters) { using (var cn = new SqlConnection("My connection string")) using (var cmd = new SqlCommand(sql, cn)) { addParameters(cmd.Parameters); cn.Open(); using (var rdr = cmd.ExecuteReader()) { while (rdr.Read()) { yield return factory(rdr); } } } } 

假设您的工厂方法按预期创建副本,此代码应该可以安全地在Parallel.ForEach循环中使用。 调用该方法看起来像这样(假设一个Employee类具有名为“Create”的静态工厂方法):

 var UnderPaid = GetData(Employee.Create, "SELECT * FROM Employee WHERE AnnualSalary <= @MinSalary", p => { p.Add("@MinSalary", SqlDbType.Int).Value = 50000; }); Parallel.ForEach(UnderPaid, e => e.GiveRaise()); 

重要更新:
我对此代码的信心并不像以前那么自信。 一个单独的线程仍然可以改变读者,而另一个线程正在进行复制。 我可以锁定它,但我也担心另一个线程可以在原始自己调用Read()之后但在开始复制之前调用更新读取器。 因此,这里的关键部分包含整个while循环……此时,您又回到了单线程。 我希望有一种方法可以修改此代码,以便按照multithreading方案的预期工作,但需要更多的研究。

你将很难直接替换while循环。 SqlDataReader不是线程安全类,因此您不能直接从多个线程使用它。

话虽这么说,您可以使用TPL 处理您读取的数据。 这里有几个选项。 最简单的方法是创建适用于阅读器的IEnumerable实现,并返回包含数据的类或结构。 然后,您可以使用PLINQ或Parallel.ForEach语句Parallel.ForEach处理数据:

 public IEnumerable ReadData() { using (SqlConnection conn = new SqlConnection("myConnString")) using (SqlCommand comm = new SqlCommand("myQuery", conn)) { conn.Open(); SqlDataReader reader = comm.ExecuteReader(); if (reader.HasRows) { while (reader.Read()) { yield return new MyDataClass(... data from reader ...); } } } } 

拥有该方法后,您可以通过PLINQ或TPL直接处理:

 Parallel.ForEach(this.ReadData(), data => { // Use the data here... }); 

要么:

 this.ReadData().AsParallel().ForAll(data => { // Use the data here... });