当前位置:网站首页>通过Flink-Sql将Kafka数据写入HDFS
通过Flink-Sql将Kafka数据写入HDFS
2022-08-05 05:14:00 【IT_xhf】
系列文章目录
提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加
例如:第一章 Python 机器学习入门之pandas的使用
提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档
前言
近期,公司有需求将Kafka的数据写入到Hive表中,当时看到Flink有一个File Connector可以将文件写入到HDFS,所以开始了解Flink-Sql写入到HDFS的使用。
一、创建Hive Catalog
将Flink-Sql的元数据通过hive catalog保存起来。这样通过Flink Sql创建的表都会保存到Hive中。
CREATE CATALOG myhive_default WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/etc/hive/conf'
);
use catalog myhive_default;
二、创建表
1.创建Kafka表
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column
) WITH (
'connector' = 'kafka',
'topic' = 'kafka2hive2',
'properties.bootstrap.servers' = 'ip1:9092,ip2:9092,ip3:9092',
'properties.group.id' = 'kafka2hive',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
2.创建Hive表
CREATE external TABLE fs_table (
user_id STRING,
order_amount DOUBLE
) partitioned by (dt string,h string,m string)
stored as ORC
TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',
'sink.partition-commit.delay'='0s',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.policy.kind'='metastore');
3. 执行同步语句
set execution.checkpointing.interval=10sec;
insert into fs_table
SELECT user_id, order_amount,DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH'), DATE_FORMAT(log_ts, 'mm') FROM kafka_table;
总结
不开启checkpoint, 写入到Hive的文件都是inprogress状态, 所以在执行之前要开启checkpoint。任务checkpoint后,临时文件会变成正式文件。
参考资料
边栏推荐
- 02.01-----The role of parameter reference "&"
- Using pip to install third-party libraries in Pycharm fails to install: "Non-zero exit code (2)" solution
- 【After a while 6】Machine vision video 【After a while 2 was squeezed out】
- range函数作用
- day6-列表作业
- Flink 状态与容错 ( state 和 Fault Tolerance)
- 【过一下6】机器视觉视频 【过一下2被挤掉了】
- Flink Distributed Cache 分布式缓存
- 数据库期末考试,选择、判断、填空题汇总
- The role of the range function
猜你喜欢

vscode+pytorch use experience record (personal record + irregular update)

第5讲 使用pytorch实现线性回归

Lecture 3 Gradient Tutorial Gradient Descent and Stochastic Gradient Descent

将照片形式的纸质公章转化为电子公章(不需要下载ps)

Difference between for..in and for..of

Multi-threaded query results, add List collection

Wise Force Deleter强制删除工具
![[Go through 4] 09-10_Classic network analysis](/img/f2/e6e71869b8ab014cc1eea0537fc2e7.png)
[Go through 4] 09-10_Classic network analysis

Geek卸载工具

SQL(一) —— 增删改查
随机推荐
Difference between for..in and for..of
【Over 16】Looking back at July
小白一枚各位大牛轻虐虐
Wise Force Deleter强制删除工具
flink实例开发-batch批处理实例
day12函数进阶作业
[Go through 7] Notes from the first section of the fully connected neural network video
CAP+BASE
02.01-----The role of parameter reference "&"
pycharm中调用Matlab配置:No module named ‘matlab.engine‘; ‘matlab‘ is not a package
学习总结week3_2函数进阶
Flink accumulator Counter 累加器 和 计数器
基于Flink CDC实现实时数据采集(四)-Sink接口实现
Matplotlib(三)—— 实践
ES6 Set、WeakSet
vscode+pytorch使用经验记录(个人记录+不定时更新)
day8字典作业
flink部署操作-flink standalone集群安装部署
OFDM Lecture 16 5 -Discrete Convolution, ISI and ICI on DMT/OFDM Systems
分布式和集群