并行不适用于Entity Framework

我有一个ID列表,我需要在每个ID上运行几个存储过程。

当我使用标准的foreach循环时,它工作正常,但是当我有很多记录时,它的工作速度很慢。

我想将代码转换为与EF一起使用,但我得到一个例外:“底层提供程序在Open上失败”。

我在Parallel.ForEach中使用此代码:

using (XmlEntities osContext = new XmlEntities()) { //The code } 

但它仍然抛出exception。

任何想法我如何使用与EF并行? 我是否需要为我正在运行的每个程序创建一个新的上下文? 我有大约10个程序,所以我认为创建10个上下文非常糟糕,每个上下文一个。

entity framework使用的基础数据库连接不是线程安全的 。 您需要为要执行的另一个线程上的每个操作创建一个新上下文。

您对如何并行化操作的关注是有效的; 打开和关闭的许多背景都很昂贵。

相反,您可能想要反思您对并行化代码的想法。 看起来你正在循环遍历许多项目,然后为每个项目串行调用存储过程。

如果可以的话,为每个过程创建一个新的Task (或Task ,如果你不需要结果),然后在那个Task ,打开一个上下文,循环遍历所有项目,然后执行存储过程。 这样,您只有许多上下文等于并行运行的存储过程的数量。

假设你有一个带有两个存储过程的MyDbContextDoSomething1DoSomething2 ,它们都是一个类MyItem的实例。

实现上述内容将类似于:

 // You'd probably want to materialize this into an IList to avoid // warnings about multiple iterations of an IEnumerable. // You definitely *don't* want this to be an IQueryable // returned from a context. IEnumerable items = ...; // The first stored procedure is called here. Task t1 = Task.Run(() => { // Create the context. using (var ctx = new MyDbContext()) // Cycle through each item. foreach (MyItem item in items) { // Call the first stored procedure. // You'd of course, have to do something with item here. ctx.DoSomething1(item); } }); // The second stored procedure is called here. Task t2 = Task.Run(() => { // Create the context. using (var ctx = new MyDbContext()) // Cycle through each item. foreach (MyItem item in items) { // Call the first stored procedure. // You'd of course, have to do something with item here. ctx.DoSomething2(item); } }); // Do something when both of the tasks are done. 

如果你不能并行执行存储过程(每个存储过程都依赖于按特定顺序运行),那么你仍然可以并行化你的操作,它只是稍微复杂一些。

您将看到在项目中创建自定义分区 (使用Partitioner类上的静态Create方法 )。 这将为您提供获取IEnumerator实现的方法(注意,这不是 IEnumerable因此您无法foreach它)。

对于你回来的每个IEnumerator实例,你将创建一个新的Task (如果你需要一个结果),并在Task主体中,你将创建上下文然后循环返回的项目通过IEnumerator ,按顺序调用存储过程。

这看起来像这样:

 // Get the partitioner. OrdinalPartitioner partitioner = Partitioner.Create(items); // Get the partitions. // You'll have to set the parameter for the number of partitions here. // See the link for creating custom partitions for more // creation strategies. IList> paritions = partitioner.GetPartitions( Environment.ProcessorCount); // Create a task for each partition. Task[] tasks = partitions.Select(p => Task.Run(() => { // Create the context. using (var ctx = new MyDbContext()) // Remember, the IEnumerator implementation // might implement IDisposable. using (p) // While there are items in p. while (p.MoveNext()) { // Get the current item. MyItem current = p.Current; // Call the stored procedures. Process the item ctx.DoSomething1(current); ctx.DoSomething2(current); } })). // ToArray is needed (or something to materialize the list) to // avoid deferred execution. ToArray(); 

EF不是线程安全的,因此您不能使用Parallel。

看一下Entity Framework和Multi threading

和这篇文章 。

这是我使用和工作得很好的。 它还支持处理错误exception,并具有调试模式,可以更容易地跟踪事物

 public static ConcurrentQueue Parallel(this IEnumerable items, Action action, int? parallelCount = null, bool debugMode = false) { var exceptions = new ConcurrentQueue(); if (debugMode) { foreach (var item in items) { try { action(item); } // Store the exception and continue with the loop. catch (Exception e) { exceptions.Enqueue(e); } } } else { var partitions = Partitioner.Create(items).GetPartitions(parallelCount ?? Environment.ProcessorCount).Select(partition => Task.Factory.StartNew(() => { while (partition.MoveNext()) { try { action(partition.Current); } // Store the exception and continue with the loop. catch (Exception e) { exceptions.Enqueue(e); } } })); Task.WaitAll(partitions.ToArray()); } return exceptions; } 

您可以像下面那样使用它,因为db是原始DbContext,db.CreateInstance()使用相同的连接字符串创建新实例。

  var batch = db.Set().ToList(); var exceptions = batch.Parallel((item) => { using (var batchDb = db.CreateInstance()) { var batchTime = batchDb.GetDBTime(); var someData = batchDb.Set().Where(x=>x.ID = item.ID).ToList(); //do stuff to someData item.WasMigrated = true; //note that this record is attached to db not batchDb and will only be saved when db.SaveChanges() is called batchDb.SaveChanges(); } }); if (exceptions.Count > 0) { logger.Error("ContactRecordMigration : Content: Error processing one or more records", new AggregateException(exceptions)); throw new AggregateException(exceptions); //optionally throw an exception } db.SaveChanges(); //save the item modifications 

在不知道内部exception结果是什么(如果有的话)的情况下对这个进行故障排除有点困难。 这可能只是设置连接字符串或提供程序配置的方式的问题。

通常,您必须小心并行代码和EF。 但是,你正在做什么 – 应该工作。 我心中有一个问题; 是否在并行之前在该上下文的另一个实例上完成了任何工作? 根据你的post,你在每个post中都有一个单独的上下文。 非常好。 然而,我的一部分想知道在多个上下文之间是否存在一些有趣的构造函数争用。 如果在并行调用之前没有在任何地方使用该上下文,我建议尝试对上下文运行一个简单的查询来打开它,并确保在运行并行方法之前激活所有EF位。 我承认,我还没有尝试过你在这里所做的事,但我已经做得非常接近并且有效。