Data Lake (XVII): Integrating Flink and Iceberg with DataStream API Operations
2022-07-28 03:15:00 【Lanson】
Flink and Iceberg Integration: DataStream API Operations
Currently, Flink can read and write Iceberg tables in real time through both the DataStream API and the SQL API; the SQL API is recommended.
Iceberg supports Flink 1.11.x and later. The tested version correspondence between Iceberg and Flink is as follows:
- Flink 1.11.x pairs with Iceberg 0.11.1.
- Flink 1.12.x-1.13.x pairs with Iceberg 0.12.1, although the SQL API has some bugs.
- Flink 1.14.x can be integrated with Iceberg 0.12.1, but there are minor bugs; for example, reading Iceberg data in real time is buggy.
The Flink and Iceberg integration below uses Flink 1.13.5 and Iceberg 0.12.1. The later SQL API operations use Flink 1.11.6 and Iceberg 0.11.1.
I. Writing to an Iceberg Table in Real Time with the DataStream API
At present, operating Iceberg through the DataStream API is only available in Java. The steps for writing to an Iceberg table in real time with the DataStream API are as follows:
1. First, add the following dependencies to the Maven pom.xml
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- Flink 1.12.x-1.13.x is compatible with Iceberg 0.12.1; Flink 1.14 is not -->
<flink.version>1.13.5</flink.version>
<!--<flink.version>1.12.1</flink.version>-->
<!--<flink.version>1.14.2</flink.version>-->
<!-- Flink 1.11.x pairs with Iceberg 0.11.1 -->
<!--<flink.version>1.11.6</flink.version>-->
<hadoop.version>3.2.2</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-iceberg</artifactId>
<version>1.13-vvr-4.0.7</version>
</dependency>
<!-- Iceberg dependency required for Flink to operate on Iceberg tables -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime</artifactId>
<version>0.12.1</version>
<!--<version>0.11.1</version>-->
</dependency>
<!-- Dependencies required for developing Flink jobs in Java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka Connector dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- JARs required for reading files from HDFS -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- Flink SQL & Table-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- log4j and slf4j packages; comment these out if you do not want logs in the console -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
2. Write code that uses the DataStream API to write Kafka data into an Iceberg table
import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import java.util.Map;
/**
 * Write data to an Iceberg table using the DataStream API.
 */
public class StreamAPIWriteIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1. Checkpointing must be enabled: Flink only commits data to Iceberg after a checkpoint completes.
        env.enableCheckpointing(5000);

        //2. Read data from the Kafka topic.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("node1:9092,node2:9092,node3:9092")
                .setTopics("flink-iceberg-topic")
                .setGroupId("my-group-id")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        //3. Process the data and wrap it in RowData objects so it can be written to the Iceberg table.
        SingleOutputStreamOperator<RowData> dataStream = kafkaSource.map(new MapFunction<String, RowData>() {
            @Override
            public RowData map(String s) throws Exception {
                System.out.println("s = " + s);
                String[] split = s.split(",");
                GenericRowData row = new GenericRowData(4);
                row.setField(0, Integer.valueOf(split[0]));
                row.setField(1, StringData.fromString(split[1]));
                row.setField(2, Integer.valueOf(split[2]));
                row.setField(3, StringData.fromString(split[3]));
                return row;
            }
        });

        //4. Create the Hadoop configuration, Catalog configuration, and table schema so the target table can be located later when writing data.
        Configuration hadoopConf = new Configuration();
        Catalog catalog = new HadoopCatalog(hadoopConf, "hdfs://mycluster/flink_iceberg/");

        // Configure the Iceberg database name and table name.
        TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl");

        // Create the Iceberg table schema.
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.required(2, "name", Types.StringType.get()),
                Types.NestedField.required(3, "age", Types.IntegerType.get()),
                Types.NestedField.required(4, "loc", Types.StringType.get()));

        // If the table is partitioned, specify the partition; here the "loc" column is the partition column.
        // Use PartitionSpec.unpartitioned() to create the table without partitions.
        // PartitionSpec spec = PartitionSpec.unpartitioned();
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("loc").build();

        // Store the Iceberg table data in Parquet format.
        Map<String, String> props =
                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
        Table table = null;

        // Use the catalog to check whether the table exists: create it if it does not, otherwise load it.
        if (!catalog.tableExists(name)) {
            table = catalog.createTable(name, schema, spec, props);
        } else {
            table = catalog.loadTable(name);
        }
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);

        //5. Write data to Iceberg via the DataStream API.
        FlinkSink.forRowData(dataStream)
                // .table() is optional; specifying the corresponding path via tableLoader is enough.
                .table(table)
                .tableLoader(tableLoader)
                // Defaults to false (append). Setting it to true overwrites existing data.
                .overwrite(false)
                .build();

        env.execute("DataStream Api Write Data To Iceberg");
    }
}
Notes on the code above:
- Checkpointing must be enabled. Flink only commits data to Iceberg after a checkpoint succeeds; without committed checkpoints the data cannot be found later when querying from Hive (a more detailed checkpoint configuration sketch follows this list).
- Data read from Kafka must be wrapped as RowData or Row objects before it can be written to the Iceberg table.
- Writes append data by default; specifying overwrite replaces all existing data.
- The corresponding Catalog and table Schema must be created before writing to the Iceberg table; otherwise writing with only the path specified fails because the corresponding Iceberg table cannot be found.
- Writing to Iceberg with the DataStream API is not recommended; the SQL API is preferred.
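Since every Iceberg commit is tied to a completed checkpoint, the checkpoint interval effectively controls how often data becomes visible and how many data files are produced. Below is a minimal sketch (not from the original example) of a more explicit checkpoint configuration using standard Flink settings; the class name CheckpointConfigSketch and the concrete values (interval, timeout, pause) are illustrative assumptions only.
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Commit to Iceberg roughly every 5 seconds, with exactly-once checkpoint semantics.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // Abort a checkpoint (and therefore skip that commit) if it does not finish within one minute.
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Leave at least 500 ms between the end of one checkpoint and the start of the next.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Allow only one in-flight checkpoint, i.e. one pending Iceberg commit at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    }
}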
3. Create the "flink-iceberg-topic" topic specified in the code in Kafka, start the program, and produce data
# Create the flink-iceberg-topic topic in Kafka
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic flink-iceberg-topic --partitions 3 --replication-factor 3
After creating the topic, start the program and then produce the following data to the topic:
./kafka-console-producer.sh --topic flink-iceberg-topic --broker-list node1:9092,node2:9092,node3:9092
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
You can see that the data has been saved under the corresponding path in HDFS:

4. Query the data saved in Iceberg through Hive
Start Hive and the Hive Metastore, then create an external table in Hive that maps to the Iceberg table:
CREATE TABLE flink_iceberg_tbl (
id int,
name string,
age int,
loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
Note: although loc is a partition column, do not declare it as a partition when creating the mapping table; also, the mapping table's LOCATION must match the path where the Iceberg data is stored.
Query the corresponding Iceberg table data through Hive; the results are as follows:
hive> select * from flink_iceberg_tbl;
OK
2 ls 19 shanghai
3 ww 20 beijing
1 zs 18 beijing
4 ml 21 shanghai
II. Batch and Real-Time Reading of an Iceberg Table with the DataStream API
Reading an Iceberg table with the DataStream API also supports both batch and real-time modes, controlled by "streaming(true/false)".
1. Batch (full) read
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;
/**
 * Read Iceberg data in batch / real-time mode using the DataStream API.
 */
public class StreamAPIReadIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1. Configure the TableLoader.
        Configuration hadoopConf = new Configuration();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);

        //2. Read data from Iceberg, either the full table or incrementally.
        DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
                .tableLoader(tableLoader)
                // Defaults to false (batch read of the full table); set to true for streaming reads.
                .streaming(false)
                .build();

        batchData.map(new MapFunction<RowData, String>() {
            @Override
            public String map(RowData rowData) throws Exception {
                int id = rowData.getInt(0);
                String name = rowData.getString(1).toString();
                int age = rowData.getInt(2);
                String loc = rowData.getString(3).toString();
                return id + "," + name + "," + age + "," + loc;
            }
        }).print();

        env.execute("DataStream Api Read Data From Iceberg");
    }
}
The results are as follows:

2. Real-time read
// When the streaming parameter is set to true, data is read in real time.
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
        .tableLoader(tableLoader)
        // Defaults to false (batch read of the full table); set to true for streaming reads.
        .streaming(true)
        .build();
Modify the code above accordingly and restart it, then insert two rows into the corresponding Iceberg table "flink_iceberg_tbl" through Hive:
Before inserting data into the Iceberg table from Hive, add the following two JARs:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
Insert two rows into the Iceberg table from Hive:
hive> insert into flink_iceberg_tbl values (5,'s1',30,'guangzhou'),(6,'s2',31,'tianjin');
After the insert, you can see the Flink console reading the new data in real time:

III. Real-Time Incremental Reads Starting from a Specified Snapshot
In the above case, we found Flink Read out all the data in the table , We can also specify the corresponding snapshot-id Decide which data increments to read data based on .
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
        .tableLoader(tableLoader)
        // Read data incrementally in real time starting from this snapshot; the snapshot id is obtained from the table metadata.
        .startSnapshotId(4226332606322964975L)
        // Defaults to false (batch read of the full table); set to true for streaming reads.
        .streaming(true)
        .build();
As a result, only the data written after the specified snapshot is read, as follows:

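The snapshot id passed to startSnapshotId() has to be taken from the table metadata. As a sketch that is not part of the original example (the class name ListIcebergSnapshots is chosen here only for illustration), the snapshots of the Hadoop-catalog table used above can be listed with the core Iceberg Table API:
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class ListIcebergSnapshots {
    public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        HadoopCatalog catalog = new HadoopCatalog(hadoopConf, "hdfs://mycluster/flink_iceberg/");
        Table table = catalog.loadTable(TableIdentifier.of("icebergdb", "flink_iceberg_tbl"));
        // Print every snapshot recorded in the table metadata together with its commit timestamp.
        for (Snapshot snapshot : table.snapshots()) {
            System.out.println(snapshot.snapshotId() + " committed at " + snapshot.timestampMillis());
        }
        // The current (latest) snapshot, if the table already contains data.
        if (table.currentSnapshot() != null) {
            System.out.println("current snapshot: " + table.currentSnapshot().snapshotId());
        }
    }
}
Any snapshot id printed this way can be used as the starting point for the incremental read above.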
IV. Merging Data Files
Iceberg provides an API to merge small files into larger files, which can be executed as a Flink batch job. Merging small files with Flink works exactly the same way as merging small files with Spark.
The code is as follows:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;
/**
 * Submit a Flink batch job to merge small data files.
 */
public class RewriteDataFiles {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //1. Create the Hadoop configuration.
        Configuration hadoopConf = new Configuration();

        //2. Create the HadoopCatalog pointing to the Iceberg warehouse path.
        Catalog catalog = new HadoopCatalog(hadoopConf, "hdfs://mycluster/flink_iceberg/");

        //3. Specify the Iceberg database name and table name, then load the table.
        TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
        Table table = catalog.loadTable(name);

        //4. Merge the small data files.
        RewriteDataFilesActionResult result = Actions.forTable(table)
                .rewriteDataFiles()
                // Defaults to 512 MB; the target size of merged files can be set manually, the same as in Spark.
                .targetSizeInBytes(536870912L)
                .execute();
    }
}
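The rewrite action above can also be scoped so that only part of the table is compacted. The following variation is a sketch under assumptions rather than part of the original example: it assumes the filter(...) option exposed by the rewrite action in this Iceberg version, the class name RewritePartitionDataFiles is a placeholder, and the 128 MB target size is an illustrative value. It restricts the rewrite to data files whose "loc" partition value is "beijing".
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class RewritePartitionDataFiles {
    public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        HadoopCatalog catalog = new HadoopCatalog(hadoopConf, "hdfs://mycluster/flink_iceberg/");
        Table table = catalog.loadTable(TableIdentifier.of("icebergdb", "flink_iceberg_tbl"));
        // Compact only the data files whose "loc" value is "beijing".
        RewriteDataFilesActionResult result = Actions.forTable(table)
                .rewriteDataFiles()
                .filter(Expressions.equal("loc", "beijing"))
                // Target size of the merged files; 128 MB here instead of the 512 MB default.
                .targetSizeInBytes(134217728L)
                .execute();
    }
}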