当前位置:网站首页> 利用MySqlBulkLoader实现批量插入数据的示例详解
利用MySqlBulkLoader实现批量插入数据的示例详解
2022-06-28 15:01:00 【1024问】
介绍
1.将List转化为DataTable
2.将DataTable转换为标准的CSV文件
3.CSV文件导入数据到数据库
4.使用MySqlBulkLoader批量插入数据
5.完整的代码
介绍最近在项目中遇到插入数据瓶颈,几万、几十万、几百万的数据保存到MYSQL数据库,使用EF插入数据速度非常慢,数据量非常大时EF插入需要几十分钟,甚至几个小时,这样子的速度肯定不是我们所期望的。
后面经过了解与研究发现MySqlBulkLoader,可以批量将数据插入到数据库并且速度上面远远优于EF。
MySqlBulkLoader主要的实现方式:将需要插入的数据转成DataTable,DataTable转成一个CSV文件,将CSV文件使用批量导入的形式导入到数据库里面去。
注意:
1).数据库连接地址需要添加配置AllowLoadLocalInfile=true,允许本地文件导入;
Data Source = 数据库地址; Port = 端口; Initial Catalog = 数据库名; User Id = 用户名; Password = 密码;AllowLoadLocalInfile=true;
2).插入的时候会返回插入行数,但是检查所有的数据都正确,也没有报异常,却返回了插入数量为0,可以检查表是否有唯一索引,插入的数据是否违反了唯一索引。
(以下分块展示了代码,如果需要看完整的代码直接看 5.完整的代码)
1.将List转化为DataTable/// <summary> /// 将List转化为DataTable /// </summary> /// <returns></returns> public DataTable ListToDataTable<T>(List<T> data) { #region 创建一个DataTable,以实体名称作为DataTable名称 var tableName = typeof(T).Name; tableName = tableName.ToSnakeCase(); /*实体名称与表名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/ DataTable dt = new DataTable { TableName = tableName }; #endregion #region 拿取列名,以实体的属性名作为列名 var properties = typeof(T).GetProperties(); foreach (var item in properties) { var curFileName = item.Name; curFileName = curFileName.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/ dt.Columns.Add(curFileName); } #endregion #region 列赋值 foreach (var item in data) { DataRow dr = dt.NewRow(); var columns = dt.Columns; var curPropertyList = item.GetType().GetProperties(); foreach (var p in curPropertyList) { var name = p.Name; name = name.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/ var curValue = p.GetValue(item); int i = columns.IndexOf(name); dr[i] = curValue; } dt.Rows.Add(dr); } #endregion return dt; }2.将DataTable转换为标准的CSV文件 /// <summary> /// csv扩展 /// </summary> public static class CSVEx { /// <summary> ///将DataTable转换为标准的CSV文件 /// </summary> /// <param name="table">数据表</param> /// <param name="tmpPath">文件地址</param> /// <returns>返回标准的CSV</returns> public static void ToCsv(this DataTable table, string tmpPath) { //以半角逗号(即,)作分隔符,列为空也要表达其存在。 //列内容如存在半角逗号(即,)则用半角引号(即"")将该字段值包含起来。 //列内容如存在半角引号(即")则应替换成半角双引号("")转义,并用半角引号(即"")将该字段值包含起来。 StringBuilder sb = new StringBuilder(); DataColumn colum; foreach (DataRow row in table.Rows) { for (int i = 0; i < table.Columns.Count; i++) { Type _datatype = typeof(DateTime); colum = table.Columns[i]; if (i != 0) sb.Append("\t"); //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(",")) //{ // sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\""); //} if (colum.DataType == _datatype) { sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss")); } else sb.Append(row[colum].ToString()); } sb.Append("\r\n"); } StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8); sw.Write(sb.ToString()); sw.Close(); } }3.CSV文件导入数据到数据库/// <summary> /// 批量导入mysql帮助类 /// </summary> public static class MySqlHelper { /// <summary> /// MySqlBulkLoader批量导入 /// </summary> /// <param name="_mySqlConnection">数据库连接地址</param> /// <param name="table"></param> /// <param name="csvName"></param> /// <returns></returns> public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName) { var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList(); MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection) { FieldTerminator = "\t", FieldQuotationCharacter = '"', EscapeCharacter = '"', LineTerminator = "\r\n", FileName = csvName, NumberOfLinesToSkip = 0, TableName = table.TableName, }; bulk.Columns.AddRange(columns); return bulk.Load(); } }4.使用MySqlBulkLoader批量插入数据/// <summary> /// 使用MySqlBulkLoader批量插入数据 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="data"></param> /// <returns></returns> /// <exception cref="Exception"></exception> public int BulkLoaderData<T>(List<T> data) { if (data.Count <= 0) return 0; var connectString = "数据库连接地址"; using (MySqlConnection connection = new MySqlConnection(connectString)) { MySqlTransaction sqlTransaction = null; try { if (connection.State == ConnectionState.Closed) { connection.Open(); } sqlTransaction = connection.BeginTransaction(); var dt = ListToDataTable<T>(data); //将List转成dataTable string tmpPath = Path.GetTempFileName(); dt.ToCsv(tmpPath); //将DataTable转成CSV文件 var insertCount = MySqlHelper.BulkLoad(connection, dt, tmpPath); //使用MySqlBulkLoader插入数据 sqlTransaction.Commit(); try { if (File.Exists(tmpPath)) File.Delete(tmpPath); } catch (Exception) { //删除文件失败 } return insertCount; //返回执行成功的条数 } catch (Exception e) { if (sqlTransaction != null) { sqlTransaction.Rollback(); } //执行异常 throw e; } } }5.完整的代码namespace WebApplication1.BrantchInsert{ /// <summary> /// 批量插入 /// </summary> public class BulkLoader { /// <summary> /// 测试批量插入入口 /// </summary> /// <returns></returns> public int BrantchDataTest() { #region 模拟数据 var data = new List<CrmCouponTestDto>() { new CrmCouponTestDto { Id=1, CouponCode="test001", CouponId = 1, MemberId=100, IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"), UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"), UsageShopId=0, UsageBillNo="", EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"), EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"), Status=0 }, new CrmCouponTestDto { Id=2, CouponCode="test002", CouponId = 1, MemberId=101, IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"), UsageTime=Convert.ToDateTime("2022-06-27 14:30:00"), UsageShopId=2, UsageBillNo="CS202206271430001", EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"), EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"), Status=1 }, new CrmCouponTestDto { Id=3, CouponCode="test003", CouponId = 1, MemberId=102, IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"), UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"), UsageShopId=0, UsageBillNo="", EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"), EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"), Status=0 }, new CrmCouponTestDto { Id=4, CouponCode="test004", CouponId = 1, MemberId=103, IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"), UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"), UsageShopId=0, UsageBillNo="", EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"), EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"), Status=0 } }; #endregion var result = BulkLoaderData<CrmCouponTestDto>(data); return result; } /// <summary> /// 使用MySqlBulkLoader批量插入数据 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="data"></param> /// <returns></returns> /// <exception cref="Exception"></exception> public int BulkLoaderData<T>(List<T> data) { if (data.Count <= 0) return 0; var connectString = "数据库连接地址"; using (MySqlConnection connection = new MySqlConnection(connectString)) { MySqlTransaction sqlTransaction = null; try { if (connection.State == ConnectionState.Closed) { connection.Open(); } sqlTransaction = connection.BeginTransaction(); var dt = ListToDataTable<T>(data); //将List转成dataTable string tmpPath = Path.GetTempFileName(); dt.ToCsv(tmpPath); //将DataTable转成CSV文件 var insertCount = MySqlHelper.BulkLoad(connection, dt, tmpPath); //使用MySqlBulkLoader插入数据 sqlTransaction.Commit(); try { if (File.Exists(tmpPath)) File.Delete(tmpPath); } catch (Exception) { //删除文件失败 } return insertCount; //返回执行成功的条数 } catch (Exception e) { if (sqlTransaction != null) { sqlTransaction.Rollback(); } //执行异常 throw e; } } } /// <summary> /// 将List转化为DataTable核心方法 /// </summary> /// <returns></returns> public DataTable ListToDataTable<T>(List<T> data) { #region 创建一个DataTable,以实体名称作为DataTable名称 var tableName = typeof(T).Name; tableName = tableName.ToSnakeCase(); /*实体名称与表名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/ DataTable dt = new DataTable { TableName = tableName }; #endregion #region 拿取列名,以实体的属性名作为列名 var properties = typeof(T).GetProperties(); foreach (var item in properties) { var curFileName = item.Name; curFileName = curFileName.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/ dt.Columns.Add(curFileName); } #endregion #region 列赋值 foreach (var item in data) { DataRow dr = dt.NewRow(); var columns = dt.Columns; var curPropertyList = item.GetType().GetProperties(); foreach (var p in curPropertyList) { var name = p.Name; name = name.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/ var curValue = p.GetValue(item); int i = columns.IndexOf(name); dr[i] = curValue; } dt.Rows.Add(dr); } #endregion return dt; } } /// <summary> /// 批量导入mysql帮助类 /// </summary> public static class MySqlHelper { /// <summary> /// MySqlBulkLoader批量导入 /// </summary> /// <param name="_mySqlConnection">数据库连接地址</param> /// <param name="table"></param> /// <param name="csvName"></param> /// <returns></returns> public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName) { var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList(); MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection) { FieldTerminator = "\t", FieldQuotationCharacter = '"', EscapeCharacter = '"', LineTerminator = "\r\n", FileName = csvName, NumberOfLinesToSkip = 0, TableName = table.TableName, }; bulk.Columns.AddRange(columns); return bulk.Load(); } } /// <summary> /// csv扩展 /// </summary> public static class CSVEx { /// <summary> ///将DataTable转换为标准的CSV文件 /// </summary> /// <param name="table">数据表</param> /// <param name="tmpPath">文件地址</param> /// <returns>返回标准的CSV</returns> public static void ToCsv(this DataTable table, string tmpPath) { //以半角逗号(即,)作分隔符,列为空也要表达其存在。 //列内容如存在半角逗号(即,)则用半角引号(即"")将该字段值包含起来。 //列内容如存在半角引号(即")则应替换成半角双引号("")转义,并用半角引号(即"")将该字段值包含起来。 StringBuilder sb = new StringBuilder(); DataColumn colum; foreach (DataRow row in table.Rows) { for (int i = 0; i < table.Columns.Count; i++) { Type _datatype = typeof(DateTime); colum = table.Columns[i]; if (i != 0) sb.Append("\t"); //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(",")) //{ // sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\""); //} if (colum.DataType == _datatype) { sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss")); } else sb.Append(row[colum].ToString()); } sb.Append("\r\n"); } StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8); sw.Write(sb.ToString()); sw.Close(); } } /// <summary> /// 字符串转化 /// </summary> public static class StringExtensions { /// <summary> /// 转换为 main_keys_id 这种形式的字符串方式 /// </summary> public static string ToSnakeCase(this string input) { if (string.IsNullOrEmpty(input)) { return input; } var startUnderscores = Regex.Match(input, @"^_+"); return startUnderscores + Regex.Replace(input, @"([a-z0-9])([A-Z])", "$1_$2").ToLower(); } } /// <summary> /// 实体 /// </summary> public class CrmCouponTestDto { /// <summary> /// ID /// </summary> public long Id { get; set; } /// <summary> /// 卡券号 /// </summary> public string CouponCode { get; set; } /// <summary> /// 卡券ID /// </summary> public int CouponId { get; set; } /// <summary> /// 会员ID /// </summary> public int MemberId { get; set; } /// <summary> /// 发放时间 /// </summary> public DateTime IssueTime { get; set; } /// <summary> /// 使用时间 /// </summary> public DateTime UsageTime { get; set; } /// <summary> /// 使用店铺ID /// </summary> public int UsageShopId { get; set; } /// <summary> /// 使用单号 /// </summary> public string UsageBillNo { get; set; } /// <summary> /// 有效开始时间 /// </summary> public DateTime EffectiveStart { get; set; } /// <summary> /// 有效结束时间 /// </summary> public DateTime EffectiveEnd { get; set; } /// <summary> /// 状态 /// CouponStatus 卡券状态: /// -1:未领用 /// 0:未使用 /// 1:已使用 /// 2:已过期 ///3:已作废 ///4:转赠中 /// </summary> public Int16 Status { get; set; } }}以上就是利用MySqlBulkLoader实现批量插入数据的示例详解的详细内容,更多关于MySqlBulkLoader批量插入数据的资料请关注软件开发网其它相关文章!
边栏推荐
猜你喜欢

dolphinscheduler2.X的安装(亲测有效)

动力电池,是这样被“瓜分”的

Power battery is divided up like this

Le patron a donné trois ordres: discret, discret, discret

坐拥1200亿,她又要IPO敲钟了

腾讯再遭大股东Prosus减持:后者还从京东套现37亿美元

Maingene listed on the Hong Kong Stock Exchange: IPO with a market value of HK $4.3 billion was ignored by the market

Ding! Techo day Tencent technology open day arrived as scheduled!

Jackie Chan and fast brand, who is the Savior of Kwai?

Leetcode 705. Design hash collection
随机推荐
Youju new material rushes to Shenzhen Stock Exchange: it plans to raise 650million yuan, with an annual revenue of 333million yuan
安杰思医学冲刺科创板:年营收3亿 拟募资7.7亿
Technical trendsetter
张同学还没学会当主播
验证回文串
Q-Tester 3.2:适用于开发、生产和售后的诊断测试软件
Case driven: a detailed guide from getting started to mastering shell programming
Leetcode(167)——两数之和 II - 输入有序数组
PMP认证证书的续证费用是多少?
[JS] Fibonacci sequence implementation (recursion and loop)
兼顾企业抗疫和发展的5个解决方案,来自IBM
Construction and management practice of ByteDance buried point data flow
5000倍回报,南非报业投资腾讯赚了一个省
vector详解+题目
从莫高窟到太平洋,海量数据找到了新家园
不要使用短路逻辑编写 stl sorter 多条件比较
Jingyuan's safe sprint to the Growth Enterprise Market: it plans to raise 400million yuan for investment and Yunyou software is the shareholder
老板囑咐了三遍:低調、低調、低調
物联网低代码平台常用《组件介绍》
雷科防务:4D毫米波雷达产品预计可以在年底量产供货