Data Lake (19): SQL API reads Kafka data and writes it to iceberg table in real time
2022-07-24 03:37:00 【Lanson】
Using the SQL API to read Kafka data and write it to an Iceberg table in real time
To read data from Kafka in real time and write it to an Iceberg table, follow these steps:
1. First, create the corresponding Iceberg table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
// Checkpoint every 1000 ms; the Iceberg sink commits written data only when a checkpoint completes
env.enableCheckpointing(1000);
// 1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");
// 2. Create the Iceberg table flink_iceberg_tbl3, partitioned by loc
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");
2. Write code that reads Kafka data and writes it to Iceberg in real time
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadKafkaToIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        // Checkpoint every 1000 ms; the Iceberg sink commits written data only when a checkpoint completes
        env.enableCheckpointing(1000);
        /**
         * The catalog and the Iceberg table must be created in advance.
         */
        // 1. Create the catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");
        // 2. Create the Iceberg table flink_iceberg_tbl3 (already created in step 1, so this stays commented out)
        // tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");
        // 3. Create a Kafka connector table to consume the data in Kafka
        tblEnv.executeSql("create table kafka_input_table(" +
                " id int," +
                " name varchar," +
                " age int," +
                " loc varchar" +
                ") with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'flink-iceberg-topic'," +
                " 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092'," +
                " 'scan.startup.mode'='latest-offset'," +
                " 'properties.group.id' = 'my-group-id'," +
                " 'format' = 'csv'" +
                ")");
        // 4. Enable table.dynamic-table-options.enabled
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // Allows the OPTIONS hint syntax in SQL statements
        configuration.setBoolean("table.dynamic-table-options.enabled", true);
        // 5. Write the data into table flink_iceberg_tbl3
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 select id,name,age,loc from kafka_input_table");
        // 6. Query the table data as a stream, checking for new snapshots every second
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
        tableResult.print();
    }
}
Start the program above, then produce the following data to the Kafka topic:
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
The corresponding data appears on the console in real time, and checking the table's Iceberg directory on HDFS confirms that the data was written successfully.
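To make the expected result concrete, the following is a minimal, dependency-free sketch of how the four sample records map onto the table schema (id, name, age, loc) and which partitions they fall into, since the table is partitioned by loc. The class CsvPartitionSketch and its methods are hypothetical names used only for illustration; the naive comma split stands in for Flink's csv format and does not handle quoting or escaping. With Iceberg's default file layout, data files for such a table typically end up under directories named like loc=beijing inside the table's data directory, which is what the sketch uses as map keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Flink or Iceberg: it only mimics the
// column mapping and the grouping by the partition column loc.
public class CsvPartitionSketch {

    /** One row matching the schema (id int, name string, age int, loc string). */
    static final class Row {
        final int id;
        final String name;
        final int age;
        final String loc;

        Row(int id, String name, int age, String loc) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.loc = loc;
        }
    }

    /** Splits one CSV line into fields, in the column order declared for kafka_input_table. */
    static Row parse(String line) {
        String[] f = line.split(",", -1);
        if (f.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + line);
        }
        return new Row(Integer.parseInt(f[0].trim()), f[1].trim(),
                Integer.parseInt(f[2].trim()), f[3].trim());
    }

    /** Groups rows by loc, keyed by a partition directory style name such as "loc=beijing". */
    static Map<String, List<Row>> groupByPartition(List<String> lines) {
        Map<String, List<Row>> partitions = new TreeMap<>();
        for (String line : lines) {
            Row row = parse(line);
            partitions.computeIfAbsent("loc=" + row.loc, k -> new ArrayList<>()).add(row);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "1,zs,18,beijing",
                "2,ls,19,shanghai",
                "3,ww,20,beijing",
                "4,ml,21,shanghai");
        for (Map.Entry<String, List<Row>> e : groupByPartition(sample).entrySet()) {
            // Prints "loc=beijing -> 2 rows" and then "loc=shanghai -> 2 rows"
            System.out.println(e.getKey() + " -> " + e.getValue().size() + " rows");
        }
    }
}
```

When checking HDFS after the job has run, two partition directories (one per distinct loc value) with data files in each would be consistent with this grouping. New files only become visible after a checkpoint completes.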