Data Lake (19): Reading Kafka Data with the SQL API and Writing It to an Iceberg Table in Real Time
2022-07-31 11:36:00 【Lanson】
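Reading Kafka data with the SQL API and writing it to an Iceberg table in real time
To read data from Kafka in real time and write it to an Iceberg table, follow these steps:
1. First, create the corresponding Iceberg table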
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
// Checkpoint every 1000 ms; the Iceberg sink commits written files when a checkpoint completes
env.enableCheckpointing(1000);
// 1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");
// 2. Create the Iceberg table flink_iceberg_tbl3, partitioned by loc
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");
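As a rough illustration of where this table ends up on disk: with a Hadoop-type catalog, Iceberg derives the table location from the warehouse path, the database name, and the table name, and keeps `metadata` and `data` directories beneath it. The sketch below only does the string arithmetic so you know which HDFS path to inspect later. The class and method names (`IcebergTableLayout`, `tableLocation`) are made up for this example and are not part of the Iceberg or Flink API.

```java
class IcebergTableLayout {
    // Hypothetical helper: joins warehouse, database, and table name
    // in the order a Hadoop-type catalog lays them out on the file system.
    static String tableLocation(String warehouse, String database, String table) {
        // Drop a trailing slash so the result never contains "//" after the warehouse path
        String base = warehouse.endsWith("/")
                ? warehouse.substring(0, warehouse.length() - 1)
                : warehouse;
        return base + "/" + database + "/" + table;
    }

    public static void main(String[] args) {
        String location = tableLocation(
                "hdfs://mycluster/flink_iceberg", "iceberg_db", "flink_iceberg_tbl3");
        System.out.println(location);               // table root
        System.out.println(location + "/metadata"); // table metadata files
        System.out.println(location + "/data");     // data files, grouped by partition
    }
}
```

Running `hdfs dfs -ls` on the printed table root is one way to confirm that the table was created.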
2. Write the code that reads Kafka data and writes it to Iceberg in real time
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadKafkaToIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        // Checkpoint every 1000 ms; the Iceberg sink commits written files when a checkpoint completes
        env.enableCheckpointing(1000);
        /**
         * The catalog and the Iceberg table must be created beforehand
         */
        // 1. Create the catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");
        // 2. Create the Iceberg table flink_iceberg_tbl3 (already created in step 1, so this stays commented out)
        // tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");
        // 3. Create a Kafka connector table to consume data from Kafka
        tblEnv.executeSql("create table kafka_input_table(" +
                " id int," +
                " name varchar," +
                " age int," +
                " loc varchar" +
                ") with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'flink-iceberg-topic'," +
                " 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092'," +
                " 'scan.startup.mode'='latest-offset'," +
                " 'properties.group.id' = 'my-group-id'," +
                " 'format' = 'csv'" +
                ")");
        // 4. Set table.dynamic-table-options.enabled
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // Allow the OPTIONS hint in SQL statements
        configuration.setBoolean("table.dynamic-table-options.enabled", true);
        // 5. Write the data into the table flink_iceberg_tbl3
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 select id,name,age,loc from kafka_input_table");
        // 6. Query the table as a stream, checking for new snapshots every second
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
        tableResult.print();
    }
}
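Start the program above (the source uses 'scan.startup.mode'='latest-offset', so only records produced after the job starts are read), then produce the following data to the Kafka topic: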
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
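The console prints the corresponding records in real time, and the table's Iceberg directory on HDFS shows that the data was written successfully.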
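To make it easier to read the HDFS directory listing, the sketch below parses the four sample records into the table's schema (id, name, age, loc) and groups them by the partition column. Because the table is partitioned by `loc`, Iceberg writes each distinct value into its own `loc=<value>` directory under the table's `data` directory. This is plain Java for illustration only: the `SampleRowPartitions` class and its methods are hypothetical, and Flink's `csv` format does its own parsing inside the job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SampleRowPartitions {
    // Mirrors the table schema: id int, name string, age int, loc string
    static final class Row {
        final int id;
        final String name;
        final int age;
        final String loc;

        Row(int id, String name, int age, String loc) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.loc = loc;
        }
    }

    // Parses one comma-separated record such as "1,zs,18,beijing"
    static Row parse(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + line);
        }
        return new Row(Integer.parseInt(fields[0].trim()), fields[1].trim(),
                Integer.parseInt(fields[2].trim()), fields[3].trim());
    }

    // Groups rows by the partition directory name their loc value maps to
    static Map<String, List<Row>> groupByPartition(List<String> lines) {
        Map<String, List<Row>> byDirectory = new LinkedHashMap<>();
        for (String line : lines) {
            Row row = parse(line);
            byDirectory.computeIfAbsent("loc=" + row.loc, k -> new ArrayList<>()).add(row);
        }
        return byDirectory;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "1,zs,18,beijing", "2,ls,19,shanghai", "3,ww,20,beijing", "4,ml,21,shanghai");
        groupByPartition(lines).forEach((dir, rows) ->
                System.out.println(dir + " -> " + rows.size() + " rows"));
    }
}
```

For the sample data this yields two groups, `loc=beijing` and `loc=shanghai`, with two rows each, which matches the two partition directories you should find on HDFS.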