当前位置:网站首页>Synchronizing Oracle data to MySQL with Flink SQL CDC
Synchronizing Oracle data to MySQL with Flink SQL CDC
2022-08-02 01:00:00 【m0_67392811】
环境准备
1、flink 1.13.0
2、oracle 11g
3、flink-connector-oracle-cdc 2.1.0
1、oracle环境配置
首先需要安装oracle环境,参考 https://blog.csdn.net/qq_36039236/article/details/124224500?spm=1001.2014.3001.5502
进入容器进行配置:
# Open a shell inside the oracle11 container (run on the host)
docker exec -it oracle11 bash
# Switch to the oracle user
su - oracle
# Create the directory later used as db_recovery_file_dest; it must exist
# beforehand, otherwise Oracle reports that the directory does not exist
mkdir /home/oracle/oracle-data-test
# Start SQL*Plus without logging in
sqlplus /nolog
# Connect to the database as SYSDBA (typed at the SQL> prompt)
SQL> conn /as sysdba
接下来进行相关重要的配置:
-- Enable archive log mode (run as SYSDBA).
alter system set db_recovery_file_dest_size = 10G;
-- The destination directory must already exist (it is created with mkdir before sqlplus).
alter system set db_recovery_file_dest = '/home/oracle/oracle-data-test' scope=spfile;
-- Restart into MOUNT state: ARCHIVELOG mode can only be switched while mounted, not open.
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
-- Verify that archive logging is enabled.
archive log list;
-- Enable supplemental logging for the captured database so that change records
-- include the state of a row before it was modified. This configures it at
-- database level; per-table logging is added after the table is created.
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- Create a tablespace for the CDC user.
CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
-- Create user family with LOGMINER_TBS as its default tablespace.
CREATE USER family IDENTIFIED BY zyhcdc DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
-- Grant family the CONNECT, RESOURCE and DBA roles.
-- NOTE(review): DBA is much broader than LogMiner-based capture requires; the
-- explicit grants below are the privileges listed in the Flink CDC Oracle setup
-- guide. Keep the DBA role only in a throwaway test environment.
grant connect,resource,dba to family;
-- Privileges needed by the LogMiner-based CDC connector (each granted once).
GRANT CREATE SESSION TO family;
GRANT SELECT ON V_$DATABASE to family;
GRANT FLASHBACK ANY TABLE TO family;
GRANT SELECT ANY TABLE TO family;
GRANT SELECT_CATALOG_ROLE TO family;
GRANT EXECUTE_CATALOG_ROLE TO family;
GRANT SELECT ANY TRANSACTION TO family;
GRANT CREATE TABLE TO family;
GRANT LOCK ANY TABLE TO family;
GRANT ALTER ANY TABLE TO family;
GRANT CREATE SEQUENCE TO family;
GRANT EXECUTE ON DBMS_LOGMNR TO family;
GRANT EXECUTE ON DBMS_LOGMNR_D TO family;
GRANT SELECT ON V_$LOG TO family;
GRANT SELECT ON V_$LOG_HISTORY TO family;
GRANT SELECT ON V_$LOGMNR_LOGS TO family;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO family;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO family;
GRANT SELECT ON V_$LOGFILE TO family;
GRANT SELECT ON V_$ARCHIVED_LOG TO family;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO family;
本地使用 Navicat 以 family 用户连接 Oracle,执行以下语句:
-- Create the STUDENT_INFO table (run as user family, so it lands in schema FAMILY).
-- The key column is named sno so that it matches the Flink DDL column SNO and the
-- insert/update/delete statements used for testing.
create table student_info (
    sno number(10) constraint pk_sno primary key,
    sname varchar2(10),
    sex varchar2(2)
);
-- Enable supplemental logging of all columns for STUDENT_INFO so that row changes
-- log every column value. Run this only after the table above has been created.
ALTER TABLE FAMILY.STUDENT_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

2、flink cdc程序开发
maven依赖:
<properties>
    <java.version>1.8</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <flink.version>1.13.0</flink.version>
    <scala.version>2.12</scala.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-oracle-cdc</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.oracle.database.jdbc</groupId>
        <artifactId>ojdbc10</artifactId>
        <version>19.10.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <!-- 1.2.68 is affected by known AutoType deserialization vulnerabilities
             (e.g. CVE-2022-25845); 1.2.83 is the fixed 1.x release. The
             FlinkCdcOracleExample class does not use fastjson, so this dependency
             could also be dropped. -->
        <version>1.2.83</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
        <!-- Lombok is only needed at compile time (annotation processing);
             "provided" keeps it out of the jar-with-dependencies. -->
        <scope>provided</scope>
    </dependency>
    <!-- Flink logs through the slf4j facade (a logging interface); log4j is used
         here as the concrete logging implementation. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Builds a single runnable jar that bundles all dependencies during "package". -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
功能代码:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* 测试 flink cdc 实时获取oracle数据变化
* @author zyh
*/
public class FlinkCdcOracleExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.disableOperatorChaining();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE student_info (
" +
" SNO INT NOT NULL,
" + // 注意字段名要大写
" SNAME STRING,
" +
" SEX STRING,
" +
" PRIMARY KEY(SNO) NOT ENFORCED
" +
" ) WITH (
" +
" 'connector' = 'oracle-cdc',
" +
" 'hostname' = 'localhost',
" +
" 'port' = '1521',
" +
" 'username' = 'family',
" +
" 'password' = 'zyhcdc',
" +
" 'database-name' = 'helowin',
" +
" 'schema-name' = 'FAMILY',
" + // 注意这里要大写
" 'table-name' = 'STUDENT_INFO',
" +
" 'debezium.log.mining.continuous.mine'='true',
"+
" 'debezium.log.mining.strategy'='online_catalog',
" +
" 'debezium.database.tablename.case.insensitive'='false',
"+
" 'scan.startup.mode' = 'initial')");
TableResult tableResult = tableEnv.executeSql("select * from student_info");
tableResult.print();
env.execute();
}
}
启动测试:
由于设置了 'scan.startup.mode' = 'initial',程序启动时会先读取(初始化)表中已有的全量数据,之后再持续读取增量变更.
现在对数据表执行以下操作:
-- Insert a row
INSERT INTO student_info (sno, sname, sex)
VALUES (28, 'zyh-test', 'm');
-- Update a row
UPDATE student_info
SET sname = 'zyh666',
    sex = 'm'
WHERE sno = 26;
-- Delete a row
DELETE FROM student_info
WHERE sno = 25;
-- Commit so the changes become visible to the CDC source. SQL*Plus does not
-- auto-commit; the COMMIT is harmless if the client already auto-commits.
COMMIT;
程序执行结果:
至此,oracle-cdc 的功能已经测试成功.如需将结果写入 MySQL 表,可以用 Flink SQL 创建一张 MySQL(JDBC)sink 表,再通过 INSERT INTO ... SELECT 写入.由于上面的查询结果是包含更新和删除的变更流(撤回流),创建 MySQL sink 表时需要指定主键,使其按主键以 upsert 方式写入.写入 MySQL 的代码这里不再贴出,小伙伴可以自己动手实现.
3、中间遇到的问题,排查解决
1、读取数据有延迟,在create语句配置以下两个选项进行解决:
-- Add to the WITH (...) clause of the oracle-cdc CREATE TABLE statement:
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true'
2、找不到表(报错 Supplemental logging not configured for table)
[ERROR] Could not execute SQL statement. Reason:
io.debezium.DebeziumException: Supplemental logging not configured for table FAMILY.STUDENT_INFO Use command: ALTER TABLE LIUYUN.flink ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
参看文档: https://docs.oracle.com/cd/E11882_01/server.112/e41084/sql_elements008.htm
参考上面 Oracle 文档中关于标识符大小写的说明,可以在 create 语句的 WITH 中加上以下选项,让表名按大小写敏感的方式匹配:
-- Add to the WITH (...) clause so table names are matched case-sensitively:
'debezium.database.tablename.case.insensitive' = 'false'
参考资料
另外,还可以使用 Kafka Connect 结合 Debezium 的方式采集 Oracle 数据,不过这种方式采集到的是最原始的 Oracle 变更日志数据,需要开发者再对日志内容进行一次解析,增加了开发难度和维护成本.这种方式的具体操作步骤将在下篇文章给出,希望感兴趣的小伙伴能够点个关注,哈哈~
先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在.深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小.自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前.因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担.添加下方名片,即可获取全套学习资料哦
边栏推荐
- String splitting function strtok exercise
- feign异常传递的两种方式 fallbackfactory和全局处理 获取服务端自定义异常
- Go语学习笔记 - gorm使用 - gorm处理错误 Web框架Gin(十)
- Knowing the inorder traversal of the array and the preorder traversal of the array, return the postorder history array
- How to use the go language standard library fmt package
- GateWay实现负载均衡
- 简单工厂模式
- hutool工具-----JSON工具-JSONUtil
- nodeJs--mime module
- 【软件工程之美 - 专栏笔记】34 | 账号密码泄露成灾,应该怎样预防?
猜你喜欢

期货公司开户实力经纪业务的规模

flyway的快速入门教程

C language character and string function summary (2)

实现删除-一个字符串中的指定字母,如:字符串“abcd”,删除其中的”a”字母,剩余”bcd”,也可以传递多个需要删除的字符,传递”ab”也可以做到删除”ab”,剩余”cd”。

NFT到底有哪些实际用途?

BGP综合实验 建立对等体、路由反射器、联邦、路由宣告及聚合

Go语学习笔记 - gorm使用 - gorm处理错误 Web框架Gin(十)

IDEA版Postman插件Restful Fast Request,细节到位,功能好用

Go 1.18 的那些事——工作区、模糊测试、泛型

Realize deletion - a specified letter in a string, such as: the string "abcd", delete the "a" letter in it, the remaining "bcd", you can also pass multiple characters to be deleted, and pass "ab" can
随机推荐
信息收集之cms指纹识别
JS中localStorage和sessionStorage
632. Minimum interval
辨析内存函数memset、memcmp、memmove以及memcpy
Angr(十二)——官方文档(Part3)
第 45 届ICPC亚洲区域赛(上海)G-Fibonacci
好的期货公司开户让人省心省钱
String splitting function strtok exercise
期货开户是否有资金门槛?
管理基础知识14
feign异常传递的两种方式 fallbackfactory和全局处理 获取服务端自定义异常
Flex布局详解
青蛙跳台阶
管理基础知识19
管理基础知识18
mapbox使用教程
ES6对箭头函数的理解
datagrip连接mysql数据库
Go语学习笔记 - gorm使用 - gorm处理错误 Web框架Gin(十)
22.卷积神经网络实战-Lenet5