如何将列添加到DataReader

我的目标是从数据源检索数据,向其添加一些元数据并将其插入另一个目标。

目标具有架构,其中包含四列,然后是源(计算列)。

我正在使用SqlBulkCopy ,它需要一个包含所有列的阅读器(包括计算出的4个)。

有没有办法手动将列添加到DataReader? 或者如果不可能有什么替代方案来插入数据?

我需要这样做,并且还能够根据其他列创建列,并使列的值取决于阅读器的行索引。 这个类从Dapper实现IWrappedReader ,如果这对你有用(这对我来说)。 该类未完成,因为我没有实现所有IDataRecord字段,但您可以查看我如何使用IDataRecord.GetInt32来查看简单模式。

 ///  public class WrappedDataReader : IDataReader { private readonly IList _additionalFields; private readonly int _originalOrdinalCount; private IDbCommand _cmd; private int _currentRowIndex = -1; //The first Read() will make this 0 private IDataReader _underlyingReader; public WrappedDataReader(IDataReader underlyingReader, IList additionalFields) { _additionalFields = additionalFields; _underlyingReader = underlyingReader; var schema = Reader.GetSchemaTable(); if (schema == null) { throw new ObjectDisposedException(GetType().Name); } _originalOrdinalCount = schema.Rows.Count; } public object this[int i] { get { throw new NotImplementedException(); } } public IDataReader Reader { get { if (_underlyingReader == null) { throw new ObjectDisposedException(GetType().Name); } return _underlyingReader; } } IDbCommand IWrappedDataReader.Command { get { if (_cmd == null) { throw new ObjectDisposedException(GetType().Name); } return _cmd; } } void IDataReader.Close() => _underlyingReader?.Close(); int IDataReader.Depth => Reader.Depth; DataTable IDataReader.GetSchemaTable() { var rv = Reader.GetSchemaTable(); if (rv == null) { throw new ObjectDisposedException(GetType().Name); } for (var i = 0; i < _additionalFields.Count; i++) { var row = rv.NewRow(); row["ColumnName"] = _additionalFields[i].ColumnName; row["ColumnOrdinal"] = GetAppendColumnOrdinal(i); row["DataType"] = _additionalFields[i].DataType; rv.Rows.Add(row); } return rv; } bool IDataReader.IsClosed => _underlyingReader?.IsClosed ?? true; bool IDataReader.NextResult() => Reader.NextResult(); bool IDataReader.Read() { _currentRowIndex++; return Reader.Read(); } int IDataReader.RecordsAffected => Reader.RecordsAffected; void IDisposable.Dispose() { _underlyingReader?.Close(); _underlyingReader?.Dispose(); _underlyingReader = null; _cmd?.Dispose(); _cmd = null; } int IDataRecord.FieldCount => Reader.FieldCount + _additionalFields.Count; bool IDataRecord.GetBoolean(int i) => Reader.GetBoolean(i); byte IDataRecord.GetByte(int i) => Reader.GetByte(i); long IDataRecord.GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) => Reader.GetBytes(i, fieldOffset, buffer, bufferoffset, length); char IDataRecord.GetChar(int i) => Reader.GetChar(i); long IDataRecord.GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) => Reader.GetChars(i, fieldoffset, buffer, bufferoffset, length); IDataReader IDataRecord.GetData(int i) => Reader.GetData(i); string IDataRecord.GetDataTypeName(int i) => Reader.GetDataTypeName(i); DateTime IDataRecord.GetDateTime(int i) => Reader.GetDateTime(i); decimal IDataRecord.GetDecimal(int i) => Reader.GetDecimal(i); double IDataRecord.GetDouble(int i) => Reader.GetDouble(i); Type IDataRecord.GetFieldType(int i) => Reader.GetFieldType(i); float IDataRecord.GetFloat(int i) => Reader.GetFloat(i); Guid IDataRecord.GetGuid(int i) => Reader.GetGuid(i); short IDataRecord.GetInt16(int i) => Reader.GetInt16(i); int IDataRecord.GetInt32(int i) { return i >= _originalOrdinalCount ? (int) ExecuteAdditionalFieldFunc(i) : Reader.GetInt32(i); } long IDataRecord.GetInt64(int i) => Reader.GetInt64(i); string IDataRecord.GetName(int i) { return i >= _originalOrdinalCount ? _additionalFields[GetAppendColumnIndex(i)].ColumnName : Reader.GetName(i); } int IDataRecord.GetOrdinal(string name) { for (var i = 0; i < _additionalFields.Count; i++) { if (name.Equals(_additionalFields[i].ColumnName, StringComparison.OrdinalIgnoreCase)) { return GetAppendColumnOrdinal(i); } } return Reader.GetOrdinal(name); } string IDataRecord.GetString(int i) => Reader.GetString(i); object IDataRecord.GetValue(int i) { return i >= _originalOrdinalCount ? ExecuteAdditionalFieldFunc(i) : Reader.GetValue(i); } int IDataRecord.GetValues(object[] values) => Reader.GetValues(values); bool IDataRecord.IsDBNull(int i) { return i >= _originalOrdinalCount ? ExecuteAdditionalFieldFunc(i) == null : Reader.IsDBNull(i); } object IDataRecord.this[string name] { get { var ordinal = ((IDataRecord) this).GetOrdinal(name); return ((IDataRecord) this).GetValue(ordinal); } } object IDataRecord.this[int i] => ((IDataRecord) this).GetValue(i); private int GetAppendColumnOrdinal(int index) { return _originalOrdinalCount + index; } private int GetAppendColumnIndex(int oridinal) { return oridinal - _originalOrdinalCount; } private object ExecuteAdditionalFieldFunc(int oridinal) { return _additionalFields[GetAppendColumnIndex(oridinal)].Func(_currentRowIndex, Reader); } public struct AdditionalField { public AdditionalField(string columnName, Type dataType, Func func = null) { ColumnName = columnName; DataType = dataType; Func = func; } public string ColumnName; public Type DataType; public Func Func; } } 

这是我写的一个快速的NUnit测试来展示它的用法

 [Test] public void ReaderTest() { using (var conn = new SqlConnection(ConnectionSettingsCollection.Default)) { conn.Open(); const string sql = @" SELECT 1 as OriginalField UNION SELECT -500 as OriginalField UNION SELECT 100 as OriginalField "; var additionalFields = new[] { new WrappedDataReader.AdditionalField("StaticField", typeof(int), delegate { return "X"; }), new WrappedDataReader.AdditionalField("CounterField", typeof(int), (i, reader) => i), new WrappedDataReader.AdditionalField("ComputedField", typeof(int), (i, reader) => (int) reader["OriginalField"] + 1000) }; const string expectedJson = @" [ {""OriginalField"":-500,""StaticField"":""X"",""CounterField"":0,""ComputedField"":500}, {""OriginalField"":1, ""StaticField"":""X"",""CounterField"":1,""ComputedField"":1001}, {""OriginalField"":100, ""StaticField"":""X"",""CounterField"":2,""ComputedField"":1100} ] "; var actualJson = ToJson(new WrappedDataReader(new SqlCommand(sql, conn).ExecuteReader(), additionalFields)); Assert.Zero(CultureInfo.InvariantCulture.CompareInfo.Compare(expectedJson, actualJson, CompareOptions.IgnoreSymbols)); } } private static string ToJson(IDataReader reader) { using (var strWriter = new StringWriter(new StringBuilder())) using (var jsonWriter = new JsonTextWriter(strWriter)) { jsonWriter.WriteStartArray(); while (reader.Read()) { jsonWriter.WriteStartObject(); for (var i = 0; i < reader.FieldCount; i++) { jsonWriter.WritePropertyName(reader.GetName(i)); jsonWriter.WriteValue(reader[i]); } jsonWriter.WriteEndObject(); } jsonWriter.WriteEndArray(); return strWriter.ToString(); } } 

有可能的

  • 创建自己的实现IDataReader接口的类
  • 在类构造函数中添加现有的DataReader
  • 根据需要覆盖接口以从Base DataReader返回结果或返回您自己的计算值

只是为了得到一个想法,这可能是一个简单的实现(我跳过了大多数方法)

 public class WrapperDataReader : IDataReader { private IDataReader reader; public WrapperDataReader(IDataReader reader) { this.reader = reader; } public void Close() { reader.Close(); } public int Depth { get { return reader.Depth; } } public DataTable GetSchemaTable() { var schemaTable = reader.GetSchemaTable(); // add your computed column to the schema table schemaTable.Rows.Add(...); return schemaTable; } public bool GetBoolean(int i) { return reader.GetBoolean(i); } public int GetOrdinal(string name) { if (name.Equals("displayName", StringComparison.InvariantCultureIgnoreCase)) return 15; return reader.GetOrdinal(name); } public string GetString(int i) { if (i == 15) return String.Format("{0}, {1}", GetString(1), GetString(2)); // lastname, firstname return reader.GetString(i); } } 

更新

由于您可以使用WriteToServer方法,因此可以使用带有DataTable的重载。

  var connectionString = "..."; var copy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.Default); copy.DestinationTableName = "Customers"; var reader = new SqlDataReader(); var table = new DataTable(); table.Load(reader); table.Columns.Add("DisplayName", typeof(string), "lastname, firstname"); table.Columns.Add("CustomerCode", typeof(string)); foreach (DataRow row in table.Rows) row["CustomerCode"] = ((int)row["id"] + 10000).ToString(); copy.WriteToServer(table); 

Datareader仅用于读取数据。 您无法修改其架构或值

数据集/ DataTable就是为此目的。

DataReader是只读构造,因此无法修改。 您可以使用DataTable

我找到了另一种可能的解决方案,我目前正在使用:

  1. 从现有DataReader创建以下对象:
    IEnumerable – 表示数据
    List – 表示数据DataReader的列。

  2. 修改数据并添加额外的列

  3. 创建一个方法AsDataReader(IEnumerable updatedData,updatedColumns List) ,它获取更新对象并返回* modified DataReader

    • 我使用yield return来提高性能。