如何在尽可能短的时间内插入1000万条记录?

我有一个文件(有1000万条记录),如下所示:

line1 line2 line3 line4 ....... ...... 10 million lines 

所以基本上我想在数据库中插入1000万条记录。 所以我读了文件并将其上传到SQL Server。

C#代码

 System.IO.StreamReader file = new System.IO.StreamReader(@"c:\test.txt"); while((line = file.ReadLine()) != null) { // insertion code goes here //DAL.ExecuteSql("insert into table1 values("+line+")"); } file.Close(); 

但插入需要很长时间。 如何使用C#在尽可能短的时间内插入1000万条记录?

更新1:
批量插入:

 BULK INSERT DBNAME.dbo.DATAs FROM 'F:\dt10000000\dt10000000.txt' WITH ( ROWTERMINATOR =' \n' ); 

我的表如下:

 DATAs ( DatasField VARCHAR(MAX) ) 

但我收到以下错误:

Msg 4866,Level 16,State 1,Line 1
批量加载失败。 第1行第1列的数据文件中的列太长。validation是否正确指定了字段终止符和行终止符。

Msg 7399,Level 1,State 1,Line 1
链接服务器“(null)”的OLE DB提供程序“BULK”报告错误。 提供商未提供有关错误的任何信息。

Msg 7330,Level 16,State 2,Line 1
无法从OLE DB提供程序“BULK”获取链接服务器“(null)”的行。

下面的代码工作:

 BULK INSERT DBNAME.dbo.DATAs FROM 'F:\dt10000000\dt10000000.txt' WITH ( FIELDTERMINATOR = '\t', ROWTERMINATOR = '\n' ); 

请不要创建通过BulkCopy加载的DataTable 。 对于较小的数据集,这是一个很好的解决方案,但在调用数据库之前,绝对没有理由将所有1000万行加载到内存中。

您最好的选择(在BCP / BULK INSERT / OPENROWSET(BULK...) )是通过表值参数(TVP)将文件中的内容流式传输到数据库中。 通过使用TVP,您可以打开文件,读取行并发送一行直到完成,然后关闭文件。 此方法的内存占用量仅为一行。 我写了一篇文章,将数据从一个应用程序流式传输到SQL Server 2008 ,这个例子就是这个场景。

结构的简单概述如下。 我假设相同的导入表和字段名称,如上面的问题所示。

必需的数据库对象

 -- First: You need a User-Defined Table Type CREATE TYPE ImportStructure AS TABLE (Field VARCHAR(MAX)); GO -- Second: Use the UDTT as an input param to an import proc. -- Hence "Tabled-Valued Parameter" (TVP) CREATE PROCEDURE dbo.ImportData ( @ImportTable dbo.ImportStructure READONLY ) AS SET NOCOUNT ON; -- maybe clear out the table first? TRUNCATE TABLE dbo.DATAs; INSERT INTO dbo.DATAs (DatasField) SELECT Field FROM @ImportTable; GO 

使用上述SQL对象的C#app代码如下。 注意如何而不是填充对象(例如DataTable)然后执行存储过程,在此方法中,执行存储过程以启动文件内容的读取。 存储过程的输入参数不是变量; 它是方法GetFileContents的返回值。 当SqlCommand调用ExecuteNonQuery时,会调用该方法,该操作会打开文件,读取行并通过IEnumerable将行发送到SQL Server并生成yield return构造,然后关闭文件。 存储过程只看到一个表变量@ImportTable,一旦数据重新开始就可以访问( 注意:数据确实会持续很短的时间,即使不是tempdb中的完整内容 )。

 using System.Collections; using System.Data; using System.Data.SqlClient; using System.IO; using Microsoft.SqlServer.Server; private static IEnumerable GetFileContents() { SqlMetaData[] _TvpSchema = new SqlMetaData[] { new SqlMetaData("Field", SqlDbType.VarChar, SqlMetaData.Max) }; SqlDataRecord _DataRecord = new SqlDataRecord(_TvpSchema); StreamReader _FileReader = null; try { _FileReader = new StreamReader("{filePath}"); // read a row, send a row while (!_FileReader.EndOfStream) { // You shouldn't need to call "_DataRecord = new SqlDataRecord" as // SQL Server already received the row when "yield return" was called. // Unlike BCP and BULK INSERT, you have the option here to create a string // call ReadLine() into the string, do manipulation(s) / validation(s) on // the string, then pass that string into SetString() or discard if invalid. _DataRecord.SetString(0, _FileReader.ReadLine()); yield return _DataRecord; } } finally { _FileReader.Close(); } } 

上面的GetFileContents方法用作存储过程的输入参数值,如下所示:

 public static void test() { SqlConnection _Connection = new SqlConnection("{connection string}"); SqlCommand _Command = new SqlCommand("ImportData", _Connection); _Command.CommandType = CommandType.StoredProcedure; SqlParameter _TVParam = new SqlParameter(); _TVParam.ParameterName = "@ImportTable"; _TVParam.TypeName = "dbo.ImportStructure"; _TVParam.SqlDbType = SqlDbType.Structured; _TVParam.Value = GetFileContents(); // return value of the method is streamed data _Command.Parameters.Add(_TVParam); try { _Connection.Open(); _Command.ExecuteNonQuery(); } finally { _Connection.Close(); } return; } 

补充说明:

  1. 通过一些修改,上面的C#代码可以适用于批量处理数据。
  2. 通过稍加修改,上述C#代码可以适用于发送多个字段(“Steaming Data …”文章中显示的示例链接在上面的2个字段中)。
  3. 您还可以在proc中的SELECT语句中操作每个记录的值。
  4. 您还可以使用proc中的WHERE条件过滤掉行。
  5. 您可以多次访问TVP表变量; 它是READONLY但不是“仅向前”。
  6. 优于SqlBulkCopy优点:
    1. SqlBulkCopy仅限INSERT,而使用TVP允许以任何方式使用数据:您可以调用MERGE ; 你可以根据某些条件DELETE ; 您可以将数据拆分为多个表; 等等。
    2. 由于TVP不是INSERT,因此您不需要单独的临时表来转储数据。
    3. 您可以通过调用ExecuteReader而不是ExecuteNonQuery从数据库中获取数据。 例如,如果DATAs导入表上有IDENTITY字段,则可以向INSERT添加OUTPUT子句以传回INSERTED.[ID] (假设IDIDENTITY字段的名称)。 或者您可以传回完全不同的查询结果,或两者都可以,因为可以通过Reader.NextResult()发送和访问多个结果集。 使用SqlBulkCopy时无法从数据库中获取信息,但是在这里有几个问题想要做到这一点的人(至少关于新创建的IDENTITY值)。
    4. 有关为什么有时整个过程更快的信息,即使从磁盘获取数据到SQL Server的速度稍慢,请参阅SQL Server客户咨询团队的白皮书: 使用TVP最大化吞吐量

在C#中,最好的解决方案是让SqlBulkCopy读取文件。 为此,您需要将IDataReader直接传递给SqlBulkCopy.WriteToServer方法。 这是一个例子: http : //www.codeproject.com/Articles/228332/IDataReader-implementation-plus-SqlBulkCopy

最好的方法是在第一个解决方案和第二个解决方案之间进行混合,创建DataTable并在循环中向其添加行,然后使用BulkCopy在一个连接中上传到数据库, 在批量复制时使用此帮助

还有一件事需要注意,批量复制是一个非常敏感的操作,几乎每个错误都会使副本无效,例如,如果你将dataTable中的列名称声明为“text”,而在DB中声明它的“Text”它会引发exception, 祝好运。

如果要在最短的时间内插入1000万条记录以指示使用SQL查询进行测试,则应使用此查询

  CREATE TABLE TestData(ID INT IDENTITY (1,1), CreatedDate DATETIME) GO INSERT INTO TestData(CreatedDate) SELECT GetDate() GO 10000000