如何使用FileHelpers库将大型SQL Server表导出为CSV文件?

我想使用C#和FileHelpers库将大型SQL Server表导出为CSV文件。 我也可以考虑C#和bcp,但我认为FileHelpers比bcp更灵活。 速度不是特殊要求。 运行以下代码时,在storage.ExtractRecords()上抛出OutOfMemoryException(省略了一些不太重要的代码):

  // Configure a FileHelpers SQL Server storage for the Order record type.
  var storage = new SqlServerStorage(typeof(Order))
  {
      ServerName = "SqlServer",
      DatabaseName = "SqlDataBase",
      SelectSql = "select * from Orders",
      FillRecordCallback = new FillRecordHandler(FillRecordOrder)
  };

  // ExtractRecords() is the call on which the OutOfMemoryException is reported.
  Order[] output = storage.ExtractRecords() as Order[];

运行以下代码时, link.ExtractToFile()上会抛出“Timeout expired”:

  // Configure the storage through a connection string instead of server/database names.
  var storage = new SqlServerStorage(typeof(Order))
  {
      ConnectionString = "Server=SqlServer;Database=SqlDataBase;Trusted_Connection=True",
      SelectSql = "select * from Orders",
      FillRecordCallback = new FillRecordHandler(FillRecordOrder)
  };

  // Write the query result to file.csv through a FileDataLink, with a header line.
  // ExtractToFile() is the call on which "Timeout expired" is reported.
  var link = new FileDataLink(storage);
  link.FileHelperEngine.HeaderText = headerLine;
  link.ExtractToFile("file.csv");

SQL查询运行所需的时间超过默认值30秒,因此抛出超时异常。 不幸的是,我在FileHelpers文档中找不到如何将SQL命令超时设置为更高的值。

我可以考虑在小数据集上循环SQL选择,直到导出整个表,但过程太复杂了。 有没有一种直接的方法在大型数据库表导出时使用FileHelpers?

FileHelpers有一个异步引擎 ,更适合处理大文件。 不幸的是, FileDataLink类没有使用它,因此没有简单的方法可以将它与SqlStorage一起使用。

修改SQL超时也不是很容易。 最简单的方法是复制SqlServerStorage的代码以创建自己的备用存储提供程序,并为ExecuteAndClose()和ExecuteAndLeaveOpen()提供替换,以便在IDbCommand上设置超时。 (SqlServerStorage是一个密封类,所以你不能简单地从它派生子类)。

您可能想要查看ReactiveETL ,它使用FileHelpers异步引擎来处理文件,并使用ReactiveExtensions重写Ayende的RhinoETL来处理大型数据集。

Rei Sivan的答案是正确的,因为它避免了将整个表读入内存,所以能很好地适用于大文件。 但是,代码还可以整理得更简洁。

shamp00的解决方案需要外部库。

这是一个更简单的表到CSV文件导出器,可以很好地扩展到大文件,并且不需要任何外部库:

 using System;
 using System.Collections.Generic;
 using System.Data;
 using System.Data.SqlClient;
 using System.IO;
 using System.Linq;

 /// <summary>
 /// Writes the contents of a SQL Server table to a CSV file, reading and writing
 /// one row at a time so the whole table is never held in memory.
 /// </summary>
 public class TableDumper
 {
     /// <summary>
     /// Runs <c>select * from tableName</c> and writes the result to a CSV file:
     /// one header line with the column names, then one line per row with every
     /// value wrapped in double quotes (embedded quotes are doubled).
     /// </summary>
     /// <param name="connection">Connection the query is executed on.</param>
     /// <param name="tableName">Table to export. It is concatenated into the SQL text.</param>
     /// <param name="destinationFile">Path of the CSV file to create or overwrite.</param>
     /// <param name="commandTimeoutSeconds">
     /// Value assigned to <see cref="SqlCommand.CommandTimeout"/>. The default of 30
     /// matches the SqlCommand default; pass 0 to wait without a time limit.
     /// </param>
     public void DumpTableToFile(SqlConnection connection, string tableName, string destinationFile, int commandTimeoutSeconds = 30)
     {
         // NOTE(review): a table name cannot be supplied as a SQL parameter, so it is
         // concatenated here; only call this with trusted table names.
         using (var command = new SqlCommand("select * from " + tableName, connection))
         {
             command.CommandTimeout = commandTimeoutSeconds;

             using (var reader = command.ExecuteReader())
             using (var outFile = File.CreateText(destinationFile))
             {
                 string[] columnNames = GetColumnNames(reader).ToArray();
                 int numFields = columnNames.Length;

                 // Header names are escaped only when they would otherwise break the
                 // line, so ordinary names are written exactly as before.
                 string[] headerCells = columnNames.Select(QuoteIfNeeded).ToArray();
                 outFile.WriteLine(string.Join(",", headerCells));

                 // Read() returns false immediately on an empty result set, so no
                 // separate HasRows check is needed.
                 while (reader.Read())
                 {
                     string[] columnValues = Enumerable.Range(0, numFields)
                         .Select(i => reader.GetValue(i).ToString())
                         .Select(Quote)
                         .ToArray();
                     outFile.WriteLine(string.Join(",", columnValues));
                 }
             }
         }
     }

     /// <summary>Wraps a field in double quotes, doubling any embedded double quote.</summary>
     private static string Quote(string field)
     {
         return string.Concat("\"", field.Replace("\"", "\"\""), "\"");
     }

     /// <summary>Quotes a field only if it contains a comma, a double quote or a line break.</summary>
     private static string QuoteIfNeeded(string field)
     {
         return field.IndexOfAny(new[] { ',', '"', '\r', '\n' }) >= 0 ? Quote(field) : field;
     }

     /// <summary>Yields the <c>ColumnName</c> of every row in the reader's schema table.</summary>
     private static IEnumerable<string> GetColumnNames(IDataReader reader)
     {
         foreach (DataRow row in reader.GetSchemaTable().Rows)
         {
             yield return (string)row["ColumnName"];
         }
     }
 }

我写了这段代码,并将其声明为CC0(公共领域) 。

我合并了上面的两段代码。 我用的就是这段代码。 我使用VS 2010。

  // These are all the libraries that I used

using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms; using UsbLibrary; using System.Data; using System.Data.SqlClient; using System.Configuration; using System.Globalization; //cocy in a button

// Body of the button's click handler (the handler's signature is not part of this snippet).

// Ask for the destination file first, so nothing is opened or exported if the user cancels.
using (SaveFileDialog saveFileDialogCSV = new SaveFileDialog())
{
    saveFileDialogCSV.InitialDirectory = Application.ExecutablePath.ToString();
    saveFileDialogCSV.Filter = "CSV files (*.csv)|*.csv|All files (*.*)|*.*";
    saveFileDialogCSV.FilterIndex = 1;
    saveFileDialogCSV.RestoreDirectory = true;

    // Run the export only when a file name was confirmed; calling DumpTableToFile
    // with an empty path would make File.CreateText throw.
    if (saveFileDialogCSV.ShowDialog() == DialogResult.OK)
    {
        // dbk is the database name; change it to your own database name.
        using (SqlConnection _connection = new SqlConnection("Data Source=.;Initial Catalog=dbk;Integrated Security=True"))
        {
            _connection.Open();
            DumpTableToFile(_connection, "tbl_trmc", saveFileDialogCSV.FileName);
        }
    }
}
} // end of code in button

/// <summary>
/// Runs <c>select * from tableName</c> on the given connection and writes the result
/// to a CSV file: an unquoted header line with the column names, then one line per
/// row with every value wrapped in double quotes (embedded quotes are doubled).
/// </summary>
/// <param name="connection">Connection the query is executed on.</param>
/// <param name="tableName">Table to export. It is concatenated into the SQL text.</param>
/// <param name="destinationFile">Path of the CSV file to create or overwrite.</param>
public void DumpTableToFile(SqlConnection connection, string tableName, string destinationFile)
{
    // NOTE(review): a table name cannot be supplied as a SQL parameter, so it is
    // concatenated here; only call this with trusted table names.
    using (var command = new SqlCommand("select * from " + tableName, connection))
    using (var reader = command.ExecuteReader())
    using (var outFile = System.IO.File.CreateText(destinationFile))
    {
        string[] columnNames = GetColumnNames(reader).ToArray();
        int numFields = columnNames.Length;

        outFile.WriteLine(string.Join(",", columnNames));

        // Read() returns false immediately on an empty result set, so no
        // separate HasRows check is needed.
        while (reader.Read())
        {
            string[] columnValues = Enumerable.Range(0, numFields)
                .Select(i => reader.GetValue(i).ToString())
                .Select(field => string.Concat("\"", field.Replace("\"", "\"\""), "\""))
                .ToArray();
            outFile.WriteLine(string.Join(",", columnValues));
        }
    }
}

/// <summary>Yields the <c>ColumnName</c> of every row in the reader's schema table.</summary>
private IEnumerable<string> GetColumnNames(IDataReader reader)
{
    foreach (DataRow row in reader.GetSchemaTable().Rows)
    {
        yield return (string)row["ColumnName"];
    }
}

试试这个:

 /// <summary>
 /// Asks for a destination file with a SaveFileDialog and, if the user confirms,
 /// exports to that file.
 /// </summary>
 private void exportToCSV()
 {
     using (SaveFileDialog saveFileDialogCSV = new SaveFileDialog())
     {
         saveFileDialogCSV.InitialDirectory = Application.ExecutablePath.ToString();
         saveFileDialogCSV.Filter = "CSV files (*.csv)|*.csv|All files (*.*)|*.*";
         saveFileDialogCSV.FilterIndex = 1;
         saveFileDialogCSV.RestoreDirectory = true;

         // Run the export only if a file name was confirmed.
         if (saveFileDialogCSV.ShowDialog() == DialogResult.OK)
         {
             exportToCSVfile(saveFileDialogCSV.FileName);
         }
     }
 }

 /// <summary>
 /// Exports the table selected in lbxTables to a delimited text file, one line per
 /// row, then shows a "ready" message box.
 /// </summary>
 /// <param name="fileOut">Path of the file to create or overwrite.</param>
 private void exportToCSVfile(string fileOut)
 {
     // NOTE(review): the table name is concatenated into the SQL text; it comes from
     // the lbxTables selection, so that list must only ever contain real table names.
     string sqlQuery = "select * from dbo." + this.lbxTables.SelectedItem.ToString();

     // using blocks close the command, reader and writer even if the export fails midway.
     using (SqlCommand command = new SqlCommand(sqlQuery, objConnDB_Auto))
     using (SqlDataReader dr = command.ExecuteReader())
     using (StreamWriter sw = new StreamWriter(fileOut, false, this.encodingCSV))
     {
         // Write the column headers if the user asked for them.
         if (this.chkFirstRowColumnNames.Checked)
         {
             DataTable dtSchema = dr.GetSchemaTable();
             sw.WriteLine(columnNames(dtSchema, this.separator));
         }

         System.Text.StringBuilder row = new System.Text.StringBuilder();
         while (dr.Read())
         {
             row.Length = 0;
             for (int i = 0; i < dr.FieldCount; i++)
             {
                 if (i > 0)
                 {
                     row.Append(this.separator);
                 }

                 // Convert.ToString(object) formats the previously listed types the same
                 // way as the typed getters did, also covers types that were silently
                 // skipped (for example System.Single), and yields "" for DBNull
                 // instead of throwing on NULL columns.
                 row.Append(Convert.ToString(dr.GetValue(i)));
             }

             sw.WriteLine(row.ToString());
         }
     }

     // Notify the user.
     MessageBox.Show("ready");
 }

非常感谢Jay Sullivan的回答 – 对我很有帮助。

在此基础上,我观察到在他的解决方案中varbinary和字符串数据类型的字符串格式不好 – varbinary字段将按照字面意思"System.Byte"或类似的东西出现,而datetime字段将格式化为MM/dd/yyyy hh:mm:ss tt ,这对我来说是不可取的。

下面是我临时拼凑的解决方案,它根据数据类型转换为字符串。 它使用了嵌套的三元运算符,但确实有效!

希望它对某人有帮助。

 /// <summary>
 /// Builds a SELECT from the entries of <paramref name="cArgs"/>, echoes the query and
 /// the column names to the console, and writes every row to a tab-separated file.
 /// DateTime values are written as <c>yyyy-MM-dd HH:mm:ss.fff</c>, byte arrays as
 /// upper-case hex, everything else via <c>ToString()</c>; tabs inside values are
 /// replaced by a space. No header line is written to the file.
 /// </summary>
 /// <param name="connection">Connection the query is executed on.</param>
 /// <param name="cArgs">
 /// Required keys: <c>table</c>, <c>out_file</c>. Optional keys: <c>top_count</c>,
 /// and <c>column_name</c> with <c>lower_bound</c> (plus optionally <c>upper_bound</c>)
 /// for a range filter.
 /// </param>
 public static void DumpTableToFile(SqlConnection connection, Dictionary<string, string> cArgs)
 {
     // NOTE(review): every value below is formatted straight into the SQL text;
     // only pass trusted arguments.
     string query = "SELECT ";

     string topCount;
     if (cArgs.TryGetValue("top_count", out topCount))
     {
         query += string.Format("TOP {0} ", topCount);
     }

     query += string.Format("* FROM {0} (NOLOCK) ", cArgs["table"]);

     string lowerBound;
     string columnName;
     if (cArgs.TryGetValue("lower_bound", out lowerBound) && cArgs.TryGetValue("column_name", out columnName))
     {
         query += string.Format("WHERE {0} >= {1} ", columnName, lowerBound);

         string upperBound;
         if (cArgs.TryGetValue("upper_bound", out upperBound))
         {
             query += string.Format("AND {0} < {1} ", columnName, upperBound);
         }
     }

     Console.WriteLine(query);
     Console.WriteLine("");

     using (var command = new SqlCommand(query, connection))
     using (var reader = command.ExecuteReader())
     using (var outFile = File.CreateText(cArgs["out_file"]))
     {
         string[] columnNames = GetColumnNames(reader).ToArray();
         int numFields = columnNames.Length;

         Console.WriteLine(string.Join(",", columnNames));
         Console.WriteLine("");

         int rowCount = 0;

         // Read() returns false immediately on an empty result set, so no
         // separate HasRows check is needed.
         while (reader.Read())
         {
             rowCount += 1;

             string[] columnValues = Enumerable.Range(0, numFields)
                 .Select(i => FormatValue(reader.GetValue(i)))
                 .Select(field => field.Replace("\t", " "))
                 .ToArray();
             outFile.WriteLine(string.Join("\t", columnValues));

             // Progress report every 100000 rows.
             if (rowCount % 100000 == 0)
             {
                 Console.WriteLine("row {0}", rowCount);
             }
         }
     }
 }

 /// <summary>
 /// Converts one column value to text based on its runtime type. The value is
 /// fetched once by the caller instead of being re-read for every type check.
 /// </summary>
 private static string FormatValue(object value)
 {
     return value is DateTime
         ? ((DateTime)value).ToString("yyyy-MM-dd HH:mm:ss.fff")
         : (value is byte[]
             ? String.Concat(Array.ConvertAll((byte[])value, x => x.ToString("X2")))
             : value.ToString());
 }

 /// <summary>Yields the <c>ColumnName</c> of every row in the reader's schema table.</summary>
 public static IEnumerable<string> GetColumnNames(IDataReader reader)
 {
     foreach (DataRow row in reader.GetSchemaTable().Rows)
     {
         yield return (string)row["ColumnName"];
     }
 }