Writing Flink Oracle CDC Data to HDFS
2022-08-05 05:14:00 【IT_xhf】
Dependencies
Add the following Maven dependencies:
```xml
<properties>
    <oracle.cdc.version>2.2.0</oracle.cdc.version>
    <hadoop.version>2.8.2</hadoop.version>
    <avro.version>1.8.2</avro.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-oracle-cdc</artifactId>
        <version>${oracle.cdc.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>${avro.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
```
Oracle CDC Source
```java
import java.util.Properties;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Properties properties = new Properties();
// Without this, Oracle NUMBER columns arrive as byte arrays (see the note below).
properties.put("decimal.handling.mode", "double");

// Emit each Debezium change event as a JSON string.
DebeziumDeserializationSchema<String> deserialization = new JsonDebeziumDeserializationSchema();
DebeziumSourceFunction<String> source = OracleSource.<String>builder()
        .hostname("ip")
        .port(1521)
        .database("db_name")
        .schemaList("schema_name")
        .tableList("schema_name.table_name")
        .username("username")
        .password("password")
        .startupOptions(StartupOptions.latest()) // read only new changes, no initial snapshot
        .debeziumProperties(properties)
        .deserializer(deserialization)
        .build();

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 2 minutes; with a bulk-format sink this also drives file rolling.
streamEnv.getCheckpointConfig().setCheckpointInterval(2 * 60 * 1000);
DataStreamSource<String> streamSource = streamEnv.addSource(source);
```
For Oracle NUMBER columns, the property properties.put("decimal.handling.mode", "double"); is required; otherwise (in Debezium's default precise mode) those fields are deserialized as byte arrays rather than readable numbers.
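Note that JsonDebeziumDeserializationSchema yields each change event as a JSON string, while the Parquet sink in the next section expects RowData, so a conversion step is needed in between. Below is a minimal sketch; the column names ID/AMOUNT and the two-field schema are illustrative assumptions, Jackson is assumed to be on the classpath, and deletes (where the "after" node is null) are ignored for brevity:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

// Maps a Debezium change-event JSON string to RowData for the Parquet sink.
// Field names ("ID", "AMOUNT") are illustrative; adapt them to the real table.
public class JsonToRowData implements MapFunction<String, RowData> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public RowData map(String value) throws Exception {
        // Debezium wraps the new row image in an "after" node for inserts/updates.
        JsonNode after = MAPPER.readTree(value).get("after");
        GenericRowData row = new GenericRowData(2);
        row.setField(0, StringData.fromString(after.get("ID").asText()));
        // With decimal.handling.mode=double, NUMBER columns arrive as doubles.
        row.setField(1, after.get("AMOUNT").asDouble());
        return row;
    }
}
```

It is wired in as streamSource.map(new JsonToRowData()) ahead of the sink, as done in the sink snippet below.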
HDFS Sink
```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.FileSystemTableSink; // Flink 1.13.x location
import org.apache.hadoop.conf.Configuration;

// For an HA HDFS cluster, initialize the Flink FileSystem first (see the HA section below).
// rowType (org.apache.flink.table.types.logical.RowType) must describe the columns
// being written; its definition is omitted in the original post.
ParquetWriterFactory<RowData> factory =
        ParquetRowDataBuilder.createWriterFactory(rowType, new Configuration(), true);
Path basePath = new Path("hdfs://ip:port/user/hive/warehouse/stringvalue");
FileSink<RowData> fileSink = FileSink.forBulkFormat(basePath, factory)
        // To curb small files: roll at 256 MB or after 10 minutes (the interval
        // is in milliseconds). In practice this seemed to have no effect; it is
        // related to the checkpoint interval, as explained below.
        .withRollingPolicy(new FileSystemTableSink.TableRollingPolicy(
                true, 256 * 1024 * 1024, 10 * 60 * 1000))
        .withBucketAssigner(new BasePathBucketAssigner<>())
        .build();
// JsonToRowData is the JSON -> RowData mapper sketched above.
streamSource.map(new JsonToRowData()).sinkTo(fileSink);
```
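Why the rolling policy seems ineffective: FileSink.forBulkFormat only accepts checkpoint-based rolling policies (TableRollingPolicy is one), so the in-progress Parquet file is finalized at every checkpoint regardless of the 256 MB / 10 minute thresholds. With the 2-minute checkpoint interval set earlier, a new file appears roughly every 2 minutes. Assuming fewer, larger files are wanted, the checkpoint interval is the knob to turn:

```java
// Bulk formats roll on every checkpoint, so the checkpoint interval is the
// effective upper bound on how long a file stays open (trade-off: longer
// intervals also mean more data replayed on failure).
streamEnv.getCheckpointConfig().setCheckpointInterval(10 * 60 * 1000); // 10 minutes
```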
To write to an HDFS cluster with high availability (HA) enabled, add the following code to the program:
```java
import org.apache.flink.core.fs.FileSystem;

org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
// Keys prefixed with "flink.hadoop." are forwarded to the Hadoop configuration.
conf.setString("flink.hadoop.fs.defaultFS", "hdfs://emr-cluster");
conf.setString("flink.hadoop.dfs.nameservices", "emr-cluster");
conf.setString("flink.hadoop.dfs.ha.namenodes.emr-cluster", "nn1,nn2");
conf.setString("flink.hadoop.dfs.namenode.rpc-address.emr-cluster.nn1", "nn1_ip:port");
conf.setString("flink.hadoop.dfs.namenode.rpc-address.emr-cluster.nn2", "nn2_ip:port");
conf.setString("flink.hadoop.dfs.client.failover.proxy.provider.emr-cluster",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
// Must run before any Flink FileSystem (and thus the FileSink) is created.
FileSystem.initialize(conf, null);
```
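Once this configuration is in place, the sink path can use the logical nameservice instead of a fixed NameNode address; a small sketch reusing the emr-cluster nameservice from the snippet above:

```java
// The logical nameservice resolves to whichever NameNode is currently active.
Path basePath = new Path("hdfs://emr-cluster/user/hive/warehouse/stringvalue");
```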
Summary
1. NUMBER columns are not decoded into readable values by default; setting properties.put("decimal.handling.mode", "double") makes them come through as doubles, and very large values are then displayed in scientific notation.
2. For HA access, if you do not want to ship core-site.xml and hdfs-site.xml with the project, set the equivalent configuration in code as shown above.
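If scientific notation or the rounding inherent in double is a problem, Debezium also supports decimal.handling.mode=string, which preserves the exact value; a sketch:

```java
// Alternative: deliver NUMBER values as exact decimal strings (no rounding,
// no scientific notation); downstream code must parse them itself.
properties.put("decimal.handling.mode", "string");
```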