在C#中的多个表中有效地将数据插入MySQL

我需要在mySQL数据库中将一个巨大的CSV文件插入到2个具有1:n关系的表中。

CSV文件每周发送一次,大约1GB,需要附加到现有数据。 其中每个表都有一个自动增量主键。

我试过了:

  • entity framework(占用所有方法的大部分时间)
  • 数据集(相同)
  • 批量上传(不支持多个表)
  • 带参数的MySqlCommand(需要嵌套,我当前的方法)
  • 带有StoredProcedure的MySqlCommand包括一个Transaction

还有什么建议?

让我们说简化这是我的数据结构:

public class User { public string FirstName { get; set; } public string LastName { get; set; } public List Codes { get; set; } } 

我需要从csv插入到这个数据库中:

  User (1-n) Code +---+-----+-----+ +---+---+-----+ |PID|FName|LName| |CID|PID|Code | +---+-----+-----+ +---+---+-----+ | 1 |Jon | Foo | | 1 | 1 | ed3 | | 2 |Max | Foo | | 2 | 1 | wst | | 3 |Paul | Foo | | 3 | 2 | xsd | +---+-----+-----+ +---+---+-----+ 

这是CSV文件的示例行

 Jon;Foo;ed3,wst 

LOAD DATA LOCAL INFILE这样的批量加载是不可能的,因为我有限制的写作权限

鉴于数据量很大,最好的方法(性能方面)是将尽可能多的数据处理留给数据库而不是应用程序。

创建一个临时表,暂时保存.csv文件中的数据。

 CREATE TABLE `imported` ( `id` int(11) NOT NULL, `firstname` varchar(45) DEFAULT NULL, `lastname` varchar(45) DEFAULT NULL, `codes` varchar(450) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; 

.csv的数据加载到此表非常简单。 我建议使用MySqlCommand (这也是你目前的方法)。 此外,对所有INSERT语句使用相同的 MySqlConnection对象将减少总执行时间

然后,为了进一步处理数据,您可以创建一个将处理它的存储过程。

假设这两个表(取自您的简化示例):

 CREATE TABLE `users` ( `PID` int(11) NOT NULL AUTO_INCREMENT, `FName` varchar(45) DEFAULT NULL, `LName` varchar(45) DEFAULT NULL, PRIMARY KEY (`PID`) ) ENGINE=InnoDB AUTO_INCREMENT=3737 DEFAULT CHARSET=utf8; 

 CREATE TABLE `codes` ( `CID` int(11) NOT NULL AUTO_INCREMENT, `PID` int(11) DEFAULT NULL, `code` varchar(45) DEFAULT NULL, PRIMARY KEY (`CID`) ) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8; 

您可以拥有以下存储过程。

 CREATE DEFINER=`root`@`localhost` PROCEDURE `import_data`() BEGIN DECLARE fname VARCHAR(255); DECLARE lname VARCHAR(255); DECLARE codesstr VARCHAR(255); DECLARE splitted_value VARCHAR(255); DECLARE done INT DEFAULT 0; DECLARE newid INT DEFAULT 0; DECLARE occurance INT DEFAULT 0; DECLARE i INT DEFAULT 0; DECLARE cur CURSOR FOR SELECT firstname,lastname,codes FROM imported; DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1; OPEN cur; import_loop: LOOP FETCH cur INTO fname, lname, codesstr; IF done = 1 THEN LEAVE import_loop; END IF; INSERT INTO users (FName,LName) VALUES (fname, lname); SET newid = LAST_INSERT_ID(); SET i=1; SET occurance = (SELECT LENGTH(codesstr) - LENGTH(REPLACE(codesstr, ',', '')) + 1); WHILE i <= occurance DO SET splitted_value = (SELECT REPLACE(SUBSTRING(SUBSTRING_INDEX(codesstr, ',', i), LENGTH(SUBSTRING_INDEX(codesstr, ',', i - 1)) + 1), ',', '')); INSERT INTO codes (PID, code) VALUES (newid, splitted_value); SET i = i + 1; END WHILE; END LOOP; CLOSE cur; END 

对于源数据中的每一行,它都为user表创建一个INSERT语句。 然后有一个WHILE循环来分割逗号分隔代码,并为每个codes表生成一个INSERT语句。

关于LAST_INSERT_ID()的使用,它在PER CONNECTION基础上是可靠的( 参见此处的doc )。 如果用于运行此存储过程的MySQL连接未被其他事务使用,则使用LAST_INSERT_ID()是安全的。

生成的ID在每个连接的基础上在服务器中维护。 这意味着函数返回给定客户端的值是为该客户端影响AUTO_INCREMENT列的最新语句生成的第一个AUTO_INCREMENT值。 其他客户端不会影响此值,即使它们生成自己的AUTO_INCREMENT值也是如此。 此行为可确保每个客户端都可以检索自己的ID,而无需考虑其他客户端的活动,也无需锁定或事务。

编辑 :这是OP的变体,省略了imported的临时表。 您可以调用SP直接将它们存储到数据库中,而不是将.csv中的数据插入到imported表中。

 CREATE DEFINER=`root`@`localhost` PROCEDURE `import_data`(IN fname VARCHAR(255), IN lname VARCHAR(255),IN codesstr VARCHAR(255)) BEGIN DECLARE splitted_value VARCHAR(255); DECLARE done INT DEFAULT 0; DECLARE newid INT DEFAULT 0; DECLARE occurance INT DEFAULT 0; DECLARE i INT DEFAULT 0; INSERT INTO users (FName,LName) VALUES (fname, lname); SET newid = LAST_INSERT_ID(); SET i=1; SET occurance = (SELECT LENGTH(codesstr) - LENGTH(REPLACE(codesstr, ',', '')) + 1); WHILE i <= occurance DO SET splitted_value = (SELECT REPLACE(SUBSTRING(SUBSTRING_INDEX(codesstr, ',', i), LENGTH(SUBSTRING_INDEX(codesstr, ',', i - 1)) + 1), ',', '')); INSERT INTO codes (PID, code) VALUES (newid, splitted_value); SET i = i + 1; END WHILE; END 

注意 :分割代码的代码取自此处 (MySQL不为字符串提供拆分function)。

参考你的答案我会替换

 using (MySqlCommand myCmdNested = new MySqlCommand(cCommand, mConnection)) { foreach (string Code in item.Codes) { myCmdNested.Parameters.Add(new MySqlParameter("@UserID", UID)); myCmdNested.Parameters.Add(new MySqlParameter("@Code", Code)); myCmdNested.ExecuteNonQuery(); } } 

 List lCodes = new List(); foreach (string code in item.Codes) { lCodes.Add(String.Format("('{0}','{1}')", UID, MySqlHelper.EscapeString(code))); } string cCommand = "INSERT INTO Code (UserID, Code) VALUES " + string.Join(",", lCodes); using (MySqlCommand myCmdNested = new MySqlCommand(cCommand, mConnection)) { myCmdNested.ExecuteNonQuery(); } 

生成一个insert语句而不是item.Count

我使用Entity Framework开发了我的WPF应用程序应用程序,并使用了SQL服务器数据库,需要从excel文件中读取数据,并且必须将这些数据插入到两个表之间,这两个表之间存在关系。 对于excel中的大约15000行,过去需要大约4个小时的时间。 然后我做的是我使用了每个插入500行的块,这加快了我对unbelievalbe的快速插入速度,现在导入相同的数据只需要3-5秒。

因此,我建议您一次将行添加到类似100/200/500的上下文中,然后调用SaveChanges方法(如果您确实想要使用EF)。 还有其他有用的提示,以加快EF的性能。 请阅读此内容供您参考。

 var totalRecords = TestPacksData.Rows.Count; var totalPages = (totalRecords / ImportRecordsPerPage) + 1; while (count <= totalPages) { var pageWiseRecords = TestPacksData.Rows.Cast().Skip(count * ImportRecordsPerPage).Take(ImportRecordsPerPage); count++; Project.CreateNewSheet(pageWiseRecords.ToList()); Project.CreateNewSpool(pageWiseRecords.ToList()); } 

这是CreateNewSheet方法

 ///  /// Creates a new Sheet record in the database ///  /// DataRow containing the Sheet record public void CreateNewSheet(List rows) { var tempSheetsList = new List(); foreach (var row in rows) { var sheetNo = row[SheetFields.Sheet_No.ToString()].ToString(); if (string.IsNullOrWhiteSpace(sheetNo)) continue; var testPackNo = row[SheetFields.Test_Pack_No.ToString()].ToString(); TestPack testPack = null; if (!string.IsNullOrWhiteSpace(testPackNo)) testPack = GetTestPackByTestPackNo(testPackNo); var existingSheet = GetSheetBySheetNo(sheetNo); if (existingSheet != null) { UpdateSheet(existingSheet, row); continue; } var isometricNo = GetIsometricNoFromSheetNo(sheetNo); var newSheet = new Sheet { sheet_no = sheetNo, isometric_no = isometricNo, ped_rev = row[SheetFields.PED_Rev.ToString()].ToString(), gpc_rev = row[SheetFields.GPC_Rev.ToString()].ToString() }; if (testPack != null) { newSheet.test_pack_id = testPack.id; newSheet.test_pack_no = testPack.test_pack_no; } if (!tempSheetsList.Any(l => l.sheet_no == newSheet.sheet_no)) { DataStore.Context.Sheets.Add(newSheet); tempSheetsList.Add(newSheet); } } try { DataStore.Context.SaveChanges(); **DataStore.Dispose();** This is very important. Dispose the context } catch (DbEntityValidationException ex) { // Create log for the exception here } } 

除了字段名称和表名,CreateNewSpool与ditto相同,因为它更新了子表。 但这个想法是一样的

1 – 将VirtualId列添加到User表和类。

编辑 2 – 为每个User对象中的VirtualId循环分配数字(使用负数开始-1以避免最后一步中的冲突)字段。 对于属于User u对象的每个Code c对象,设置c.UserId = u.VirtualId

3 – 批量加载用户到User表,批量加载代码到Code表。

4- UPDATE CODE C,USER U SET C.UserId = U.Id WHERE C.UserId = U.VirtualId.

注意:如果您在Code.UserId上有FK约束,则可以删除它并在插入后重新添加它。

 public class User { public int Id { get; set; } public string FirstName { get; set; } public string LastName { get; set; } public int VirtualId { get; set; } } public class Code { public int Id { get; set; } public string Code { get; set; } public string UserId { get; set; } } 

你能把CSV分成两个文件吗?

例如假设您的文件包含以下列:

 ... A ... | ... B ... a0 | b0 a0 | b1 a0 | b2 <-- data a1 | b3 a1 | b4 

因此,一组A可能有多个B条目。 在你分开之后,你得到:

 ... A ... a0 a1 ... B ... b0 b1 b2 b3 b4 

然后你单独批量插入它们。

编辑:伪代码

基于对话,类似于:

 DataTable tableA = ...; // query schema for TableA DataTable tableB = ...; // query schmea for TableB List usernames = select distinct username from TableA; Hashtable htUsername = new Hashtable(StringComparer.InvariantCultureIgnoreCase); foreach (String username in usernames) htUsername[username] = ""; int colUsername = ...; foreach (String[] row in CSVFile) { String un = row[colUsername] as String; if (htUsername[un] == null) { // add new row to tableA DataRow row = tableA.NewRow(); row["Username"] = un; // etc. tableA.Rows.Add(row); htUsername[un] = ""; } } // bulk insert TableA select userid, username from TableA Hashtable htUserId = new Hashtable(StringComparer.InvariantCultureIgnoreCase); // htUserId[username] = userid; int colUserId = ...; foreach (String[] row in CSVFile) { String un = row[colUsername] as String; int userid = (int) htUserId[un]; DataRow row = tableB.NewRow(); row[colUserId] = userId; // fill in other values tableB.Rows.Add(row); if (table.Rows.Count == 65000) { // bulk insert TableB var t = tableB.Clone(); tableB.Dispose(); tableB = t; } } if (tableB.Rows.Count > 0) // bulk insert TableB 

AFAIK在表中完成的插入是顺序的,而不同表中的插入可以并行完成。 打开两个单独的连接到同一个数据库,然后可以使用任务并行库并行插入。

但是,如果表之间存在关于1:n关系的完整性约束,则:

  1. 插入可能会失败,因此任何并行插入方法都是错误的。 显然,你最好的选择是只进行顺序插入,一个接着一个表。
  2. 您可以尝试对两个表的数据进行排序,编写下面编写的InsertInto方法,这样只有在第一个表中插入数据后才能在第二个表中插入。

编辑:由于您已经请求,如果您可以并行执行插入,以下是您可以使用的代码模板

 private void ParallelInserts() { .. //Other code in the method .. //Read first csv into memory. It's just a GB so should be fine ReadFirstCSV(); //Read second csv into memory... ReadSecondCSV(); //Because the inserts will last more than a few CPU cycles... var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None) //An array to hold the two parallel inserts var insertTasks = new Task[2]; //Begin insert into first table... insertTasks[0] = taskFactory.StartNew(() => InsertInto(commandStringFirst, connectionStringFirst)); //Begin insert into second table... insertTasks[1] = taskFactory.StartNew(() => InsertInto(commandStringSecond, connectionStringSecond)); //Let them be done... Task.WaitAll(insertTasks); Console.WriteLine("Parallel insert finished."); } //Defining the InsertInto method which we are passing to the tasks in the method above private static void InsertInto(string commandString, string connectionString) { using (/*open a new connection using the connectionString passed*/) { //In a while loop, iterate until you have 100/200/500 rows while (fileIsNotExhausted) { using (/*commandString*/) { //Execute command to insert in bulk } } } } 

当你说“有效”时,你在谈论记忆还是时间?

在提高插入速度方面,如果每个插入语句可以执行多个值块,则速度可以提高500%。 我在这个问题上对此做了一些基准: 哪个更快:多个单个INSERT还是一个多行INSERT?

我在答案中描述了我的方法,但简单地说,一次读取最多50个“行”(要插入)并将它们捆绑成一个INSERT INTO(...), VALUES(...),(...),(...)...(...),(...)类型语句似乎真的加快了速度。 至少,如果您被限制无法进行批量加载。

如果你有上传期间无法删除索引的实时数据,另一种方法是在没有索引的mysql服务器上创建一个内存表,将数据转储到那里,然后执行INSERT INTO live SELECT * FROM mem 。 虽然它在服务器上使用了更多的内存,但是在这个答案开头的问题是“你有效地’是什么意思’?” 🙂

哦,迭代文件并首先执行所有第一个表插入,然后再执行第二个表,可能没有错。 我想,除非现场使用数据。 在这种情况下,您绝对可以使用捆绑方法,但执行此操作的应用程序逻辑要复杂得多。

更新: OP请求多值插入块的示例C#代码。

注意:此代码假定您已经配置了许多结构:

  1. tables List – 要插入的表名
  2. fieldslist Dictionary > – 每个表的字段名称列表
  3. typeslist Dictionary > – 每个表的MySqlDbType列表,与字段名称的顺序相同。
  4. nullslist Dictionary > – 每个表(与字段名称相同的顺序)判断字段是否可为空的标志列表。
  5. prikey Dictionary – 每个表的主键字段名称列表(注意:这不支持多个字段主键,但如果你需要它,你可能会破解它 – 我觉得某个地方我有一个版本,确实支持这个,但是…… meh)。
  6. theData Dictionary >> – 实际数据,作为每个表的fieldnum-value字典列表。

哦是的,localcommand是通过在本地MySqlConnection对象上使用CreateCommand()创建的MySqlCommand。

进一步说明:在我开始的时候,我写了很长一段时间。 如果这导致你的眼睛或大脑出血,我提前道歉:)

 const int perinsert = 50; foreach (string table in tables) { string[] fields = fieldslist[table].ToArray(); MySqlDbType[] types = typeslist[table].ToArray(); bool[] nulls = nullslist[table].ToArray(); int thisblock = perinsert; int rowstotal = theData[table].Count; int rowsremainder = rowstotal % perinsert; int rowscopied = 0; // Do the bulk (multi-VALUES block) INSERTs, but only if we have more rows than there are in a single bulk insert to perform: while (rowscopied < rowstotal) { if (rowstotal - rowscopied < perinsert) thisblock = rowstotal - rowscopied; // Generate a 'perquery' multi-VALUES prepared INSERT statement: List extravals = new List(); for (int j = 0; j < thisblock; j++) extravals.Add(String.Format("(@{0}_{1})", j, String.Join(String.Format(", @{0}_", j), fields))); localcmd.CommandText = String.Format("INSERT INTO {0} VALUES{1}", tmptable, String.Join(",", extravals.ToArray())); // Now create the parameters to match these: for (int j = 0; j < thisblock; j++) for (int i = 0; i < fields.Length; i++) localcmd.Parameters.Add(String.Format("{0}_{1}", j, fields[i]), types[i]).IsNullable = nulls[i]; // Keep doing bulk INSERTs until there's less rows left than we need for another one: while (rowstotal - rowscopied >= thisblock) { // Queue up all the VALUES for this block INSERT: for (int j = 0; j < thisblock; j++) { Dictionary row = theData[table][rowscopied++]; for (int i = 0; i < fields.Length; i++) localcmd.Parameters[String.Format("{0}_{1}", j, fields[i])].Value = row[i]; } // Run the query: localcmd.ExecuteNonQuery(); } // Clear all the paramters - we're done here: localcmd.Parameters.Clear(); } }