Data Lake (19): SQL API reads Kafka data and writes it to Iceberg table in real time
2022-07-31 11:36:00 【Lanson】
Using the SQL API to Read Kafka Data and Write It to an Iceberg Table in Real Time
The steps for reading data from Kafka in real time and writing it into an Iceberg table are as follows:
1. First, create the corresponding Iceberg table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
// Checkpointing is required: the Iceberg sink only commits data files when a checkpoint completes
env.enableCheckpointing(1000);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Create the Iceberg table flink_iceberg_tbl3
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3" +
        "(id int,name string,age int,loc string) partitioned by (loc)");
2. Write code to read Kafka data and write it to Iceberg in real time
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadKafkaToIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        // Checkpointing is required: the Iceberg sink commits data files on each checkpoint
        env.enableCheckpointing(1000);

        /**
         * 1. The Catalog and the Iceberg table need to be created in advance
         */
        //1. Create the Catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");

        //2. Create the Iceberg table flink_iceberg_tbl3 (already created in step 1, so it stays commented out here)
        // tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

        //3. Create the Kafka connector table to consume data from Kafka
        tblEnv.executeSql("create table kafka_input_table(" +
                " id int," +
                " name varchar," +
                " age int," +
                " loc varchar" +
                ") with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'flink-iceberg-topic'," +
                " 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092'," +
                " 'scan.startup.mode'='latest-offset'," +
                " 'properties.group.id' = 'my-group-id'," +
                " 'format' = 'csv'" +
                ")");

        //4. Enable table.dynamic-table-options.enabled to allow the OPTIONS hint in SQL
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        //5. Write data into the table flink_iceberg_tbl3
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 select id,name,age,loc from kafka_input_table");

        //6. Query the table data as a streaming read
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
        tableResult.print();
    }
}
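In step 6, the /*+ OPTIONS(...) */ hint turns the query into a continuous streaming read that checks for new Iceberg snapshots every second, which is why step 4 has to enable table.dynamic-table-options.enabled. Without the hint, the same query runs as a one-off batch scan of the current snapshot. A minimal sketch of the batch-style variant, assuming the same catalog and table names as above:

// Batch-style read: no streaming hint, so the query scans the current snapshot once and then finishes
TableResult batchResult = tblEnv.executeSql(
        "select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3");
batchResult.print();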
Start the code above and produce the following data to the Kafka topic (a producer sketch follows the sample records below):
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
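These records can be produced with the console producer shipped with Kafka, or with a small standalone producer. The sketch below is a minimal example, assuming the kafka-clients dependency is on the classpath; the class name ProduceTestData is made up here, while the topic and broker addresses match the Kafka connector definition above.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class ProduceTestData {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each record is a CSV line matching the kafka_input_table schema: id,name,age,loc
            String[] lines = {"1,zs,18,beijing", "2,ls,19,shanghai", "3,ww,20,beijing", "4,ml,21,shanghai"};
            for (String line : lines) {
                producer.send(new ProducerRecord<>("flink-iceberg-topic", line));
            }
            producer.flush();
        }
    }
}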
We can see the corresponding real-time data printed on the console, and checking the corresponding Iceberg HDFS directory confirms that the data has been written successfully.