Data Lake (19): Reading Kafka Data with the SQL API and Writing It to an Iceberg Table in Real Time
2022-07-31 11:36:00 【Lanson】
To read data from Kafka in real time and write it into an Iceberg table, proceed as follows:
1. First, create the Iceberg table
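import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical class name: the article gives only the body of main()
public class CreateIcebergTable {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);
        // 1. Create the Hadoop-type Iceberg catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");
        // 2. Create the Iceberg table flink_iceberg_tbl3, partitioned by loc
        tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");
    }
}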
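With the Hadoop catalog, the warehouse option determines where the table is stored on HDFS. The plain-Java sketch below computes that location. It uses a hypothetical class, HadoopCatalogLayout, and has no Flink or Iceberg dependency. It assumes Iceberg's default Hadoop-catalog layout of warehouse/database/table, with metadata and data subdirectories. It only builds path strings and never touches HDFS.

```java
import java.util.List;

// Sketch only (hypothetical class): mirrors the assumed default Iceberg Hadoop-catalog
// layout <warehouse>/<database>/<table>; it builds strings and does not access HDFS.
public class HadoopCatalogLayout {

    // Table root = catalog 'warehouse' option + database (namespace) + table name
    static String tableLocation(String warehouse, String database, String table) {
        // Drop a trailing slash so "hdfs://x/" and "hdfs://x" give the same result
        String base = warehouse.endsWith("/")
                ? warehouse.substring(0, warehouse.length() - 1)
                : warehouse;
        return base + "/" + database + "/" + table;
    }

    // Subdirectories a table root normally contains: table metadata and data files
    static List<String> tableSubdirs(String tableLocation) {
        return List.of(tableLocation + "/metadata", tableLocation + "/data");
    }

    public static void main(String[] args) {
        String loc = tableLocation("hdfs://mycluster/flink_iceberg", "iceberg_db", "flink_iceberg_tbl3");
        System.out.println(loc);
        tableSubdirs(loc).forEach(System.out::println);
    }
}
```

Running it prints the table root, followed by its metadata and data directories. The data directory normally appears only after the first rows have been committed.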
2. Write the code that reads Kafka data and writes it into Iceberg in real time
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadKafkaToIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        // The Iceberg sink commits written data when a checkpoint completes, so checkpointing must be on
        env.enableCheckpointing(1000);

        // The catalog and the Iceberg table must be created in advance
        // 1. Create the catalog (catalog registration is per session, so it is repeated here)
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");
        // 2. Create the Iceberg table flink_iceberg_tbl3 (already created in step 1, hence commented out)
        // tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

        // 3. Create a Kafka connector table that consumes the topic
        tblEnv.executeSql("create table kafka_input_table(" +
                " id int," +
                " name varchar," +
                " age int," +
                " loc varchar" +
                ") with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'flink-iceberg-topic'," +
                " 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092'," +
                " 'scan.startup.mode'='latest-offset'," +
                " 'properties.group.id' = 'my-group-id'," +
                " 'format' = 'csv'" +
                ")");

        // 4. Enable table.dynamic-table-options.enabled so that the
        //    OPTIONS hint used in step 6 is accepted
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        // 5. Continuously write the Kafka rows into flink_iceberg_tbl3
        //    (executeSql submits the streaming INSERT job and returns immediately)
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 select id,name,age,loc from kafka_input_table");

        // 6. Read the table as a stream, checking for new snapshots every second
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
        tableResult.print();
    }
}
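The call env.enableCheckpointing(1000) matters because the Iceberg Flink sink makes rows visible only when a checkpoint completes. The toy model below mimics that behavior in plain Java. It uses a hypothetical class, CheckpointCommitModel, which is not an Iceberg or Flink API. Writes are buffered, and each completed checkpoint publishes them as one snapshot. This is a simplification under that assumption: the real sink writes data files and commits them through a separate committer operator.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical class, not an Iceberg/Flink API): rows written by the sink stay
// invisible until a checkpoint completes, then they are committed as one new snapshot.
public class CheckpointCommitModel {
    private final List<String> pending = new ArrayList<>();          // rows written since the last checkpoint
    private final List<List<String>> snapshots = new ArrayList<>();   // one entry per committed snapshot

    void write(String row) {
        pending.add(row);
    }

    // Invoked when a checkpoint completes: publish everything pending as one snapshot
    void onCheckpointComplete() {
        if (pending.isEmpty()) {
            return; // simplification in this model: no empty snapshots
        }
        snapshots.add(new ArrayList<>(pending));
        pending.clear();
    }

    // What a reader of the table sees: the rows of all committed snapshots
    List<String> visibleRows() {
        List<String> rows = new ArrayList<>();
        snapshots.forEach(rows::addAll);
        return rows;
    }

    int snapshotCount() {
        return snapshots.size();
    }

    public static void main(String[] args) {
        CheckpointCommitModel table = new CheckpointCommitModel();
        table.write("1,zs,18,beijing");
        table.write("2,ls,19,shanghai");
        System.out.println("before checkpoint: " + table.visibleRows()); // empty: nothing committed yet
        table.onCheckpointComplete();
        System.out.println("after checkpoint:  " + table.visibleRows()); // both rows are now visible
    }
}
```

In this model, a longer checkpoint interval means larger but rarer snapshots. That matches why the article uses a short 1000 ms interval for a demo.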
Start the program above and produce the following records to the Kafka topic flink-iceberg-topic:
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
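With 'format' = 'csv', each Kafka message is split on commas, and the fields are matched to the columns of kafka_input_table by position. The sketch below shows that mapping for the sample records. It uses a hypothetical class, CsvRowSketch, in plain Java 16+ because it relies on a record. It is not Flink's CSV deserializer. In Flink itself, a malformed record fails the job unless 'csv.ignore-parse-errors' is set to true.

```java
import java.util.List;

// Sketch only (hypothetical class): illustrates the positional mapping of one CSV message
// onto kafka_input_table(id INT, name STRING, age INT, loc STRING); not Flink's deserializer.
public class CsvRowSketch {

    record Row(int id, String name, int age, String loc) {}

    static Row parse(String line) {
        // Fields map to columns by position, in DDL order
        String[] f = line.split(",", -1);
        if (f.length != 4) {
            throw new IllegalArgumentException("expected 4 fields, got " + f.length + ": " + line);
        }
        return new Row(Integer.parseInt(f[0]), f[1], Integer.parseInt(f[2]), f[3]);
    }

    public static void main(String[] args) {
        List<String> lines = List.of("1,zs,18,beijing", "2,ls,19,shanghai", "3,ww,20,beijing", "4,ml,21,shanghai");
        for (String line : lines) {
            System.out.println(parse(line)); // e.g. Row[id=1, name=zs, age=18, loc=beijing]
        }
    }
}
```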
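The console prints the corresponding rows in real time, and the table's Iceberg directory on HDFS shows that the data was written successfully. Because the Iceberg sink commits on each completed checkpoint, new rows become visible after the next checkpoint (the interval is 1 s here).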
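Because flink_iceberg_tbl3 is partitioned by loc, the sample rows should land in two partition directories under the table's data directory. The sketch below groups the sample records the same way, using a hypothetical class, PartitionDirsSketch. It assumes Iceberg's default identity-partition path naming, loc=<value>. The data file names inside each directory are generated by Iceberg and are not modeled.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch only (hypothetical class): predicts the identity-partition directories
// (data/loc=<value>, relative to the table root) that the sample rows fall into.
public class PartitionDirsSketch {

    static Map<String, Integer> rowsPerPartitionDir(List<String> csvLines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted for stable output
        for (String line : csvLines) {
            String loc = line.split(",", -1)[3]; // 4th field is the partition column loc
            counts.merge("data/loc=" + loc, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("1,zs,18,beijing", "2,ls,19,shanghai", "3,ww,20,beijing", "4,ml,21,shanghai");
        rowsPerPartitionDir(lines).forEach((dir, n) -> System.out.println(dir + " -> " + n + " row(s)"));
    }
}
```

For the four sample records, this predicts data/loc=beijing and data/loc=shanghai with two rows each. Each checkpoint may add separate files within those directories.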