如何从文本文件中读取数百万行并快速插入表中

我已经通过插入200万行快速链接到SQL Server ,发现我可以通过使用批量插入来完成此操作。 所以我试图创建数据表(代码如下),但因为这是一个巨大的文件(超过300K行)我在我的代码中得到一个OutOfMemoryEexception

 string line; DataTable data = new DataTable(); string[] columns = null; bool isInserted = false; using (TextReader tr = new StreamReader(_fileName, Encoding.Default)) { if (columns == null) { line = tr.ReadLine(); columns = line.Split(','); } for (int iColCount = 0; iColCount < columns.Count(); iColCount++) { data.Columns.Add("Column" + iColCount, typeof(string)); } string[] columnVal; while ((line = tr.ReadLine()) != null) { columnVal = line.Split(','); // OutOfMemoryException throwing in this line data.Rows.Add(columnVal); } } 

经过长时间的工作,我修改了我的代码,如下所示,然后我在向数据表添加行时得到OutOfMemoryException

  DataTable data = new DataTable(); string[] columns = null; var line = string.Empty; using (TextReader tr = new StreamReader(_fileName, Encoding.Default)) { if (columns == null) { line = tr.ReadLine(); columns = line.Split(','); } for (int iColCount = 0; iColCount < columns.Count(); iColCount++) { data.Columns.Add("Column" + iColCount, typeof(string)); } } // Split the rows in 20000 rows in different list var _fileList = File.ReadLines(_fileName, Encoding.Default).ToList(); var splitChunks = new List<List>(); splitChunks = SplitFile(_fileList, 20000); Parallel.ForEach(splitChunks, lstChunks => { foreach (var rows in lstChunks) { string[] lineFields = rows.Split(','); DataRow row = datatbl.NewRow(); for (int iCount = 0; iCount < lineFields.Count(); iCount++) { row[iCount] = lineFields[iCount] == string.Empty ? "" : lineFields[iCount].ToString(); } datatbl.Rows.Add(row); } }); 

我可以像下面的代码那样进行下一级的批量插入:

 SqlConnection SqlConnectionObj = GetSQLConnection(); SqlBulkCopy bulkCopy = new SqlBulkCopy(SqlConnectionObj, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null); bulkCopy.DestinationTableName = "TempTable"; bulkCopy.WriteToServer(data); 

文件包含以下类型的数据

4714,1370,AUSRICHTEN MASCHINELL

4870,1370,PLATTE STECKEN

0153,1900,CAULK GUN

0154,1900,NEW TERMINATOR

0360,1470,MU 186 MACCH。 X LAV。 S / A ASTE PS174

9113-H22,1970,MC DRILL BITS

代码需要将其转换为6行和3列。

有没有更快的方法来实现上述function来读取文件并为批量插入创建数据表? 所以我不应该从索引exception中获取内存。

提前致谢。

你得到OutOfMemoryException的原因是因为你正在创建一个内存数据表,并试图在其中插入300K行

这是要在内存中放入的大量数据。

你应该做的就是从文本文件中读取每一定数量的行 – 而不是这个 – 你需要将它插入到数据库中。

你是如何做的取决于你,你可以使用SQL或批量复制 – 但请记住,你不能读取整个文本文件并将其保存在内存中,所以要以块的forms进行。

使用SqlBulkCopy.WriteToServer和IDataReader的解决方案。 我正在使用CSV,但我希望很容易修改其他类型。 SqlBulkCopy只使用IDateReader 3件事,我们必须实现它们:

  • public int FieldCount {get; }
  • public bool Read()
  • public object GetValue(int i)

所有其他属性和方法都可以是未实现的。 关于SqlBulkCopy的有趣文章。 完整代码: https : //dotnetfiddle.net/giG3Ai 。 这是切割版本:

 namespace SqlBulkCopy { using System; using System.Collections.Generic; using System.IO; using System.Diagnostics; using System.Data; using System.Data.SqlClient; public class CsvReader : IDataReader { private readonly char CSV_DELIMITER = ','; private readonly StreamReader _sr; private readonly Dictionary> _csv2SqlType; private readonly string[] _headers; private string _line; private string[] _values; public int FieldCount { get { return _headers.Length; } } public CsvReader(string filePath, Dictionary> csvColumn2SqlTypeDict) { if (string.IsNullOrEmpty(filePath)) throw new ArgumentException("is null or empty", "filePath"); if (!System.IO.File.Exists(filePath)) throw new IOException(string.Format("{0} doesn't exist or access denied", filePath)); if (csvColumn2SqlTypeDict == null) throw new ArgumentNullException("csvColumn2SqlTypeDict"); _sr = new StreamReader(filePath); _csv2SqlType = csvColumn2SqlTypeDict; _headers = ReadHeaders(); ValidateHeaders(); } public object GetValue(int i) { // Get column value var colValue = _values[i]; // Get column name var colName = _headers[i]; // Try to convert to SQL type try { return _csv2SqlType[colName](colValue); } catch { return null; } } public bool Read() { if (_sr.EndOfStream) return false; _line = _sr.ReadLine(); _values = _line.Split(CSV_DELIMITER); // If row is invalid, go to next row if (_values.Length != _headers.Length) return Read(); return true; } public void Dispose() { _sr.Dispose(); } private void ValidateHeaders() { if (_headers.Length != _csv2SqlType.Keys.Count) throw new InvalidOperationException(string.Format("Read {0} columns, but csv2SqlTypeDict contains {1} columns", _headers.Length, _csv2SqlType.Keys)); foreach (var column in _headers) { if (!_csv2SqlType.ContainsKey(column)) throw new InvalidOperationException(string.Format("There is no convertor for column '{0}'", column)); } } private string[] ReadHeaders() { var headerLine = _sr.ReadLine(); if (string.IsNullOrEmpty(headerLine)) throw new InvalidDataException("There is no header in CSV!"); var headers = headerLine.Split(CSV_DELIMITER); if (headers.Length == 0) throw new InvalidDataException("There is no header in CSV after Split!"); return headers; } } public class Program { public static void Main(string[] args) { // Converter from CSV columns to SQL columns var csvColumn2SqlTypeDict = new Dictionary> { { "int", (s) => Convert.ToInt32(s) }, { "str", (s) => s }, { "double", (s) => Convert.ToDouble(s) }, { "date", (s) => Convert.ToDateTime(s) }, }; Stopwatch sw = Stopwatch.StartNew(); try { // example.csv /*** int,str,double,date 1,abcd,2.5,15.04.2002 2,dab,2.7,15.04.2007 3,daqqb,4.7,14.04.2007 ***/ using (var csvReader = new CsvReader("example.csv", csvColumn2SqlTypeDict)) { // TODO!!! Modify to your Connection string var cs = @"Server=localhost\SQLEXPRESS;initial catalog=TestDb;Integrated Security=true"; using (var loader = new SqlBulkCopy(cs, SqlBulkCopyOptions.Default)) { // TODO Modify to your Destination table loader.DestinationTableName = "Test"; // Write from csvReader to database loader.WriteToServer(csvReader); } } } catch(Exception ex) { Console.WriteLine("Got an exception: {0}", ex); Console.WriteLine("Press 'Enter' to quit"); Console.ReadLine(); return; } finally { sw.Stop(); } Console.WriteLine("Data has been written in {0}", sw.Elapsed); Console.WriteLine("Press 'Enter' to quit"); Console.ReadLine(); } private static void ShowCsv(IDataReader dr) { int i = 0; while (dr.Read()) { Console.WriteLine("Row# {0}", i); for (int j = 0; j < dr.FieldCount; j++) { Console.WriteLine("{0} => {1}", j, dr.GetValue(j)); } i++; } } } } 

我发现忘记一个DataTable并且逐行使用普通的旧SQLClient更快。 也更简单。 这也击败了流式SQL函数,这可能是在SQL Server数据库中获取数据的最快方法。

尝试并测量速度,看看它是否足够快。 如果不是,您可以随时尝试重新格式化文件(如有必要),让SQL Server使用其批量插入为您完成工作。