Implementing Bulk Data Insertion for Several Databases in C#

  • 2021-12-04 10:54:35
  • OfStack

Previously, I only knew that SqlServer supported batch insertion of data; I did not know that Oracle, SQLite and MySQL support it as well, although Oracle requires the Oracle.DataAccess driver. This post presents a batch insertion solution for each of these databases.

First of all, IProvider exposes a plug-in service interface for batch insertion, IBatcherProvider, which was mentioned in the previous article.


  /// <summary>
  /// Provides methods for batch processing of data.
  /// </summary>
  public interface IBatcherProvider : IProviderService
  {
    /// <summary>
    /// Inserts the data of a <see cref="DataTable"/> into the database in batches.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to insert in batches.</param>
    /// <param name="batchSize">The number of rows written in each batch.</param>
    void Insert(DataTable dataTable, int batchSize = 10000);
  }
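Of the implementations in this article, only the SqlServer one passes batchSize on (to SqlBulkCopy.BatchSize); the Oracle, SQLite and MySql versions accept the parameter but send all rows at once. As a rough sketch of how the parameter could be honored elsewhere, the helper below splits a DataTable's rows into slices of at most batchSize rows. The class and method names are hypothetical and are not part of the framework used in this article.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

public static class DataTableBatchSketch
{
    // Yields consecutive slices of at most batchSize rows each.
    // Hypothetical helper; the article's framework does not define it.
    public static IEnumerable<List<DataRow>> InBatches(DataTable table, int batchSize)
    {
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException("batchSize");
        }
        var batch = new List<DataRow>(batchSize);
        foreach (DataRow row in table.Rows)
        {
            batch.Add(row);
            if (batch.Count == batchSize)
            {
                yield return batch;
                batch = new List<DataRow>(batchSize);
            }
        }
        // Emit the final, possibly shorter, slice.
        if (batch.Count > 0)
        {
            yield return batch;
        }
    }
}
```

Each slice could then be inserted with its own command, so that a very large table does not end up in a single oversized statement or array bind.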

1. SqlServer data batch insertion

Batch insertion in SqlServer is very simple: it can be done with SqlBulkCopy. Here is the implementation of this class:


  /// <summary>
  /// Provides batch operations for System.Data.SqlClient.
  /// </summary>
  public sealed class MsSqlBatcher : IBatcherProvider
  {
    /// <summary>
    /// Gets or sets the context of the provider service.
    /// </summary>
    public ServiceContext ServiceContext { get; set; }

    /// <summary>
    /// Inserts the data of a <see cref="DataTable"/> into the database in batches.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to insert in batches.</param>
    /// <param name="batchSize">The number of rows written in each batch.</param>
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = (SqlConnection)ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          // Quote the table name using the provider's syntax rules
          var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName); 
          using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, null) 
            { 
              DestinationTableName = tableName,  
              BatchSize = batchSize 
            }) 
          { 
            // Add a column mapping for each column, skipping auto-increment columns
            dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement); 
            bulk.WriteToServer(dataTable); 
            bulk.Close(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
  } 

The code above does not use a transaction, because transactions have some impact on performance. If you want one, set SqlBulkCopyOptions.UseInternalTransaction.
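As a minimal sketch of that option (not the implementation used in this article), the bulk copy below combines KeepIdentity with UseInternalTransaction, so that each batch runs inside its own transaction. The BulkCopySketch class and the connectionString argument are assumptions made for illustration.

```csharp
using System.Data;
using System.Data.SqlClient;

public static class BulkCopySketch
{
    // Keep source identity values and wrap every batch in its own
    // internal transaction.
    public const SqlBulkCopyOptions Options =
        SqlBulkCopyOptions.KeepIdentity | SqlBulkCopyOptions.UseInternalTransaction;

    // connectionString is a placeholder supplied by the caller.
    public static void Insert(string connectionString, DataTable table, int batchSize = 10000)
    {
        using (var bulk = new SqlBulkCopy(connectionString, Options))
        {
            bulk.DestinationTableName = table.TableName;
            bulk.BatchSize = batchSize;
            // Map every source column to the destination column of the same name.
            foreach (DataColumn column in table.Columns)
            {
                bulk.ColumnMappings.Add(column.ColumnName, column.ColumnName);
            }
            bulk.WriteToServer(table);
        }
    }
}
```

Note that UseInternalTransaction cannot be combined with an external SqlTransaction passed to the SqlBulkCopy constructor; use one or the other.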

2. Oracle data batch insertion

System.Data.OracleClient does not support bulk inserts, so only the Oracle.DataAccess component can be used as the provider.


  /// <summary>
  /// Provides batch operations for Oracle.DataAccess.
  /// </summary>
  public sealed class OracleAccessBatcher : IBatcherProvider
  {
    /// <summary>
    /// Gets or sets the context of the provider service.
    /// </summary>
    public ServiceContext ServiceContext { get; set; }

    /// <summary>
    /// Inserts the data of a <see cref="DataTable"/> into the database in batches.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to insert in batches.</param>
    /// <param name="batchSize">The number of rows written in each batch.</param>
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
            command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); 
            command.ExecuteNonQuery(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    /// <summary>
    /// Generates the SQL statement used to insert the data.
    /// </summary>
    /// <param name="database"></param>
    /// <param name="command"></param>
    /// <param name="table"></param>
    /// <returns></returns>
    private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table)
    {
      var names = new StringBuilder();
      var values = new StringBuilder();
      // Convert the DataTable's data into an array of column arrays
      var data = table.ToArray();

      // Set the ArrayBindCount property (via reflection, since DbCommand does not declare it)
      command.GetType().GetProperty("ArrayBindCount").SetValue(command, table.Rows.Count, null);
 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      for (var i = 0; i < table.Columns.Count; i++) 
      { 
        var column = table.Columns[i]; 
 
        var parameter = database.Provider.DbProviderFactory.CreateParameter(); 
        if (parameter == null) 
        { 
          continue; 
        } 
        parameter.ParameterName = column.ColumnName; 
        parameter.Direction = ParameterDirection.Input; 
        parameter.DbType = column.DataType.GetDbType(); 
        parameter.Value = data[i]; 
 
        if (names.Length > 0) 
        { 
          names.Append(","); 
          values.Append(","); 
        } 
        names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName)); 
        values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); 
 
        command.Parameters.Add(parameter); 
      } 
      return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
  } 

The most important step above is converting the DataTable into an array of arrays, that is, object[][]. The outer array is indexed by column, and each inner array holds that column's value for every row. The loop over Columns therefore assigns one inner array to each Parameter as its value, so every parameter value is itself an array. The insert statement is no different from an ordinary single-row insert statement.
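The article does not show the table.ToArray() extension, so the following is only a sketch of what such a column-major conversion might look like. ToColumnArrays and its containing class are hypothetical names.

```csharp
using System.Data;

public static class DataTableArraySketch
{
    // Hypothetical stand-in for the article's table.ToArray():
    // result[c][r] holds the value of column c in row r.
    public static object[][] ToColumnArrays(this DataTable table)
    {
        var rowCount = table.Rows.Count;
        var result = new object[table.Columns.Count][];
        for (var c = 0; c < table.Columns.Count; c++)
        {
            var values = new object[rowCount];
            for (var r = 0; r < rowCount; r++)
            {
                values[r] = table.Rows[r][c];
            }
            result[c] = values;
        }
        return result;
    }
}
```

With this layout, result[c] is the array that would be assigned to the parameter for column c, while ArrayBindCount is set to the row count.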

3. SQLite data batch insertion

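Batch insertion in SQLite only requires wrapping the inserts in a transaction. I have not investigated exactly why this works; a likely reason is that, outside an explicit transaction, SQLite commits every INSERT as its own transaction.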


public sealed class SQLiteBatcher : IBatcherProvider 
  { 
    /// <summary> 
    ///  Gets or sets the context of the provider service.  
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary>
    /// Inserts the data of a <see cref="DataTable"/> into the database in batches.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to insert in batches.</param>
    /// <param name="batchSize">The number of rows written in each batch.</param>
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        DbTransaction transaction = null;
        try
        {
          connection.TryOpen();
          // Run all inserts inside a single transaction
          transaction = connection.BeginTransaction();
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
          {
            if (command == null)
            {
              throw new BatcherException(new ArgumentException("command"));
            }
            command.Connection = connection;

            command.CommandText = GenerateInserSql(ServiceContext.Database, dataTable);
            if (command.CommandText == string.Empty)
            {
              return;
            }

            var flag = new AssertFlag();
            dataTable.EachRow(row =>
              {
                // True only for the first row, when the parameters still have to be created
                var first = flag.AssertTrue();
                ProcessCommandParameters(dataTable, command, row, first);
                command.ExecuteNonQuery();
              });
          }
          transaction.Commit();
        }
        catch (Exception exp)
        {
          if (transaction != null)
          {
            transaction.Rollback();
          }
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    private void ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row, bool first) 
    { 
      for (var c = 0; c < dataTable.Columns.Count; c++) 
      { 
        DbParameter parameter; 
        // Create the parameters for the first row only and reuse them for later rows
        if (first) 
        { 
          parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter(); 
          parameter.ParameterName = dataTable.Columns[c].ColumnName; 
          command.Parameters.Add(parameter); 
        } 
        else 
        { 
          parameter = command.Parameters[c]; 
        } 
        parameter.Value = row[c]; 
      } 
    } 
 
    /// <summary> 
    /// Generates the SQL statement used to insert the data.
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DataTable table) 
    { 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      var flag = new AssertFlag(); 
      table.EachColumn(column => 
        { 
          if (!flag.AssertTrue()) 
          { 
            names.Append(","); 
            values.Append(","); 
          } 
          names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName)); 
          values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); 
        }); 
      return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
  } 

4. MySql data batch insertion


  /// <summary>
  /// Provides batch operations for MySql.Data.
  /// </summary>
  public sealed class MySqlBatcher : IBatcherProvider
  {
    /// <summary>
    /// Gets or sets the context of the provider service.
    /// </summary>
    public ServiceContext ServiceContext { get; set; }

    /// <summary>
    /// Inserts the data of a <see cref="DataTable"/> into the database in batches.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to insert in batches.</param>
    /// <param name="batchSize">The number of rows written in each batch.</param>
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
 
            command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); 
            if (command.CommandText == string.Empty) 
            { 
              return; 
            } 
            command.ExecuteNonQuery(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    /// <summary> 
    /// Generates the SQL statement used to insert the data.
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="command"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table) 
    { 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      var types = new List<DbType>(); 
      var count = table.Columns.Count; 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      table.EachColumn(c => 
        { 
          if (names.Length > 0) 
          { 
            names.Append(","); 
          } 
          names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName)); 
          types.Add(c.DataType.GetDbType()); 
        }); 
 
      var i = 0; 
      foreach (DataRow row in table.Rows) 
      { 
        if (i > 0) 
        { 
          values.Append(","); 
        } 
        values.Append("("); 
        for (var j = 0; j < count; j++) 
        { 
          if (j > 0) 
          { 
            values.Append(", "); 
          } 
          var isStrType = IsStringType(types[j]); 
          var parameter = CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j); 
          if (parameter != null) 
          { 
            values.Append(parameter.ParameterName); 
            command.Parameters.Add(parameter); 
          } 
          else if (isStrType) 
          { 
            values.AppendFormat("'{0}'", row[j]); 
          } 
          else 
          { 
            values.Append(row[j]); 
          } 
        } 
        values.Append(")"); 
        i++; 
      } 
      return string.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
 
    /// <summary> 
    /// Determines whether the given type is a string type.
    /// </summary> 
    /// <param name="dbType"></param> 
    /// <returns></returns> 
    private bool IsStringType(DbType dbType) 
    { 
      return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength; 
    } 
 
    /// <summary> 
    /// Creates a parameter for the value, or returns null when the value can be written inline.
    /// </summary> 
    /// <param name="provider"></param> 
    /// <param name="isStrType"></param> 
    /// <param name="dbType"></param> 
    /// <param name="value"></param> 
    /// <param name="parPrefix"></param> 
    /// <param name="row"></param> 
    /// <param name="col"></param> 
    /// <returns></returns> 
    private DbParameter CreateParameter(IProvider provider, bool isStrType, DbType dbType, object value, char parPrefix, int row, int col) 
    { 
      // Parameterizing every value would be slow, so a parameter is created only for strings that contain a single quote (') and for date values
      if ((isStrType && value.ToString().IndexOf('\'') != -1) || dbType == DbType.DateTime) 
      { 
        var name = string.Format("{0}p_{1}_{2}", parPrefix, row, col); 
        var parameter = provider.DbProviderFactory.CreateParameter(); 
        parameter.ParameterName = name; 
        parameter.Direction = ParameterDirection.Input; 
        parameter.DbType = dbType; 
        parameter.Value = value; 
        return parameter; 
      } 
      return null; 
    } 
  } 

Batch insertion in MySql works by writing all of the rows into the VALUES clause of a single statement, for example: insert batcher (id, name) values (1, '1'), (2, '2'), (3, '3'), ... (10, '10').
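To make the shape of the generated statement concrete, here is a simplified sketch of a multi-row INSERT builder. Unlike the MySqlBatcher above, it binds every value as a parameter instead of inlining most of them, and it hard-codes backtick quoting and the @ prefix. The class name, the Build method and the returned name/value pairs are all assumptions made for illustration.

```csharp
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;

public static class MultiRowInsertSketch
{
    // Builds one INSERT statement with a parenthesized value group per row
    // and collects the parameter name/value pairs that must be bound to it.
    public static string Build(DataTable table, out List<KeyValuePair<string, object>> parameters)
    {
        parameters = new List<KeyValuePair<string, object>>();
        var names = string.Join(",",
            table.Columns.Cast<DataColumn>().Select(c => "`" + c.ColumnName + "`"));
        var values = new StringBuilder();
        for (var i = 0; i < table.Rows.Count; i++)
        {
            if (i > 0)
            {
                values.Append(",");
            }
            values.Append("(");
            for (var j = 0; j < table.Columns.Count; j++)
            {
                if (j > 0)
                {
                    values.Append(", ");
                }
                // Same naming scheme as the article: p_<row>_<column>
                var name = string.Format("@p_{0}_{1}", i, j);
                values.Append(name);
                parameters.Add(new KeyValuePair<string, object>(name, table.Rows[i][j]));
            }
            values.Append(")");
        }
        return string.Format("INSERT INTO `{0}`({1}) VALUES {2}", table.TableName, names, values);
    }
}
```

For a two-row table named batcher with columns id and name, this produces INSERT INTO `batcher`(`id`,`name`) VALUES (@p_0_0, @p_0_1),(@p_1_0, @p_1_1). Binding every value is slower than the article's selective approach but avoids having to reason about quoting.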

5. Test

Next, write a test case to see how batch insertion performs.


    public void TestBatchInsert() 
    { 
      Console.WriteLine(TimeWatcher.Watch(() => 
        InvokeTest(database => 
          { 
            var table = new DataTable("Batcher"); 
            table.Columns.Add("Id", typeof(int)); 
            table.Columns.Add("Name1", typeof(string)); 
            table.Columns.Add("Name2", typeof(string)); 
            table.Columns.Add("Name3", typeof(string)); 
            table.Columns.Add("Name4", typeof(string)); 
 
            // Build 100,000 rows of data
            for (var i = 0; i < 100000; i++) 
            { 
              table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString()); 
            } 
 
            // Get the IBatcherProvider service
            var batcher = database.Provider.GetService<IBatcherProvider>();
            if (batcher == null)
            {
              Console.WriteLine("Bulk inserts are not supported.");
            }
            else
            {
              batcher.Insert(table);
            }

            // Output the number of rows in the Batcher table
            var sql = new SqlCommand("SELECT COUNT(1) FROM Batcher");
            Console.WriteLine("Currently {0} rows in total", database.ExecuteScalar(sql));
 
          }))); 
    } 
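TimeWatcher.Watch is not shown in this article. Assuming it simply times a delegate and returns the elapsed time, a stand-in could be sketched with Stopwatch as follows; TimeWatcherSketch is a hypothetical name.

```csharp
using System;
using System.Diagnostics;

public static class TimeWatcherSketch
{
    // Runs the action once and returns the elapsed wall-clock time.
    public static TimeSpan Watch(Action action)
    {
        var stopwatch = Stopwatch.StartNew();
        action();
        stopwatch.Stop();
        return stopwatch.Elapsed;
    }
}
```

Passing the returned TimeSpan to Console.WriteLine prints it in the hh:mm:ss.fffffff form seen in the results.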

The following table lists the time each of the four databases took to insert 100,000 rows:

Database   Elapsed time
MsSql      00:00:02.9376300
Oracle     00:00:01.5155959
SQLite     00:00:01.6275634
MySql      00:00:05.4166891

