
How to use DataX to update data in a downstream Oracle database with UPSERT semantics?

2022-06-13 11:42:00 It Mingge


1 Business background

From a data-architecture perspective, many big data projects synchronize the result data computed by offline engines such as Hive or Spark to the online database of a downstream business system, which then serves it to external users. Many of these business systems must offer customers a stable, 24/7 data query service, so the data in the underlying database has to be accurate and complete at all times.

As for the choice of synchronization tool, DataX is a popular data integration tool open-sourced by Alibaba. Its plug-in mechanism enables efficient offline synchronization between heterogeneous data sources, and the current open-source version supports roughly 30 plug-ins, so many big data projects choose DataX for offline data synchronization.

To keep the data complete and accurate, many projects that synchronize with DataX currently delete the old data first (by configuring preSql) and then insert the newly computed data. When the volume to synchronize is large, both the delete and the insert take time, so the table in the downstream database inevitably goes through a window in which the corresponding data cannot be queried.

How can this problem be solved?

One approach that DataX officially recommends is to use a temporary table: import the data into the temporary table first, then rename it to the online table once the import finishes (this can be done by configuring postSql in the job).

Besides this roundabout temporary-table approach, you can also update the target table directly in the downstream online database with UPSERT semantics.

So how do the WRITER plug-ins for different databases implement UPSERT semantics in DataX?

2 How do DataX's common database WRITER plug-ins implement UPSERT semantics?

  • DataX's MysqlWriter and oceanbasev10writer support a writeMode parameter with the values insert/replace/update. It controls whether data is written to the target table with an insert into, a replace into, or an INSERT INTO ... ON DUPLICATE KEY UPDATE statement:

    • with insert into, a row that conflicts on the primary key or a unique index is not written;
    • the latter two behave like insert into when there is no primary key or unique index conflict; on a conflict, the fields of the existing row are overwritten with the values of the new row;
  • DataX's native OracleWriter and PostgresqlWriter do not support the writeMode parameter. Under the hood they connect to the remote Oracle/PostgreSQL database over JDBC and execute insert into ... SQL statements to write the data, committing it in batches internally.
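To make the writeMode mapping above concrete, here is a minimal Java sketch of how a writer could turn the three modes into JDBC statement templates. The class and method names (`MysqlWriteTemplateSketch`, `build`) are hypothetical and this is not DataX's actual implementation; it only assumes that `%s` stands in for the table name and that every written column is refreshed on a duplicate key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Illustrative only: a hypothetical helper, not DataX's WriterUtil. */
final class MysqlWriteTemplateSketch {

    /** Builds a JDBC statement template; "%s" is left in place for the table name. */
    static String build(String writeMode, List<String> columns) {
        String mode = writeMode.trim().toLowerCase(Locale.ROOT);
        String columnList = String.join(",", columns);

        // One "?" placeholder per column.
        List<String> marks = new ArrayList<>();
        for (int i = 0; i < columns.size(); i++) {
            marks.add("?");
        }
        String tail = " INTO %s (" + columnList + ") VALUES (" + String.join(",", marks) + ")";

        switch (mode) {
            case "insert":
                // Conflicting rows are rejected by the database.
                return "INSERT" + tail;
            case "replace":
                // A conflicting row is replaced by the new row.
                return "REPLACE" + tail;
            case "update":
                // A conflicting row is updated in place, column by column.
                List<String> assignments = new ArrayList<>();
                for (String column : columns) {
                    assignments.add(column + "=VALUES(" + column + ")");
                }
                return "INSERT" + tail + " ON DUPLICATE KEY UPDATE " + String.join(",", assignments);
            default:
                throw new IllegalArgumentException("Unsupported writeMode: " + writeMode);
        }
    }

    public static void main(String[] args) {
        for (String mode : Arrays.asList("insert", "replace", "update")) {
            System.out.println(build(mode, Arrays.asList("id", "name")));
        }
    }
}
```

None of these three statement forms exists in Oracle, which is why the same mechanism cannot simply be switched on for OracleWriter.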

So can the native OracleWriter be modified to write to Oracle with UPSERT semantics?

3 Oracle's MERGE INTO statement

Oracle 9i introduced support for the MERGE statement, which lets a single SQL statement perform inserts and updates on a table at the same time. Oracle 10g enhanced MERGE as follows:

  • the UPDATE and INSERT clauses are each optional;
  • a WHERE clause can be added to the UPDATE and INSERT clauses;
  • a constant filter predicate can be used in the ON condition to insert all source rows into the target table, without joining the source and target tables;
  • the UPDATE clause can be followed by a DELETE clause to remove rows that are no longer needed.

The syntax of the merge into statement is as follows:

MERGE INTO [target-table] A USING [source-table sql] 
ON([conditional expression] and [...]...) 
WHEN MATCHED THEN
 [UPDATE sql] 
WHEN NOT MATCHED THEN 
[INSERT sql]

An example merge into statement:

MERGE INTO member_staging x
USING (SELECT member_id, first_name, last_name, rank FROM members) y
ON (x.member_id = y.member_id)
WHEN MATCHED THEN
    UPDATE SET x.first_name = y.first_name,
               x.last_name = y.last_name,
               x.rank = y.rank
    WHERE x.first_name <> y.first_name OR
          x.last_name <> y.last_name OR
          x.rank <> y.rank
WHEN NOT MATCHED THEN
    INSERT(x.member_id, x.first_name, x.last_name, x.rank)
    VALUES(y.member_id, y.first_name, y.last_name, y.rank)
;

So although Oracle does not support MySQL-style REPLACE INTO or INSERT ... ON DUPLICATE KEY UPDATE, it natively supports MERGE INTO, which means we can modify the source code of DataX's OracleWriter to implement UPSERT semantics with a merge into statement.

4 Modifying DataX's OracleWriter to implement UPSERT semantics with MERGE INTO

The main classes and methods in the DataX source code that need to change are:
  • com.alibaba.datax.plugin.writer.oraclewriter.OracleWriter.Job#init: change this method to allow the user to configure writeMode;
  • com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil#dealWriteMode: change this method to read the user-configured uniqueKeys and pass them when calling WriterUtil.getWriteTemplate;
  • com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate: change this method so that, when the user sets writeMode to replace and configures uniqueKeys, it assembles the preparedStatement string for the Oracle MERGE INTO statement;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#init: change this method to read the user-configured uniqueKeys;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#calcWriteRecordSql: change this method to pass uniqueKeys when calling WriterUtil.getWriteTemplate;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#fillPreparedStatementColumnType(PreparedStatement, int, int, String, Column): change this method so that, when writeMode is replace and uniqueKeys are configured, it sets the variables of the MERGE INTO preparedStatement (setString and so on);
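The last change point can be illustrated with a small Java sketch. It assumes a MERGE INTO template that carries one `?` per column in the USING subquery and a second `?` per column in the INSERT ... VALUES clause, so each column value has to be bound at two positions. The names `MergeBindingSketch`, `positionsFor` and `bindRecord` are hypothetical, and `setString` is a simplification: a real writer would pick the setter from the column type.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: hypothetical helper, not part of DataX. */
final class MergeBindingSketch {

    /**
     * Returns the 1-based JDBC parameter positions for one column:
     * the first in the USING subquery, the second in the VALUES clause.
     */
    static int[] positionsFor(int columnIndex, int columnCount) {
        if (columnIndex < 0 || columnIndex >= columnCount) {
            throw new IllegalArgumentException("columnIndex out of range: " + columnIndex);
        }
        return new int[] {columnIndex + 1, columnCount + columnIndex + 1};
    }

    /** Binds every value twice, once per placeholder group (strings only, for brevity). */
    static void bindRecord(PreparedStatement statement, List<String> values) throws SQLException {
        int columnCount = values.size();
        for (int i = 0; i < columnCount; i++) {
            for (int position : positionsFor(i, columnCount)) {
                statement.setString(position, values.get(i));
            }
        }
    }

    public static void main(String[] args) {
        // With 4 columns, column 0 goes to positions 1 and 5, column 3 to 4 and 8.
        System.out.println(Arrays.toString(positionsFor(0, 4)));
        System.out.println(Arrays.toString(positionsFor(3, 4)));
    }
}
```

An alternative design is to write the VALUES clause in terms of the source alias (for example `VALUES(y.member_id, ...)`), in which case each value would only need to be bound once.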

The preparedStatement string for the Oracle MERGE INTO statement that com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate assembles looks like this:

MERGE INTO %s x
USING (SELECT ? as member_id, ? as first_name, ? as last_name, ? as rank FROM dual) y
ON (x.member_id = y.member_id and x.xxx = y.xx)
WHEN MATCHED THEN UPDATE SET
    x.first_name = y.first_name,
    x.last_name = y.last_name,
    x.rank = y.rank
WHEN NOT MATCHED THEN INSERT(x.member_id, x.first_name, x.last_name, x.rank)
    VALUES(?,?,?,?)
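A template of this shape could be assembled from the configured columns and uniqueKeys roughly as follows. This is a sketch under stated assumptions, not the actual change to WriterUtil#getWriteTemplate: the class and method names are hypothetical, column names are compared case-sensitively, and no statement terminator is appended because statements sent to Oracle over JDBC are normally passed without a trailing semicolon. Key columns are left out of UPDATE SET because Oracle does not allow columns referenced in the ON clause to be updated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: hypothetical helper, not DataX's WriterUtil. */
final class OracleMergeTemplateSketch {

    /** Builds a MERGE INTO template; "%s" is left in place for the table name. */
    static String build(List<String> columns, List<String> uniqueKeys) {
        if (uniqueKeys.isEmpty() || !columns.containsAll(uniqueKeys)) {
            throw new IllegalArgumentException("uniqueKeys must be a non-empty subset of columns");
        }
        List<String> selectItems = new ArrayList<>();
        List<String> onTerms = new ArrayList<>();
        List<String> setTerms = new ArrayList<>();
        List<String> insertColumns = new ArrayList<>();
        List<String> marks = new ArrayList<>();

        for (String column : columns) {
            selectItems.add("? AS " + column);
            insertColumns.add("x." + column);
            marks.add("?");
            String equality = "x." + column + " = y." + column;
            if (uniqueKeys.contains(column)) {
                onTerms.add(equality);   // key columns form the match condition
            } else {
                setTerms.add(equality);  // non-key columns are updated on a match
            }
        }

        StringBuilder sql = new StringBuilder();
        sql.append("MERGE INTO %s x USING (SELECT ")
           .append(String.join(", ", selectItems))
           .append(" FROM dual) y ON (")
           .append(String.join(" AND ", onTerms))
           .append(")");
        // The UPDATE clause is optional, so skip it when every column is a key.
        if (!setTerms.isEmpty()) {
            sql.append(" WHEN MATCHED THEN UPDATE SET ").append(String.join(", ", setTerms));
        }
        sql.append(" WHEN NOT MATCHED THEN INSERT (")
           .append(String.join(", ", insertColumns))
           .append(") VALUES (")
           .append(String.join(", ", marks))
           .append(")");
        return sql.toString();
    }

    public static void main(String[] args) {
        String template = build(
                Arrays.asList("member_id", "first_name", "last_name", "rank"),
                Arrays.asList("member_id"));
        // Fill in the table name the same way the "%s" placeholder suggests.
        System.out.println(String.format(template, "member_staging"));
    }
}
```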


Copyright notice
This article was written by [It Mingge]. Please include the original link when reposting. Thank you.
https://yzsam.com/2022/164/202206131131455478.html