Data Lake (19): SQL API reads Kafka data and writes it to an Iceberg table in real time

2022-07-24 03:37:00 Lanson

Using the SQL API to read Kafka data and write it to an Iceberg table in real time

To read data from Kafka in real time and write it into an Iceberg table, follow these steps:

1. First, create the corresponding Iceberg table

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);
//1. Create the Hadoop catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");
//2. Create the Iceberg table flink_iceberg_tbl3
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

2. Write code that reads Kafka data and writes it to Iceberg in real time

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadKafkaToIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        // The Iceberg sink commits data files when a checkpoint completes, so checkpointing must be enabled
        env.enableCheckpointing(1000);

        /**
         * 1. The Catalog and the Iceberg table must be created in advance
         */
        //1. Create the Hadoop catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");

        //2. Create the Iceberg table flink_iceberg_tbl3 (already created in step 1, so it is left commented out here)
//        tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

        //3. Create the Kafka connector table that consumes data from Kafka
        tblEnv.executeSql("create table kafka_input_table(" +
                " id int," +
                " name varchar," +
                " age int," +
                " loc varchar" +
                ") with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'flink-iceberg-topic'," +
                " 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092'," +
                " 'scan.startup.mode'='latest-offset'," +
                " 'properties.group.id' = 'my-group-id'," +
                " 'format' = 'csv'" +
                ")");

        //4. Set table.dynamic-table-options.enabled
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // Allow the OPTIONS hint syntax in SQL
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        //5. Write data into the table flink_iceberg_tbl3
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 select id,name,age,loc from kafka_input_table");

        //6. Query the table data as a stream
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
        tableResult.print();
    }
}

Start the above code, then produce the following data into the Kafka topic (a minimal producer sketch follows the data below):

1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
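
For reference, a minimal Java producer sketch for sending these test rows (assumes the kafka-clients dependency is on the classpath; broker addresses and topic name are taken from the connector DDL above; the standard kafka-console-producer shell tool works equally well):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProduceTestRows {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same brokers as in the kafka_input_table DDL
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String[] rows = {"1,zs,18,beijing", "2,ls,19,shanghai", "3,ww,20,beijing", "4,ml,21,shanghai"};
            for (String row : rows) {
                // Plain CSV lines, matching 'format' = 'csv' in the connector DDL
                producer.send(new ProducerRecord<>("flink-iceberg-topic", row));
            }
            producer.flush();
        }
    }
}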

We can then see the corresponding data printed on the console in real time; checking the table's warehouse directory on HDFS confirms that the data was written to Iceberg successfully.
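To verify the write independently of the streaming job, the table can also be read once in batch mode. A minimal sketch, assuming the same catalog settings as above (batch mode reads the current snapshot and then terminates):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchReadIceberg {
    public static void main(String[] args) {
        // A pure Table API environment in batch mode
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        // Re-register the same Hadoop catalog
        tEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");
        // Read the current snapshot of the table and print it
        tEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3").print();
    }
}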


Copyright notice
This article was created by [Lanson]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/205/202207240331465261.html