How can I insert 4 million records from Oracle into Elasticsearch faster using C#?

I have the following code written in C#, but as it stands, migrating the data from the Oracle database to Elasticsearch will take 4-5 days. I am inserting the records in batches of 100. Is there any other way to move the 4 million records faster (ideally in less than a day)?

    public static void Selection()
    {
        for (int i = 1; i < 4000000; i += 1000)
        {
            for (int j = i; j < (i + 1000); j += 100)
            {
                OracleCommand cmd = new OracleCommand(BuildQuery(j), oracle_connection);
                OracleDataReader reader = cmd.ExecuteReader();
                List<Record> list = CreateRecordList(reader);
                insert(list);
            }
        }
    }

    private static List<Record> CreateRecordList(OracleDataReader reader)
    {
        List<Record> l = new List<Record>();
        string[] str = new string[7];
        try
        {
            while (reader.Read())
            {
                for (int i = 0; i < 7; i++)
                {
                    str[i] = reader[i].ToString();
                }
                Record r = new Record(str[0], str[1], str[2], str[3], str[4], str[5], str[6]);
                l.Add(r);
            }
        }
        catch (Exception er)
        {
            string msg = er.Message;
        }
        return l;
    }

    private static string BuildQuery(int from)
    {
        int to = from + change - 1;
        StringBuilder builder = new StringBuilder();
        builder.AppendLine(@"select * from");
        builder.AppendLine("(");
        builder.AppendLine("select FIELD_1, FIELD_2, FIELD_3, FIELD_4, FIELD_5, FIELD_6, FIELD_7, ");
        builder.Append(" row_number() over(order by FIELD_1) rn");
        builder.AppendLine(" from tablename");
        builder.AppendLine(")");
        builder.AppendLine(string.Format("where rn between {0} and {1}", from, to));
        builder.AppendLine("order by rn");
        return builder.ToString();
    }

    public static void insert(List<Record> l)
    {
        try
        {
            foreach (Record r in l)
                client.Index(r, "index", "type");
        }
        catch (Exception er)
        {
            string msg = er.Message;
        }
    }

The ROW_NUMBER() function has a negative impact on performance, and you're running it thousands of times. You're already using an OracleDataReader – it does not pull all four million rows onto your machine at once; it essentially streams them one or a few at a time.

This should take minutes or hours, not days – we have several processes that move millions of records between Sybase and SQL Server in a similar way, and they take less than five minutes.

Maybe give this a shot:

    OracleCommand cmd = new OracleCommand("SELECT ... FROM TableName", oracle_connection);
    int batchSize = 500;
    using (OracleDataReader reader = cmd.ExecuteReader())
    {
        List<Record> l = new List<Record>(batchSize);
        string[] str = new string[7];
        int currentRow = 0;

        while (reader.Read())
        {
            for (int i = 0; i < 7; i++)
            {
                str[i] = reader[i].ToString();
            }
            l.Add(new Record(str[0], str[1], str[2], str[3], str[4], str[5], str[6]));

            // Commit every time batchSize records have been read
            if (++currentRow == batchSize)
            {
                Commit(l);
                l.Clear();
                currentRow = 0;
            }
        }

        // commit remaining records
        Commit(l);
    }

Here is what Commit would look like:

    public void Commit(IEnumerable<Record> records)
    {
        // TODO: Use ES's BULK features, I don't know the exact syntax
        client.IndexMany(records, "index", "type");
        // client.Bulk(b => b.IndexMany(records))... something like this
    }

But you are not inserting in batches of 100 –
in the end you insert them one at a time
(and that may not even be the correct code to insert a single record):

 foreach(Record r in l) client.Index(r, "index", "type"); 

All the gyrations of reading in batches accomplish nothing if you insert only one row at a time;
you just introduce lag while fetching the next batch.
Reading is (almost) always faster than writing.

    OracleCommand cmd = new OracleCommand(BuildQuery(all), oracle_connection);
    OracleDataReader reader = cmd.ExecuteReader();

    while (reader.Read())
    {
        client.Index(new Record(reader.GetString(0), reader.GetString(1), reader.GetString(2),
                                reader.GetString(3), reader.GetString(4), reader.GetString(5),
                                reader.GetString(6)), "index", "type");
    }
    reader.Close();

If you want to read and write in parallel, you can use a BlockingCollection,
with a bounded capacity so that reading doesn't get too far ahead of writing.
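As a rough sketch of that producer/consumer pattern – the `Record`, `client`, and `cmd` names here stand in for the asker's own types and objects, and the capacity of 10 batches is an arbitrary assumption to tune:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

const int BatchSize = 500;

// Bounded to 10 pending batches: Add() blocks when the reader
// gets that far ahead of the writer, capping memory use.
var queue = new BlockingCollection<List<Record>>(boundedCapacity: 10);

// Producer: stream rows from Oracle and enqueue them in batches.
var producer = Task.Run(() =>
{
    var batch = new List<Record>(BatchSize);
    using (var reader = cmd.ExecuteReader())
    {
        while (reader.Read())
        {
            batch.Add(new Record(reader.GetString(0), reader.GetString(1),
                                 reader.GetString(2), reader.GetString(3),
                                 reader.GetString(4), reader.GetString(5),
                                 reader.GetString(6)));
            if (batch.Count == BatchSize)
            {
                queue.Add(batch);                    // blocks if the queue is full
                batch = new List<Record>(BatchSize);
            }
        }
    }
    if (batch.Count > 0) queue.Add(batch);
    queue.CompleteAdding();                          // lets the consumer loop end
});

// Consumer: bulk-index each batch as it becomes available.
var consumer = Task.Run(() =>
{
    foreach (var batch in queue.GetConsumingEnumerable())
        client.IndexMany(batch, "index", "type");
});

Task.WaitAll(producer, consumer);
```

`GetConsumingEnumerable` keeps the consumer waiting while the queue is empty and exits cleanly once `CompleteAdding` has been called and the queue is drained, so no extra signaling is needed between the two tasks.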