Flink Oracle CDC Write to HDFS
2022-08-05 05:14:00 【IT_xhf】
Dependencies
Add the following Maven dependencies:
<!-- flink.version is referenced below but never defined in the original post;
     Flink CDC 2.2.x is built against Flink 1.13.x, so a 1.13 release is assumed here -->
<flink.version>1.13.6</flink.version>
<oracle.cdc.version>2.2.0</oracle.cdc.version>
<hadoop.version>2.8.2</hadoop.version>
<avro.version>1.8.2</avro.version>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>${oracle.cdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- required by the ParquetRowDataBuilder used below; missing from the original list -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
Oracle CDC Source
import java.util.Properties;

// Import paths below assume the flink-cdc 2.2.x artifact layout
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Properties properties = new Properties();
// Parse Oracle NUMBER columns as double; without this they arrive as byte arrays
properties.put("decimal.handling.mode", "double");

// JsonDebeziumDeserializationSchema emits each change record as a JSON string
DebeziumDeserializationSchema<String> deserialization = new JsonDebeziumDeserializationSchema();
DebeziumSourceFunction<String> source = OracleSource.<String>builder()
        .hostname("ip")
        .port(1521)
        .database("db_name")
        .schemaList("schema_name")
        .tableList("schema_name.table_name")
        .username("username")
        .password("password")
        .startupOptions(StartupOptions.latest()) // start from the current position, no initial snapshot
        .debeziumProperties(properties)
        .deserializer(deserialization)
        .build();

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing is required: the FileSink used below commits files only on checkpoints
streamEnv.getCheckpointConfig().setCheckpointInterval(2 * 60 * 1000);
DataStreamSource<String> streamSource = streamEnv.addSource(source);
For Oracle NUMBER columns, the property properties.put("decimal.handling.mode", "double"); must be set; otherwise Debezium's default precise mode delivers those fields as byte arrays in the JSON output.
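To sanity-check the source before wiring up the sink, the change stream can simply be printed. A minimal sketch; the job name is arbitrary:

// Each element is one Debezium change record serialized as JSON
streamSource.print();
streamEnv.execute("oracle-cdc-print");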
HDFS Sink
import org.apache.hadoop.conf.Configuration;

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.FileSystemTableSink;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

// rowType was left undefined in the original; it must describe the Parquet schema.
// A single VARCHAR column is assumed here for illustration.
RowType rowType = RowType.of(
        new LogicalType[] {new VarCharType(VarCharType.MAX_LENGTH)},
        new String[] {"stringvalue"});

ParquetWriterFactory<RowData> factory =
        ParquetRowDataBuilder.createWriterFactory(rowType, new Configuration(), true);
Path basePath = new Path("hdfs://ip:port/user/hive/warehouse/stringvalue");
FileSink<RowData> fileSink = FileSink.forBulkFormat(basePath, factory)
        // Rolling policy against small files: roll at 256 MB or after 10 minutes.
        // The interval argument is in milliseconds; the original passed 10*60, i.e. 600 ms.
        // Note that bulk formats also roll on every checkpoint, which is why the
        // 2-minute checkpoint interval set above dominates the file cadence and the
        // policy seemed to have no effect.
        .withRollingPolicy(new FileSystemTableSink.TableRollingPolicy(true, 256 * 1024 * 1024, 10 * 60 * 1000))
        .withBucketAssigner(new BasePathBucketAssigner<>())
        .build();
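The original code sinks streamSource, a stream of JSON strings, directly into FileSink&lt;RowData&gt;, which does not type-check. A minimal bridging sketch, assuming the single-column VARCHAR rowType defined above; the rowDataStream name and the wrap-the-raw-JSON approach are illustrative assumptions, in practice you would parse the JSON and populate one field per target column:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

// Wrap each Debezium JSON string into a one-column RowData matching rowType
DataStream<RowData> rowDataStream = streamSource
        .map(json -> (RowData) GenericRowData.of(StringData.fromString(json)))
        .returns(InternalTypeInfo.of(rowType));

rowDataStream.sinkTo(fileSink);
streamEnv.execute("oracle-cdc-to-hdfs"); // nothing runs until execute() is called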
To write to an HDFS cluster with NameNode high availability, add the following code to the program:
import org.apache.flink.core.fs.FileSystem;

// Keys prefixed with "flink.hadoop." are forwarded into the Hadoop configuration,
// standing in for the core-site.xml / hdfs-site.xml that would normally carry them.
org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
conf.setString("flink.hadoop.fs.defaultFS", "hdfs://emr-cluster");
conf.setString("flink.hadoop.dfs.nameservices", "emr-cluster");
conf.setString("flink.hadoop.dfs.ha.namenodes.emr-cluster", "nn1,nn2");
conf.setString("flink.hadoop.dfs.namenode.rpc-address.emr-cluster.nn1", "nn1_ip:port");
conf.setString("flink.hadoop.dfs.namenode.rpc-address.emr-cluster.nn2", "nn2_ip:port");
conf.setString("flink.hadoop.dfs.client.failover.proxy.provider.emr-cluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
// Must run before the FileSystem is first used, i.e. before the sink is built
FileSystem.initialize(conf, null);
Summary
1. NUMBER columns are not parsed into a usable value by default; setting properties.put("decimal.handling.mode", "double"); makes them come through as double, and values of large magnitude are then rendered in scientific notation.
2. For HDFS high availability, if you do not want to bundle core-site.xml and hdfs-site.xml into the project, the equivalent settings must be made in code, as shown above.
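If the scientific-notation rendering is a problem, note that Debezium's decimal.handling.mode also accepts "string", which preserves the exact decimal text and leaves parsing to the consumer. A one-line variant of the source configuration above:

// Alternative to "double": deliver NUMBER values as exact decimal strings
properties.put("decimal.handling.mode", "string");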