线程和SqlFileStream。 该进程无法访问指定的文件,因为它已在另一个事务中打开

我正在提取SQL文件表中的文件的内容。 如果我不使用Parallel,以下代码可以正常工作。

我同时读取sql文件流(并行)时遇到以下exception。

该进程无法访问指定的文件,因为它已在另一个事务中打开。

TL; DR:

在Parallel.ForEach中从FileTable读取文件(使用GET_FILESTREAM_TRANSACTION_CONTEXT)时,我得到上述exception。

示例代码供您试用:

https://gist.github.com/NerdPad/6d9b399f2f5f5e5c6519

更长版本:

获取附件,并提取内容:

var documents = new List(); using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled)) { var attachments = await dao.GetAttachmentsAsync(); // Extract the content simultaneously // documents = attachments.ToDbDocuments().ToList(); // This works Parallel.ForEach(attachments, a => documents.Add(a.ToDbDocument())); // this doesn't ts.Complete(); } 

DAO读取文件表:

 public async Task<IEnumerable> GetAttachmentsAsync() { try { var commandStr = "...."; IEnumerable attachments = null; using (var connection = new SqlConnection(this.DatabaseContext.Database.Connection.ConnectionString)) using (var command = new SqlCommand(commandStr, connection)) { connection.Open(); using (var reader = await command.ExecuteReaderAsync()) { attachments = reader.ToSearchAttachments().ToList(); } } return attachments; } catch (System.Exception) { throw; } } 

为每个文件创建对象:该对象包含对GET_FILESTREAM_TRANSACTION_CONTEXT的引用

 public static IEnumerable ToSearchAttachments(this SqlDataReader reader) { if (!reader.HasRows) { yield break; } // Convert each row to SearchAttachment while (reader.Read()) { yield return new SearchAttachment { ... ... UNCPath = reader.To(Constants.UNCPath), ContentStream = reader.To(Constants.Stream) // GET_FILESTREAM_TRANSACTION_CONTEXT() ... ... }; } } 

使用SqlFileStream读取文件: 此处抛出exception

 public static ExtractedContent ToDbDocument(this SearchAttachment attachment) { // Read the file // Exception is thrown here using (var stream = new SqlFileStream(attachment.UNCPath, attachment.ContentStream, FileAccess.Read, FileOptions.SequentialScan, 4096)) { ... // extract content from the file } .... } 

更新1:

根据这篇文章,它似乎可能是一个隔离级问题。 有没有人遇到过类似的问题?

事务不会流入Parallel.ForEach ,您必须手动启用事务。

 //Switched to a thread safe collection. var documents = new ConcurrentQueue(); using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled)) { var attachments = await dao.GetAttachmentsAsync(); //Grab a reference to the current transaction. var transaction = Transaction.Current; Parallel.ForEach(attachments, a => { //Spawn a dependant clone of the transaction using (var depTs = transaction.DependentClone(DependentCloneOption.RollbackIfNotComplete)) { documents.Enqueue(a.ToDbDocument()); depTs.Complete(); } }); ts.Complete(); } 

我还从List切换到ConcurrentQueue因为不允许调用.Add(在多个线程的列表中)。