当前位置:网站首页>如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?
如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?
2022-06-13 11:32:00 【IT明哥】
如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?
1 业务背景
在数据架构上,很多大数据项目,都会将 HIVE/SPARK 等离线计算引擎计算获得的结果数据同步到下游业务系统的线上数据库,以对外提供服务,而且很多业务系统需要为客户提供稳定的 7*24小时的数据查询功能,要求底层数据库中的数据,需要是准确的,不能出现部分数据缺失的情况。
具体到数据同步工具的选型上,datax 是阿里开源的一款流行的数据集成工具,通过插件机制实现了多种异构数据之间的高效的离线数据同步,目前开源版本 datax 支持的插件已经将近 30 多种了,所以很多大数据项目都选用了 DATAX 来做离线数据的同步。
为保证数据的完整性与准确性,在使用 DATAX 进行数据同步时,目前很多项目都采用了先删除旧数据再插入计算生成的新数据的方式 (通过在作业中配置preSql执行旧数据的删除),此时当需要同步的数据量比较大时,旧数据的删除与新数据的插入,都需要一段时间,此时下游数据库中的表不可避免地会有一段时间的空档期,查询不到对应的数据。
怎么解决这个问题呢?
DATAX 官方推荐的一种方式是配置使用临时表,先向临时表导入数据,完成后再 rename 到线上表(可以通过在作业中配置postSql完成这类操作)。
除了临时表这种曲线救国的方式,也可以尝试以 UPSERT 语义直接更新下游数据库中线上的目标表数据。
那么 DATAX 中,不同数据库 WRITER 插件都是怎么实现 UPSERT 语义的呢?
2 DATAX 常见数据库 WRITER 插件是怎么实现 UPSERT 语义的?
datax 的 MysqlWriter 和 oceanbasev10writer, 支持配置 writeMode 参数为 insert/replace/update,可以通过该参数控制写入数据到目标表时,底层采用 insert into/replace into/INSERT INTO ... ON DUPLICATE KEY UPDATE 语句:
其中 insert into 当主键/唯一性索引冲突时会写不进去冲突的行; 后两者没有遇到主键/唯一性索引冲突时与 insert into 行为一致,遇到冲突时会用新行替换原有行所有字段;
datax原生的 OracleWriter 和PostgresqlWriter,不支持配置writeMode 参数,在底层实现上都是通过 JDBC 连接远程 Oracle/PG 数据库,并执行相应的 insert into ... sql 语句将数据写入 Oracle/pg,在内部会分批次提交入库。
那么,能不能更改原生的 OracleWriter 以支持 UPSERT 语义插入 ORALCE 呢?
ORACLE 的 MERGE INTO 语句
Oracle 9i 引入了对 merge语句的支持, 通过 merge 能够在一个SQL语句中对一个表同时执行 inserts 和 updates操作, Oracle 10g 对 MERGE 语句又做了如下增强:
UPDATE或INSERT子句是可选的 UPDATE和INSERT子句可以加WHERE子句 在ON条件中可以使用常量过滤谓词来insert所有的行到目标表中,不需要连接源表和目标表 UPDATE子句后面可以跟DELETE子句来去除一些不需要的行。
merge into 语句语法如下:
MERGE INTO [target-table] A USING [source-table sql] B
ON([conditional expression] and [...]...)
WHEN MATCHED THEN
[UPDATE sql]
WHEN NOT MATCHED THEN
[INSERT sql]
merge into 语句实例如下:
MERGE INTO member_staging x
USING (SELECT member_id, first_name, last_name, rank FROM members) y
ON (x.member_id = y.member_id)
WHEN MATCHED THEN
UPDATE SET x.first_name = y.first_name,
x.last_name = y.last_name,
x.rank = y.rank
WHERE x.first_name <> y.first_name OR
x.last_name <> y.last_name OR
x.rank <> y.rank
WHEN NOT MATCHED THEN
INSERT(x.member_id, x.first_name, x.last_name, x.rank)
VALUES(y.member_id, y.first_name, y.last_name, y.rank);
所以,虽然 oracle 不支持类似 MYSQL的 REPLACE INTO 和 INSERT ... ON DUPLICATE KEY UPDATE,但由于 ORACLE 原生支持 MERGE INTO 语句,我们完全可以更改datax 的 OracleWriter 源码,通过 merge into 语句,实现 UPSERT 语义。
更改 DATAX oracleWriter 以通过 MERGE INTO 语句实现 UPSERT 语义 涉及改动的 datax源码中类和方法的改动点主要有:
com.alibaba.datax.plugin.writer.oraclewriter.OracleWriter.Job#init:更改该方法以允许用户配置 writeMode; com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil#dealWriteMode:更改该方法以获取用户配置的 uniqueKeys 并在调用 WriterUtil.getWriteTemplate 时传递 uniqueKeys; com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate: 更改该方法,以在用户配置 writeMode 使用 replace 且配置了uniqueKeys时,拼接获取 ORACLE MERGE INTO 语句对应的 preparedStatement 字符串; com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#init:更改该方法以获取用户配置的 uniqueKeys; com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#calcWriteRecordSql:更改该方法以在调用 WriterUtil.getWriteTemplate 时传递 uniqueKeys; com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#fillPreparedStatementColumnType(PreparedStatement, int, int, String, Column): 更改该方法以在用户配置 writeMode 使用 replace 且配置了uniqueKeys时,对 ORACLE MERGE INTO 语句对应的 preparedStatement 的变量进行 setString 等赋值操作;
com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate 方法拼接获取的 ORACLE MERGE INTO 语句对应的 preparedStatement 字符串,示例内容如下:
MERGE INTO %s x
USING (SELECT ? as member_id, ? as first_name, ? as last_name, ? as rank FROM dual) y
ON (x.member_id = y.member_id and x.xxx = y.xx)
WHEN MATCHED THEN UPDATE SET
x.first_name = y.first_name,
x.last_name = y.last_name,
x.rank = y.rank
WHEN NOT MATCHED THEN INSERT(x.member_id, x.first_name, x.last_name, x.rank)
VALUES(?,?,?,?);
!关注不迷路~ 各种福利、资源定期分享!欢迎小伙伴们扫码添加明哥微信,后台加群交流学习。

边栏推荐
- Euler function and finding Euler function by linear sieve
- Count the number of special subsequences (0, 1, 2) DP
- Performance monster on arm64: installation and performance test of API gateway Apache APIs IX on AWS graviton3
- 【TcaplusDB知识库】TcaplusDB单据受理-建表审批介绍
- 高斯消元求n元方程组
- Analysis and summary of 2021ccpc online games
- [tcapulusdb knowledge base] tcapulusdb cluster management introduction
- (Yousheng small message-04) how to use mobile WPS for electronic signature on PDF
- 程序员面试这么重视考察概念还是第一次见
- Environ. Sci. Technol. (if=9.028) | impact of urban greening on atmospheric environment
猜你喜欢

树莓派开发笔记(十六):树莓派4B+安装mariadb数据库(mysql开源分支)并测试基本操作

Mac MySQL installation tutorial

Anonymity in Web3 and NFT

(small information for children to children-03) batch template production of basic information collection folder for children (including PDF, word and certificate folder)

欧拉函数和线性筛求欧拉函数

Nim游戏阶梯 Nim游戏和SG函数应用(集合游戏)

F2. nearest beautiful number (hard version)

C#/VB. Net to generate directory bookmarks when word is converted to PDF

Vivo large scale kubernetes cluster automation operation and maintenance practice

Nim game ladder Nim game and SG function application (set game)
随机推荐
Vivo large scale kubernetes cluster automation operation and maintenance practice
Prim finding minimum spanning tree (naive dense graph)
Type de condition pour ts Advanced
Ue5 random point in bounding boxf from stream
轻量级实时语义分割:ENet & ERFNet
求组合数四种方法
【TcaplusDB知识库】TcaplusDB单据受理-建表审批介绍
Apache apisik v2.14.1 exploratory release to expand into more fields
Performance monster on arm64: installation and performance test of API gateway Apache APIs IX on AWS graviton3
日志1111
Web3和NFT中的匿名性问题
Chapter VI i/o management
[tcapulusdb knowledge base] tcapulusdb cluster management introduction
【TcaplusDB知识库】Tmonitor单机安装指引介绍(二)
1051. 高度检查器
State compression DP example (traveling salesman problem and rectangle filling problem)
塔米狗知识|全面剖析国有企业并购含义及其作用
5.5 clock screensaver
2021ccpc online competition list
【ROS】MoveIt-rviz-七自由度机械臂仿真