Writing Flink Oracle CDC Data to HDFS
2022-08-05 05:14:00 【IT_xhf】
Dependencies
Maven dependencies used:
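```xml
<properties>
    <oracle.cdc.version>2.2.0</oracle.cdc.version>
    <hadoop.version>2.8.2</hadoop.version>
    <avro.version>1.8.2</avro.version>
    <!-- ${flink.version} is referenced below but not defined in the original;
         the *-blink artifact suggests Flink 1.13.x -->
</properties>
```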
```xml
<dependencies>
    <!-- Dependencies without a <version> must get it from a parent POM or <dependencyManagement> (not shown) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-oracle-cdc</artifactId>
        <version>${oracle.cdc.version}</version>
    </dependency>
    <!-- ParquetRowDataBuilder (used by the HDFS sink) is in flink-parquet; if it is not
         resolved transitively, declare flink-parquet_2.11 explicitly -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>${avro.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
        <!-- Exclude hive-exec's own Avro so that ${avro.version} above is used -->
        <exclusions>
            <exclusion>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
```
Oracle CDC Source
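```java
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions; // flink-cdc 2.2 package
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;

// Debezium option: emit Oracle NUMBER columns as double instead of encoded bytes
Properties properties = new Properties();
properties.put("decimal.handling.mode", "double");
// Emits each change event as a Debezium JSON string
DebeziumDeserializationSchema<String> deserialization = new JsonDebeziumDeserializationSchema();
DebeziumSourceFunction<String> source = OracleSource.<String>builder()
        .hostname("ip")
        .port(1521)
        .database("db_name")
        .schemaList("schema_name")
        .tableList("schema_name.table_name")
        .username("username")
        .password("password")
        .startupOptions(StartupOptions.latest()) // skip the initial snapshot, read only new changes
        .debeziumProperties(properties)
        .deserializer(deserialization)
        .build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 2 minutes; the FileSink only commits finished files on checkpoints
streamEnv.enableCheckpointing(2 * 60 * 1000L);
DataStreamSource<String> streamSource = streamEnv.addSource(source);
```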
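The source emits one JSON string per change event, while the HDFS sink expects `RowData`, so a conversion step is needed that the original does not show. Debezium's change-event envelope carries `before`, `after`, `op`, `source` and `ts_ms`, where `op` is `c` (insert), `u` (update), `d` (delete) or `r` (snapshot read). A minimal sketch of one decision such a converter has to make, assuming it writes exactly one row image per event; the enum and its methods are hypothetical, not part of Flink CDC:

```java
/** Hypothetical helper: maps a Debezium "op" code to the envelope field holding the row to write. */
enum DebeziumOp {
    CREATE("c"), UPDATE("u"), DELETE("d"), READ("r");

    private final String code;

    DebeziumOp(String code) {
        this.code = code;
    }

    /** Looks up the operation for the "op" value of a change event. */
    static DebeziumOp fromCode(String code) {
        for (DebeziumOp op : values()) {
            if (op.code.equals(code)) {
                return op;
            }
        }
        throw new IllegalArgumentException("Unsupported Debezium op: " + code);
    }

    /** Envelope field that holds the row image for this operation. */
    String rowImageField() {
        // A delete only carries the old row in "before"; the other operations carry the new row in "after".
        return this == DELETE ? "before" : "after";
    }
}
```

Since a Parquet file on HDFS is append-only, deletes and updates can only be recorded as new rows (for example together with the `op` value), not applied in place.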
For Oracle NUMBER columns you need to set `properties.put("decimal.handling.mode", "double");`; otherwise these fields are deserialized as byte arrays instead of numbers.
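With Debezium's default `decimal.handling.mode` (`precise`), a decimal value is emitted as the big-endian two's-complement bytes of its unscaled value, which the JSON converter writes as a Base64 string; that is where the "byte array" comes from. If the precision loss of `double` is a concern, such a value can be decoded exactly. A minimal sketch, assuming the Base64 string has already been extracted from the event and the scale is known from the column definition (e.g. 2 for `NUMBER(10,2)`); with the no-argument `JsonDebeziumDeserializationSchema` the schema is not included in the JSON. The class and method names are hypothetical:

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

/** Hypothetical helper: decodes a Debezium "precise" decimal value from its Base64 form. */
final class DebeziumDecimals {

    private DebeziumDecimals() {}

    /**
     * @param base64Unscaled Base64 of the big-endian two's-complement unscaled value
     * @param scale          column scale, e.g. 2 for NUMBER(10,2) (assumed known from the DDL)
     */
    static BigDecimal decode(String base64Unscaled, int scale) {
        byte[] bytes = Base64.getDecoder().decode(base64Unscaled);
        // BigInteger(byte[]) reads the array as big-endian two's complement
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    public static void main(String[] args) {
        // Encode 12345.67 the same way (unscaled bytes -> Base64), then decode it again
        BigDecimal original = new BigDecimal("12345.67");
        String encoded = Base64.getEncoder().encodeToString(original.unscaledValue().toByteArray());
        System.out.println(encoded + " -> " + decode(encoded, original.scale())); // EtaH -> 12345.67
    }
}
```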
HDFS Sink
```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.FileSystemTableSink; // Flink 1.13 package
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.hadoop.conf.Configuration;

// rowType (a RowType describing the target table) is not defined in the original snippet
ParquetWriterFactory<RowData> factory =
        ParquetRowDataBuilder.createWriterFactory(rowType, new Configuration(), true);
// With the HA settings below, "hdfs://emr-cluster/..." can replace the single NameNode address
Path basePath = new Path("hdfs://ip:port/user/hive/warehouse/stringvalue");
FileSink<RowData> fileSink = FileSink.forBulkFormat(basePath, factory)
        // Meant to curb small files (roll at 256 MB or after 10 minutes). It seems to have no effect because
        // Parquet is a bulk format: a new file is still started at every checkpoint (every 2 minutes here).
        // The interval argument is in milliseconds, so 10 minutes is 10 * 60 * 1000L, not 10 * 60.
        .withRollingPolicy(new FileSystemTableSink.TableRollingPolicy(true, 256L * 1024 * 1024, 10 * 60 * 1000L))
        .withBucketAssigner(new BasePathBucketAssigner<>())
        .build();
// The source emits Debezium JSON strings; toRowData is a hypothetical converter to rowType (not in the original)
DataStream<RowData> rows = streamSource.map(json -> toRowData(json)).returns(InternalTypeInfo.of(rowType));
rows.sinkTo(fileSink);
```
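Why the rolling settings appear to do nothing: `FileSink.forBulkFormat` only accepts a `CheckpointRollingPolicy`, so a Parquet part file is always closed at every checkpoint, and the size/time thresholds can only roll files earlier, never later. In addition, as far as I can tell from Flink 1.13's `FileSystemTableSink`, the third constructor argument is a millisecond value, so `10*60` means 600 ms. The class below is a simplified, hypothetical model of the three roll checks (it is not Flink code) that makes both effects visible; bigger files would therefore need a longer checkpoint interval or a separate compaction step.

```java
/** Simplified, hypothetical model of the roll checks of a checkpoint-based rolling policy. Not Flink code. */
final class RollingModel {

    private final boolean rollOnCheckpoint;
    private final long rollingFileSizeBytes;
    private final long rollingIntervalMillis;

    RollingModel(boolean rollOnCheckpoint, long rollingFileSizeBytes, long rollingIntervalMillis) {
        this.rollOnCheckpoint = rollOnCheckpoint;
        this.rollingFileSizeBytes = rollingFileSizeBytes;
        this.rollingIntervalMillis = rollingIntervalMillis;
    }

    /** Checked on every checkpoint; bulk formats always pass rollOnCheckpoint = true. */
    boolean shouldRollOnCheckpoint(long partFileSizeBytes) {
        return rollOnCheckpoint || partFileSizeBytes > rollingFileSizeBytes;
    }

    /** Checked for every incoming record. */
    boolean shouldRollOnEvent(long partFileSizeBytes) {
        return partFileSizeBytes > rollingFileSizeBytes;
    }

    /** Checked periodically on processing time (both arguments in milliseconds). */
    boolean shouldRollOnProcessingTime(long creationTimeMillis, long nowMillis) {
        return nowMillis - creationTimeMillis >= rollingIntervalMillis;
    }

    public static void main(String[] args) {
        long maxSize = 256L * 1024 * 1024;
        RollingModel asWritten = new RollingModel(true, maxSize, 10 * 60);         // 600 ms
        RollingModel intended = new RollingModel(true, maxSize, 10 * 60 * 1000L);  // 10 minutes

        System.out.println(asWritten.shouldRollOnProcessingTime(0L, 60_000L)); // true: 1 min >= 600 ms
        System.out.println(intended.shouldRollOnProcessingTime(0L, 60_000L));  // false
        // Even a 1 MB file is rolled at the next checkpoint
        System.out.println(intended.shouldRollOnCheckpoint(1024L * 1024));     // true

        // With continuous input and 2-minute checkpoints: at least this many files per day,
        // per bucket and per parallel sink subtask
        long checkpointMinutes = 2;
        System.out.println(24 * 60 / checkpointMinutes); // 720
    }
}
```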
To write to HDFS in high-availability (HA) mode, add the following code to the program:
```java
import org.apache.flink.core.fs.FileSystem;

// Flink's Configuration, fully qualified because Hadoop's Configuration is also used in this job.
// Keys prefixed with "flink.hadoop." are forwarded to the Hadoop configuration without the prefix.
org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
conf.setString("flink.hadoop.fs.defaultFS", "hdfs://emr-cluster");
conf.setString("flink.hadoop.dfs.nameservices", "emr-cluster");
conf.setString("flink.hadoop.dfs.ha.namenodes.emr-cluster", "nn1,nn2");
conf.setString("flink.hadoop.dfs.namenode.rpc-address.emr-cluster.nn1", "nn1_ip:port");
conf.setString("flink.hadoop.dfs.namenode.rpc-address.emr-cluster.nn2", "nn2_ip:port");
conf.setString("flink.hadoop.dfs.client.failover.proxy.provider.emr-cluster",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
// Call before the job is executed; null means no PluginManager is used
FileSystem.initialize(conf, null);
```
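The HA keys follow a fixed pattern built from the nameservice ID and the NameNode IDs. A small sketch that generates the same key/value pairs for any nameservice; the helper name is hypothetical and the addresses are the placeholders used above. Each entry could then be passed to `conf.setString(key, value)`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: builds the flink.hadoop.* options for one HDFS HA nameservice. */
final class HdfsHaOptions {

    private static final String PREFIX = "flink.hadoop.";

    private HdfsHaOptions() {}

    /**
     * @param nameservice logical nameservice ID, e.g. "emr-cluster"
     * @param namenodes   NameNode ID -> RPC address ("host:port"), in the desired order
     */
    static Map<String, String> build(String nameservice, Map<String, String> namenodes) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put(PREFIX + "fs.defaultFS", "hdfs://" + nameservice);
        opts.put(PREFIX + "dfs.nameservices", nameservice);
        opts.put(PREFIX + "dfs.ha.namenodes." + nameservice, String.join(",", namenodes.keySet()));
        for (Map.Entry<String, String> nn : namenodes.entrySet()) {
            opts.put(PREFIX + "dfs.namenode.rpc-address." + nameservice + "." + nn.getKey(), nn.getValue());
        }
        opts.put(PREFIX + "dfs.client.failover.proxy.provider." + nameservice,
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        return opts;
    }

    public static void main(String[] args) {
        Map<String, String> namenodes = new LinkedHashMap<>();
        namenodes.put("nn1", "nn1_ip:port");
        namenodes.put("nn2", "nn2_ip:port");
        build("emr-cluster", namenodes).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Note that `FileSystem.initialize` only configures the JVM it runs in. That covers running the job from the IDE; when the job runs on a cluster, the TaskManagers initialize their file systems from their own configuration, so the same `flink.hadoop.*` entries (or the Hadoop XML files) most likely need to be available there as well.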
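Summary
1. By default, NUMBER fields are not decoded into readable numbers. Setting `properties.put("decimal.handling.mode", "double");` turns them into double fields; large values are then displayed in scientific notation.
2. HDFS HA: if you don't want to include core-site.xml and hdfs-site.xml in the project, the HA settings have to be made in code.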
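Regarding point 1: the scientific notation is simply how Java formats large doubles (`Double.toString` switches to `E` notation from 10^7 upward), and a double also cannot hold every large NUMBER exactly. A small JDK-only illustration, assuming the value reaches your code as a `double`; the class name is hypothetical. Debezium's `decimal.handling.mode` also accepts `string`, which keeps the exact value as text if the target column can take it.

```java
import java.math.BigDecimal;

/** Hypothetical helper showing double formatting and precision limits. */
final class NumberFormatting {

    private NumberFormatting() {}

    /** Plain decimal text (no exponent) for a double. */
    static String plain(double value) {
        // BigDecimal.valueOf uses the shortest decimal form of the double; toPlainString drops the exponent
        return BigDecimal.valueOf(value).toPlainString();
    }

    public static void main(String[] args) {
        double amount = 12345678901.0;                 // e.g. a value from a NUMBER(11) column
        System.out.println(Double.toString(amount));   // 1.2345678901E10
        System.out.println(plain(amount));             // 12345678901

        // Integers above 2^53 are not all representable as double, so digits can be lost
        BigDecimal exact = new BigDecimal("12345678901234567891");
        BigDecimal viaDouble = new BigDecimal(exact.doubleValue());
        System.out.println(exact.compareTo(viaDouble) == 0); // false
    }
}
```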