Data Lake (XVII): Flink and Iceberg Integration with the DataStream API
2022-07-28 03:15:00 【Lanson】
Flink and Iceberg Integration with the DataStream API
Currently, Flink supports reading and writing Iceberg tables in real time with both the DataStream API and the SQL API; the SQL API is the recommended way to read and write Iceberg tables in real time.
Iceberg supports Flink 1.11.x and above. The Flink and Iceberg version combinations tested so far are as follows:
Flink 1.11.x matches Iceberg 0.11.1.
Flink 1.12.x~1.13.x matches Iceberg 0.12.1; the SQL API has a few bugs.
Flink 1.14.x can be integrated with Iceberg 0.12.1, but there are some small bugs, for example when reading Iceberg data in real time.
The Flink and Iceberg integration examples below use Flink 1.13.5 with Iceberg 0.12.1. The later SQL API examples use Flink 1.11.6 with Iceberg 0.11.1.
I. Writing to an Iceberg Table in Real Time with the DataStream API
Operating Iceberg through the DataStream API is currently only available as a Java API. The steps for writing to an Iceberg table in real time with the DataStream API are as follows:
1. First, add the following dependencies to the Maven pom.xml:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- Flink 1.12.x ~ 1.13.x is compatible with Iceberg 0.12.1; not compatible with Flink 1.14 -->
<flink.version>1.13.5</flink.version>
<!--<flink.version>1.12.1</flink.version>-->
<!--<flink.version>1.14.2</flink.version>-->
<!-- Flink 1.11.x matches Iceberg 0.11.1 -->
<!--<flink.version>1.11.6</flink.version>-->
<hadoop.version>3.2.2</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-iceberg</artifactId>
<version>1.13-vvr-4.0.7</version>
</dependency>
<!-- Iceberg dependency needed for Flink to operate on Iceberg -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime</artifactId>
<version>0.12.1</version>
<!--<version>0.11.1</version>-->
</dependency>
<!-- Dependencies required for developing Flink in Java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka Connector dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Jars needed to read files from HDFS -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- Flink SQL & Table-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- log4j and slf4j packages; comment them out if you do not want logs in the console -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
2. Write code that uses the DataStream API to write Kafka data into an Iceberg table
import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import java.util.Map;
/**
 * Write data to an Iceberg table using the DataStream API.
 */
public class StreamAPIWriteIceberg {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1. Checkpointing must be enabled: when Flink writes to Iceberg, data is committed only after a checkpoint completes.
env.enableCheckpointing(5000);
//2. Read data from the Kafka topic
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("node1:9092,node2:9092,node3:9092")
.setTopics("flink-iceberg-topic")
.setGroupId("my-group-id")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
//3. Process the data and wrap it into RowData objects so it can be written to the Iceberg table.
SingleOutputStreamOperator<RowData> dataStream = kafkaSource.map(new MapFunction<String, RowData>() {
@Override
public RowData map(String s) throws Exception {
System.out.println("s = "+s);
String[] split = s.split(",");
GenericRowData row = new GenericRowData(4);
row.setField(0, Integer.valueOf(split[0]));
row.setField(1, StringData.fromString(split[1]));
row.setField(2, Integer.valueOf(split[2]));
row.setField(3, StringData.fromString(split[3]));
return row;
}
});
//4. Create the Hadoop configuration, catalog configuration and table schema, so the corresponding table can be located under the path when writing data later.
Configuration hadoopConf = new Configuration();
Catalog catalog = new HadoopCatalog(hadoopConf,"hdfs://mycluster/flink_iceberg/");
// Configure the Iceberg database name and table name
TableIdentifier name =
TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
// Create the Iceberg table schema
Schema schema = new Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.required(2, "name", Types.StringType.get()),
Types.NestedField.required(3, "age", Types.IntegerType.get()),
Types.NestedField.required(4, "loc", Types.StringType.get()));
// Specify the partition if there is one; here the "loc" column is the partition column. Use PartitionSpec.unpartitioned() to leave the table unpartitioned.
// PartitionSpec spec = PartitionSpec.unpartitioned();
PartitionSpec spec = PartitionSpec.builderFor(schema).identity("loc").build();
// Specify that the Iceberg table data is stored in Parquet format
Map<String, String> props =
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
Table table = null;
// Use the catalog to check whether the table exists: create it if it does not, otherwise load it
if (!catalog.tableExists(name)) {
table = catalog.createTable(name, schema, spec, props);
}else {
table = catalog.loadTable(name);
}
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);
//5. Write data into Iceberg through the DataStream API
FlinkSink.forRowData(dataStream)
// .table() is optional here; specifying a tableLoader pointing to the right path is enough.
.table(table)
.tableLoader(tableLoader)
// Default is false (append data); set to true to overwrite the existing data
.overwrite(false)
.build();
env.execute("DataStream Api Write Data To Iceberg");
}
}
Notes on the code above:
- Checkpointing must be enabled: Flink only commits data to Iceberg after a successful checkpoint, and uncommitted data will not show up in later Hive queries.
- The data read from Kafka must be wrapped into RowData or Row objects before it can be written to the Iceberg table.
- By default data is appended when writing out; if overwrite is specified, all existing data is overwritten.
- Before writing to the Iceberg table, the corresponding catalog and table schema must be created; writing with only a path specified, without the corresponding Iceberg table, results in an error.
- Writing to Iceberg with the DataStream API is not recommended; prefer the SQL API.
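The example above only sets a checkpoint interval. A slightly fuller checkpoint setup might look like the sketch below; the checkpoint directory and the extra settings are assumptions rather than part of the original example, and CheckpointingMode has to be imported from org.apache.flink.streaming.api:
// Checkpoint configuration sketch (assumed values; adjust to your cluster)
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// Hypothetical checkpoint directory on HDFS
env.getCheckpointConfig().setCheckpointStorage("hdfs://mycluster/flink-checkpoints");
// Fail the checkpoint if it takes longer than one minute
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Leave at least one second between the end of one checkpoint and the start of the next
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);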
3. Create the "flink-iceberg-topic" topic specified in the code in Kafka, then start the job and produce data
# Create the flink-iceberg-topic topic in Kafka
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic flink-iceberg-topic --partitions 3 --replication-factor 3
After creating the topic, start the job and then produce the following data into the topic:
./kafka-console-producer.sh --topic flink-iceberg-topic --broker-list node1:9092,node2:9092,node3:9092
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
You can see that the corresponding data has been saved under the HDFS path.
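As a quick check, the table directory can also be listed from the command line; a sketch, assuming the warehouse path used in the code above:
hdfs dfs -ls /flink_iceberg/icebergdb/flink_iceberg_tbl
# Expect two subdirectories:
#   data/      - Parquet data files, partitioned by loc (e.g. loc=beijing, loc=shanghai)
#   metadata/  - table metadata: snapshots, manifest lists and manifest files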

4. Query the data saved in Iceberg through Hive
Start Hive and the Hive Metastore, then create an external table in Hive that maps to the Iceberg table:
CREATE TABLE flink_iceberg_tbl (
id int,
name string,
age int,
loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
Note: although loc is a partition column, the partition column is not declared when creating this mapping table; also, the LOCATION of the mapping table must match the path where the Iceberg data is stored.
Query the corresponding Iceberg table data through Hive; the results are as follows:
hive> select * from flink_iceberg_tbl;
OK
2 ls 19 shanghai
3 ww 20 beijing
1 zs 18 beijing
4 ml 21 shanghai
II. Reading an Iceberg Table with the DataStream API (Batch / Real Time)
Reading an Iceberg table with the DataStream API also comes in two modes, batch reading and real-time reading, controlled by "streaming(true/false)".
1. Batch (full) read
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;
/**
 * Read Iceberg data in batch / real-time mode using the DataStream API.
 */
public class StreamAPIReadIceberg {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1. Configure the TableLoader
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);
//2. Read data from Iceberg in full / incrementally
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
.tableLoader(tableLoader)
// Default is false (full batch read); set to true for streaming read
.streaming(false)
.build();
batchData.map(new MapFunction<RowData, String>() {
@Override
public String map(RowData rowData) throws Exception {
int id = rowData.getInt(0);
String name = rowData.getString(1).toString();
int age = rowData.getInt(2);
String loc = rowData.getString(3).toString();
return id+","+name+","+age+","+loc;
}
}).print();
env.execute("DataStream Api Read Data From Iceberg");
}
}
The results printed to the console are as follows:
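Given the four rows produced to Kafka earlier, the printed output should look roughly as follows (row order is not guaranteed, and each line may carry a subtask prefix such as "2>"):
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai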

2. Real-time read
// When the streaming parameter is set to true, the data is read in real time
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
.tableLoader(tableLoader)
// Default is false (full batch read); set to true for streaming read
.streaming(true)
.build();
Modify the code above as shown and restart it, then insert 2 rows of data into the corresponding Iceberg table "flink_iceberg_tbl" through Hive:
Before inserting data into the Iceberg table from Hive, the following two jars need to be added:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
Insert two rows into the Iceberg table from Hive:
hive> insert into flink_iceberg_tbl values (5,'s1',30,'guangzhou'),(6,'s2',31,'tianjin');
After the insert, you can see the Flink console reading the corresponding data in real time:
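Based on the two rows just inserted from Hive, the console output should look roughly like:
5,s1,30,guangzhou
6,s2,31,tianjin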

III. Real-Time Incremental Reads Starting from a Specified Snapshot
In the case above, Flink read all of the data in the table. We can also specify a snapshot-id to decide from which point the data should be read incrementally.
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
.tableLoader(tableLoader)
// Read data incrementally in real time starting from the given snapshot; snapshot ids can be obtained from the table metadata (a listing sketch follows below)
.startSnapshotId(4226332606322964975L)
// Default is false (full batch read); set to true for streaming read
.streaming(true)
.build();
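To find a snapshot id to pass to startSnapshotId(), the table's snapshots can be listed through the Iceberg API; a minimal sketch, reusing the HadoopCatalog and table identifier from the write example (org.apache.iceberg.Snapshot also needs to be imported):
Configuration hadoopConf = new Configuration();
HadoopCatalog catalog = new HadoopCatalog(hadoopConf, "hdfs://mycluster/flink_iceberg/");
Table table = catalog.loadTable(TableIdentifier.of("icebergdb", "flink_iceberg_tbl"));
// Print every snapshot id together with its commit timestamp
for (Snapshot snapshot : table.snapshots()) {
    System.out.println(snapshot.snapshotId() + " committed at " + snapshot.timestampMillis());
}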
As a result, only the data committed after the specified snapshot is read, as follows:

IV. Merging Data Files
Iceberg provides an API to merge small files into larger files, which can be executed as a Flink batch job. Merging small files with Flink works exactly the same way as merging small files in Spark.
The code is as follows:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;
/**
 * Submit a Flink batch job to merge small data files.
 */
public class RewriteDataFiles {
public static void main(String[] args) {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//1. Create the Hadoop configuration
Configuration hadoopConf = new Configuration();
//2. Create the HadoopCatalog so the corresponding table can be located under the warehouse path
Catalog catalog = new HadoopCatalog(hadoopConf,"hdfs://mycluster/flink_iceberg/");
//3. Specify the Iceberg database name and table name, then load the table
TableIdentifier name =
TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
Table table = catalog.loadTable(name);
//4. Merge small data files
RewriteDataFilesActionResult result = Actions.forTable(table)
.rewriteDataFiles()
// The target file size defaults to 512 MB; it can be set manually here, the same as in Spark.
.targetSizeInBytes(536870912L)
.execute();
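// Optionally inspect the action result (a sketch; assumes the RewriteDataFilesActionResult
// API of Iceberg 0.12.x, which exposes the lists of rewritten files)
System.out.println("deleted data files: " + result.deletedDataFiles().size());
System.out.println("added data files: " + result.addedDataFiles().size());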
}
}