Data Lake (19): SQL API reads Kafka data and writes it to Iceberg table in real time
2022-07-31 11:36:00 【Lanson】
SQL API: Reading Kafka Data and Writing to an Iceberg Table in Real Time
To read data from Kafka in real time and write it into an Iceberg table, follow the steps below:
1. First, create the corresponding Iceberg table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

// 1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

// 2. Create the Iceberg table flink_iceberg_tbl3
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");
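After the DDL above runs, the Hadoop catalog lays the table out on HDFS as <warehouse>/<database>/<table>. A quick way to verify creation is to inspect that path; the sketch below derives it from the configuration above (the hdfs command assumes a node with an HDFS client configured for mycluster):

```shell
# Hadoop-catalog layout: <warehouse>/<database>/<table>
WAREHOUSE="hdfs://mycluster/flink_iceberg"
TABLE_PATH="$WAREHOUSE/iceberg_db/flink_iceberg_tbl3"
echo "$TABLE_PATH/metadata"
# On a node with HDFS access, list the metadata directory; a freshly created
# table should contain a version-hint.text and a v1.metadata.json file:
# hdfs dfs -ls "$TABLE_PATH/metadata"
```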
2. Write code to read Kafka data and write it to Iceberg in real time
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadKafkaToIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        /**
         * 1. The catalog and the Iceberg table must be created in advance.
         */
        // 1. Create the catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");

        // 2. Create the Iceberg table flink_iceberg_tbl3 (commented out here since it was created in step one)
        // tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

        // 3. Create a Kafka connector table to consume data from Kafka
        tblEnv.executeSql("create table kafka_input_table(" +
                " id int," +
                " name varchar," +
                " age int," +
                " loc varchar" +
                ") with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'flink-iceberg-topic'," +
                " 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092'," +
                " 'scan.startup.mode'='latest-offset'," +
                " 'properties.group.id' = 'my-group-id'," +
                " 'format' = 'csv'" +
                ")");

        // 4. Set table.dynamic-table-options.enabled
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // allows the OPTIONS hint in SQL syntax
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        // 5. Write data into table flink_iceberg_tbl3
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 select id,name,age,loc from kafka_input_table");

        // 6. Query the table data
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
        tableResult.print();
    }
}
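Because the Kafka source table declares 'format' = 'csv', each Kafka message is decoded as comma-separated fields in the order of the declared schema (id, name, age, loc). A minimal plain-Java sketch of that mapping (illustrative only; Flink's actual CSV format handles quoting, nulls, and type coercion far more robustly):

```java
public class CsvRowSketch {
    // Fields in declared schema order: id int, name string, age int, loc string
    static Object[] parse(String line) {
        String[] parts = line.split(",");
        return new Object[]{
                Integer.parseInt(parts[0]),  // id
                parts[1],                    // name
                Integer.parseInt(parts[2]),  // age
                parts[3]                     // loc
        };
    }

    public static void main(String[] args) {
        Object[] row = parse("1,zs,18,beijing");
        System.out.println(row[0] + " | " + row[1] + " | " + row[2] + " | " + row[3]);
    }
}
```

A message that does not match this shape (wrong field count, non-numeric id/age) would fail to parse, which is worth keeping in mind when producing test data by hand.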
Run the code above, then produce the following records to the Kafka topic:
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
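The four test records above can be sent with Kafka's console producer; a sketch assuming a Kafka 2.x+ installation on node1 and the topic name from the connector DDL (the script path depends on your installation):

```shell
# Write the sample rows to a file, then pipe them into the topic.
cat > /tmp/flink_iceberg_sample.csv <<'EOF'
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
EOF
wc -l < /tmp/flink_iceberg_sample.csv
# Run on a node with Kafka installed:
# kafka-console-producer.sh --bootstrap-server node1:9092 \
#   --topic flink-iceberg-topic < /tmp/flink_iceberg_sample.csv
```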
The corresponding records appear on the console in real time, and checking the Iceberg table's HDFS directory confirms the data was written successfully.