Spark SQL: batch insert or update (upsert) when saving data to MySQL
2022-07-29 04:56:00 【Alex_81D】
In Spark SQL, saving data only supports four save modes: Append, Overwrite, ErrorIfExists and Ignore. That does not meet our project's needs: when data in the business database changes, we need to update, insert or delete the corresponding data in the ODS layer of the data warehouse, so the source code has to be adapted.
Based on the Spark save source code, this post extends it to save data in batches: if a row already exists it is updated, otherwise it is inserted.
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.getCommonJDBCType
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
object TestInsertOrUpdateMysql {
val url: String = "jdbc:mysql://192.168.1.1:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowMultiQueries=true&autoReconnect=true&failOverReadOnly=false"
val driver: String = "com.mysql.jdbc.Driver"
val user: String = "123"
val password: String = "123"
val sql: String = "select * from testserver "
val table: String = "testinsertorupdate"
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName("testSqlServer").getOrCreate()
val dbtable = "(" + sql + ") AS Temp"
val jdbcDF = spark.read
.format("jdbc")
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("url", url)
.option("dbtable", dbtable)
.load()
jdbcDF.show()
// Normal write to database
//commonWrite(jdbcDF)
// insert-or-update write
insertOrUpdateToMysql("id", jdbcDF, spark)
println("====================== Program end ======================")
}

1. First, let's look at how an ordinary insert is written:
def commonWrite(jdbcDF: DataFrame): Unit = {
val properties = new Properties()
properties.put("user", user)
properties.put("password", password)
properties.put("driver", driver)
jdbcDF.write.mode(SaveMode.Append).jdbc(url, table, properties)
}

This approach is fairly limited: it can only do simple writes such as appending (SaveMode.Append) or overwriting (SaveMode.Overwrite).
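For contrast, a minimal sketch of the same writer call with overwrite mode (reusing the url, table and properties defined above):

jdbcDF.write.mode(SaveMode.Overwrite).jdbc(url, table, properties)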
So what does the new approach look like? First, recall the MySQL syntax for insert-or-update:
INSERT INTO t_name ( c1, c2, c3 )
VALUES
( 1, '1', '1')
ON DUPLICATE KEY UPDATE
c2 = '2';

Note that the table must have a primary key (or a unique key); without one, ON DUPLICATE KEY UPDATE has no conflict to detect and nothing will be updated.
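For reference, a minimal table definition that satisfies this requirement could look like the following (table and column names are purely illustrative, matching the example statement above rather than the actual schema used in this post):

CREATE TABLE t_name (
c1 INT NOT NULL,
c2 VARCHAR(32),
c3 VARCHAR(32),
PRIMARY KEY (c1)
);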
2. Now let's look at the insert-or-update version:
// Write to the database, inserting or updating data in batches; adapted from the Spark DataFrameWriter save() source code
// Rule:
// if no row with the same primary key exists the row is inserted, otherwise it is updated
def insertOrUpdateToMysql(primaryKey: String, jdbcDF: DataFrame, spark: SparkSession): Unit = {
val sc: SparkContext = spark.sparkContext
spark.conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
// 1. Load the JDBC driver
Class.forName(driver)
// 2. Obtain a database connection on the driver
val conn: Connection = DriverManager.getConnection(url, user, password)
val tableSchema = jdbcDF.schema
val columns = tableSchema.fields.map(x => x.name).mkString(",")
val placeholders = tableSchema.fields.map(_ => "?").mkString(",")
val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders) on duplicate key update "
val update = tableSchema.fields.map(x =>
x.name.toString + "=?"
).mkString(",")
// MySQL syntax: ON DUPLICATE KEY UPDATE
// PostgreSQL equivalent would be: on conflict($primaryKey) do update set
val realsql = sql.concat(update)
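// For example, with a hypothetical three-column schema (id, name, age) the generated
// statement would be:
// INSERT INTO testinsertorupdate (id,name,age) VALUES (?,?,?) on duplicate key update id=?,name=?,age=?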
conn.setAutoCommit(false)
val dialect = JdbcDialects.get(conn.getMetaData.getURL)
val broad_ps = sc.broadcast(conn.prepareStatement(realsql))
val numFields = tableSchema.fields.length * 2
// Use Spark's built-in helpers to get each field's JDBC null type and value setter
val nullTypes = tableSchema.fields.map(f => getJdbcType(f.dataType, dialect).jdbcNullType)
val setters = tableSchema.fields.map(f => makeSetter(conn, f.dataType))
var rowCount = 0
val batchSize = 2000
val updateindex = numFields / 2
try {
jdbcDF.foreachPartition(iterator => {
// Iterate over the partition and submit in batches
val ps = broad_ps.value
try {
while (iterator.hasNext) {
val row = iterator.next()
var i = 0
while (i < numFields) {
if (i < updateindex) {
if (row.isNullAt(i)) {
ps.setNull(i + 1, nullTypes(i))
} else {
setters(i).apply(ps, row, i, 0)
}
} else {
if (row.isNullAt(i - updateindex)) {
ps.setNull(i + 1, nullTypes(i - updateindex))
} else {
setters(i - updateindex).apply(ps, row, i, updateindex)
}
}
i = i + 1
}
ps.addBatch()
rowCount += 1
if (rowCount % batchSize == 0) {
ps.executeBatch()
rowCount = 0
}
}
if (rowCount > 0) {
ps.executeBatch()
}
} finally {
ps.close()
}
})
conn.commit()
} catch {
case e: Exception =>
System.err.println("Error in execution of insert. " + e.getMessage)
conn.rollback()
// insertError(connectionPool("OuCloud_ODS"),"insertOrUpdateToPgsql",e.getMessage)
} finally {
conn.close()
}
}

Here are the helper methods adapted from the Spark source code:
private def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = {
dialect.getJDBCType(dt).orElse(getCommonJDBCType(dt)).getOrElse(
throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.catalogString}"))
}
private type JDBCValueSetter_add = (PreparedStatement, Row, Int, Int) => Unit
private def makeSetter(conn: Connection, dataType: DataType): JDBCValueSetter_add = dataType match {
case IntegerType =>
(stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
stmt.setInt(pos + 1, row.getInt(pos - currentpos))
case LongType =>
(stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
stmt.setLong(pos + 1, row.getLong(pos - currentpos))
case DoubleType =>
(stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
stmt.setDouble(pos + 1, row.getDouble(pos - currentpos))
case FloatType =>
(stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
stmt.setFloat(pos + 1, row.getFloat(pos - currentpos))
case ShortType =>
(stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
stmt.setInt(pos + 1, row.getShort(pos - currentpos))
case ByteType =>
(stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
stmt.setInt(pos + 1, row.getByte(pos - currentpos))
case BooleanType =>
(stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
stmt.setBoolean(pos + 1, row.getBoolean(pos - currentpos))
case StringType =>
(stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
stmt.setString(pos + 1, row.getString(pos - currentpos))
case BinaryType =>
(stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos - currentpos))
case TimestampType =>
(stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos - currentpos))
case DateType =>
(stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos - currentpos))
case t: DecimalType =>
(stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
stmt.setBigDecimal(pos + 1, row.getDecimal(pos - currentpos))
case _ =>
(stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
throw new IllegalArgumentException(
s"Can't translate non-null value for field $pos")
}
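Each setter receives the statement position pos and an offset currentpos, and binds DataFrame row field pos - currentpos to JDBC placeholder pos + 1. A quick illustration of the resulting mapping for a hypothetical three-column row (purely for intuition, not part of the method above):

// 3 columns -> 6 placeholders: the first 3 belong to the INSERT part, the last 3 to the UPDATE part
val fieldCount = 3
val bindings = (0 until fieldCount * 2).map { i =>
val rowIndex = if (i < fieldCount) i else i - fieldCount
(i + 1, rowIndex) // (JDBC placeholder position, row field index)
}
// bindings == Vector((1,0), (2,1), (3,2), (4,0), (5,1), (6,2))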
There is one key setting to point out; without it the job fails with: Exception in thread "main" java.io.NotSerializableException: com.mysql.jdbc.JDBC42PreparedStatement
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Adding this configuration switches to the Kryo serializer; the default JavaSerializer cannot handle the broadcast PreparedStatement.
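One caveat, based on spark.serializer being a static Spark configuration: setting it via spark.conf.set after the SparkSession already exists may not take effect, so a safer sketch is to pass it when the session is built, for example:

val spark = SparkSession.builder()
.master("local[*]")
.appName("testSqlServer")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()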
The result is as follows. There are a few pitfalls along the way; readers who need this are welcome to get in touch and discuss.

Attached for reference, the PostgreSQL update-or-insert syntax:
INSERT INTO test_001 ( c1, c2, c3 )
VALUES( ?, ?, ? )
ON CONFLICT ( id ) DO
UPDATE SET c1 = ?, c2 = ?, c3 = ?;

See also: "MySQL ON DUPLICATE KEY UPDATE usage", El Nino's Summer blog, CSDN.
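If the same batch writer targeted PostgreSQL instead of MySQL, only the statement-building step would change. A minimal, self-contained sketch of that variant (the function name and sample values are hypothetical):

def buildPgUpsert(table: String, fieldNames: Seq[String], primaryKey: String): String = {
val columns = fieldNames.mkString(",")
val placeholders = fieldNames.map(_ => "?").mkString(",")
val updates = fieldNames.map(n => s"$n = ?").mkString(",")
s"INSERT INTO $table ($columns) VALUES ($placeholders) ON CONFLICT ($primaryKey) DO UPDATE SET $updates"
}
// e.g. buildPgUpsert("testinsertorupdate", Seq("id", "name", "age"), "id")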
Blogger QQ: 907044657. You are welcome to get in touch to study and exchange ideas; please point out any mistakes, and indicate the source when reposting. Thank you.