当前位置:网站首页>flink-sql所有表连接器

flink-sql所有表连接器

2022-08-04 05:27:00 第一片心意

版本说明

本文档介绍的各种flink sql的语法基于flink-1.13.x,flink版本低于1.13.x的用户,在sql运行出错误时,需要自行去flink官网查看对应版本的语法支持。

另外,flink新版本支持的语法,文档中会进行特殊标注,说明对应语法在 flink 哪个版本开始支持,但凡是没有特殊标注的,均支持flink-1.13.x及以上版本。

简介

介绍

Flink的Table API和SQL程序可以连接到其他外部系统,用于读写批处理表和流处理表。
表source提供对存储在外部系统(如数据库、键值存储、消息队列或文件系统)中数据的访问。表sink向外部存储系统发送数据。根据source和sink的类型,它们支持不同的格式,如CSVAvroParquetORC

本节描述如何使用内置的连接器在Flink中注册表source和表sink。注册source或sink后,可以通过表API和SQL语句访问它。

如果你实现自己的自定义表source或sink,请查看自定义source和sink连接器页面。

支持的连接器

Flink内置各种连接器。下表列出了所有可用的连接器。

NameVersionSourceSink
FilesystemBounded and Unbounded Scan, LookupStreaming Sink, Batch Sink
Elasticsearch6.x & 7.xNot supportedStreaming Sink, Batch Sink
Apache Kafka0.10+Unbounded ScanStreaming Sink, Batch Sink
Amazon Kinesis Data StreamsUnbounded ScanStreaming Sink
JDBCBounded Scan, LookupStreaming Sink, Batch Sink
Apache HBase1.4.x & 2.2.xBounded Scan, LookupStreaming Sink, Batch Sink
Apache HiveSupported VersionsUnbounded Scan, Bounded Scan, LookupStreaming Sink, Batch Sink

使用连接器

Flink支持使用SQL CREATE TABLE语句来注册表。可以定义表名、表schema和用于连接外部系统的表选项。

有关创建表的更多信息,请参阅语法部分。

下面的代码展示了如何连接到Kafka来读写JSON格式记录的完整示例。

CREATE TABLE MyUserTable (
    -- 声明表的schema
    `user` BIGINT,
    `message` STRING,
    `rowtime` TIMESTAMP(3) METADATA FROM 'timestamp',    -- 使用元数据字段来访问kafka数据的timestamp时间戳
    `proctime` AS PROCTIME(),    -- 使用计算列来定义处理时间属性
    WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECOND    -- 使用WATERMARK语句定义rowtime属性
) WITH (
    -- 定义连接的外部系统属性
    'connector' = 'kafka',
    'topic' = 'topic_name',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'   -- 声明这个外部系统使用format
);

所需的连接属性被转换为基于字符串的键值对。工厂将基于工厂标识符(本例中是kafka和json)从键值对中创建配置好的表source、表sink和相应的format格式。
在为每个组件搜索一个匹配的工厂时,会考虑所有可以通过Java的服务提供者接口(SPI)找到的工厂。

如果找不到任何工厂或找到了多个与给定属性匹配的工厂,则将抛出一个异常,并提供有关可以使用的工厂和受支持属性的附加信息。

schema匹配

SQL CREATE TABLE语句的body子句定义了物理列、约束、水印的名称和类型。Flink不保存数据,因此schema定义仅声明如何将物理列从外部系统映射到Flink。
映射可能不是按名称一一对应的,这取决于格式和连接器的实现。例如,MySQL数据库表是按字段名(不区分大小写)映射的,CSV文件系统是按字段顺序映射的(字段名可以是任意的)。这些规则将根据对应的连接器来解析。

下面的示例展示了一个简单的schema,其中没有时间属性、输入/输出到表列的一对一字段映射。

CREATE TABLE MyTable (
    MyField1 INT,
    MyField2 STRING,
    MyField3 BOOLEAN
) WITH (
    ...
)

元数据

一些连接器和格式公开了附加的元数据字段,可以在物理列后面的元数据列中访问这些字段。有关元数据列的更多信息,请参阅CREATE TABLE部分。

主键

主键约束表明表中的一列或一组列是唯一的,并且它们不包含NULL值。主键唯一地标识表中的一行。

source表的主键用于优化元数据信息。sink表的主键通常用于插入更新数据。

SQL标准指定主键约束可以是ENFORCED的,也可以是NOT ENFORCED的。这将控制是否对传入/传出数据执行约束检查。
Flink本身并不拥有数据,因此唯一支持的模式是NOT ENFORCED模式。确保查询执行的主键强制约束由用户实现。

CREATE TABLE MyTable (
    MyField1 INT,
    MyField2 STRING,
    MyField3 BOOLEAN,
    PRIMARY KEY (MyField1, MyField2) NOT ENFORCED  -- 根据字段定义主键列
) WITH (
    ...
)

时间属性

在处理无界流表时,时间属性是必不可少的。因此,proctimerowtime属性都可以定义为schema的一部分。

有关Flink中的时间处理(尤其是事件时间)的更多信息,建议参阅事件时间部分。

处理时间属性

为了在模式中声明proctime属性,可以使用计算列语法声明一个由proctime()内置函数生成的计算列。计算列是不存储在物理数据中的虚拟列。

CREATE TABLE MyTable (
    MyField1 INT,
    MyField2 STRING,
    MyField3 BOOLEAN
    MyField4 AS PROCTIME() -- 定义一个处理时间属性列
) WITH (
    ...
)

rowtime时间属性

为了控制表的事件时间行为,Flink提供了预定义的时间戳提取器和水印策略。

有关在DDL中定义时间属性的更多信息,请参阅CREATE TABLE语句。

支持以下时间戳提取器:

-- 使用已存在的TIMESTAMP(3)类型的字段作为事件时间属性
CREATE TABLE MyTable (
    ts_field TIMESTAMP(3),
    WATERMARK FOR ts_field AS ...
) WITH (
    ...
)

-- 使用系统函数、UDF、表达式来提取期望的TIMESTAMP(3)类型的事件时间属性
CREATE TABLE MyTable (
    log_ts STRING,
    ts_field AS TO_TIMESTAMP(log_ts),
    WATERMARK FOR ts_field AS ...
) WITH (
    ...
)

支持的水印策略如下:

-- 为严格升序的事件时间属性设置水印策略。发出到目前为止观察到的最大时间戳水印。时间戳大于最大时间戳的行不属于延迟。
CREATE TABLE MyTable (
    ts_field TIMESTAMP(3),
    WATERMARK FOR ts_field AS ts_field
) WITH (
    ...
)

-- 设置升序事件时间属性的水印策略。发出到目前为止观察到的最大时间戳减去1的水印。时间戳大于或等于最大时间戳的行不属于延迟。
CREATE TABLE MyTable (
    ts_field TIMESTAMP(3),
    WATERMARK FOR ts_field AS ts_field - INTERVAL '0.001' SECOND
) WITH (
    ...
)

-- 为事件时间属性设置水印策略,这些事件时间属性在有限的时间间隔内是无序的。发出的水印是观察到的最大时间戳减去指定的延迟,例如2秒。
CREATE TABLE MyTable (
    ts_field TIMESTAMP(3),
    WATERMARK FOR ts_field AS ts_field - INTERVAL '2' SECOND
) WITH (
    ...
)

一定要同时声明时间戳和水印。触发基于时间的操作需要水印。

SQL字段类型

请参阅数据类型章节,了解如何在SQL中声明类型。

Kafka

介绍

支持:

  • Scan Source: Unbounded无界流
  • Sink: Streaming Append Mode 流式仅追加模式

Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。

依赖

为了使用Kafka连接器,以下依赖项需要使用自动化构建工具(如Maven或SBT)的项目和SQL客户端与SQL JAR包。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.0</version>
</dependency>

注意自己使用的 kafka 和 scala 版本。

Kafka 连接器目前并不包含在 Flink 的二进制发行版中,请查阅
这里了解如何在集群运行中引用 Kafka 连接器。

创建 Kafka 表

以下示例展示如何创建 Kafka 表:

CREATE TABLE KafkaTable (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
)

可用的元数据

以下的连接器元数据可以在表定义中通过元数据列的形式获取。

R/W列展示元数据是可读的(R)还是可写的(W)。 只读列必须声明为VIRTUAL以在INSERT INTO操作中排除它们。

数据类型描述R/W
topicSTRING NOT NULLKafka 记录的 Topic 名。R
partitionINT NOT NULLKafka 记录的 partition IDR
headersMAP NOT NULL二进制 Map 类型的 Kafka 记录头(Header)。R/W
leader-epochINT NULLKafka 记录的 Leader epoch(如果可用)。R
offsetBIGINT NOT NULLKafka 记录在 partition 中的 offsetR
timestampTIMESTAMP_LTZ(3) NOT NULLKafka 记录的时间戳。R/W
timestamp-typeSTRING NOT NULLKafka 记录的时间戳类型。可能的类型有 NoTimestampTypeCreateTime(在写入元数据时设置),或 LogAppendTimeR

以下扩展的CREATE TABLE示例展示了使用这些元数据字段的语法:

CREATE TABLE KafkaTable (
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    `partition` BIGINT METADATA VIRTUAL,
    `offset` BIGINT METADATA VIRTUAL,
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
);

格式元数据信息

连接器可以读取消息格式的元数据。格式元数据的配置键以**value.**作为前缀。

以下示例展示了如何获取 Kafka 和 Debezium 的元数据字段:

CREATE TABLE KafkaTable (
    `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- 获取Debezium格式元数据
    `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- 获取Debezium格式元数据
    `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,  -- 获取kafka格式元数据
    `offset` BIGINT METADATA VIRTUAL,  -- 获取kafka格式元数据
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'debezium-json'
);

连接器参数

参数是否必选从 flink-1.15.x 开始支持
是否可传递
默认值数据类型描述
connector必选(无)String指定使用的连接器,Kafka 连接器使用kafka
topic作为 sink 时必选(无)String当表用作 source 时读取该参数指定取数据的 topic 名。支持用分号间隔的 topic 列表,如topic-1;topic-2
注意,对 source 表而言,topictopic-pattern 两个选项只能使用其中一个。当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。
topic-pattern可选(无)String匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。注意,对 source 表而言,topictopic-pattern 两个选项只能使用其中一个。
properties.bootstrap.servers必选(无)String逗号分隔的 Kafka broker 列表。
properties.group.idflink-1.13.x:作为source时必选置
flink-1.14.x:作为 source 时可选,sink 不可用
(无)Stringflink-1.13.x:Kafka source 的 consumer group id,对于 Kafka sink 可选填。
flink-1.14.x:Kafka source 的 consumer group id,如果没有指定 group id,则使用自动生成的KafkaSource-{tableIdentifier}
properties.*可选(无)String可以设置和传递任意 Kafka 的配置项。后缀名必须匹配在Kafka 配置文档中定义的配置键。Flink 将移除 properties. 配置键前缀并将变换后的配置键和值传入底层的 Kafka 客户端。例如,你可以通过**‘properties.allow.auto.create.topics’ = ‘false’**来禁用 topic 的自动创建。但是不支持配置某些配置项,因为 Flink 会覆盖这些配置,例如key.deserializervalue.deserializer
format必选(无)String用来序列化或反序列化 Kafka 消息的格式。 请参阅格式页面以获取更多关于格式的细节和相关配置项。注意:该配置项和value.format二者必需其一。在kafka中,对key和value都可以进行单独的格式配置。
key.format可选(无)String用来序列化和反序列化 Kafka 消息键(Key)的格式。 请参阅格式页面以获取更多关于格式的细节和相关配置项。 注意:如果定义了键格式,则配置项key.fields也是必需的。否则 Kafka 记录将使用空值作为键。
key.fields可选[]List<String>表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。列表格式为field1;field2
key.fields-prefix可选(无)String为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项key.fields都需要使用带前缀的名称。
当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。请注意该配置项要求必须将value.fields-include配置为EXCEPT_KEY。比如,key字段和value字段重名,就需要配置该参数以区分哪个字段属于key,哪个字段属于value。
value.format必选(无)String序列化和反序列化 Kafka 消息体时使用的格式。 请参阅格式页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和format二者必需其一。
value.fields-include可选ALL枚举类型
可选值:ALLEXCEPT_KEY
定义消息体(Value)格式如何处理消息键(Key)字段的策略。默认情况下使用ALL,表示表结构中所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。
scan.startup.mode可选group-offsetsStringKafka consumer 的启动模式。有效值为:earliest-offsetlatest-offsetgroup-offsetstimestampspecific-offsets。 请参阅下方起始消费偏移量以获取更多细节。
scan.startup.specific-offsets可选(无)String在使用specific-offsets启动模式时为每个 partition 指定 offset,例如partition:0,offset:42;partition:1,offset:300
scan.startup.timestamp-millis可选(无)Long在使用timestamp启动模式时指定启动的时间戳(单位毫秒)。
scan.topic-partition-discovery.interval可选(无)DurationConsumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。
sink.partitioner可选‘default’StringFlink partition 到 Kafka partition 的分区映射关系,可选值有:
default:使用 Kafka 默认的分区器对消息进行分区。
fixed:每个 Flink partition 最终对应最多一个 Kafka partition。
round-robin:Flink partition 按轮循(round-robin)的模式对应到 Kafka partition。只有当未指定消息的消息键时生效。
自定义FlinkKafkaPartitioner的子类:例如’org.mycompany.MyPartitioner’。
请参阅下方Sink 分区以获取更多细节。
sink.semantic可选at-least-onceStringflink-1.13.x:定义 Kafka sink 的语义。有效值为’at-least-once’,‘exactly-once’和’none’。请参阅一致性保证以获取更多细节。
flink-1.14.x:过期,请使用sink.delivery-guarantee
从 flink-1.14.x 开始支持
sink.delivery-guarantee
可选at-least-onceString定义 kafka sink 的消息交付语义。可用枚举值为:at-least-onceexactly-oncenone。查看一致性保证来获取更多细节。
从 flink-1.14.x 开始支持
sink.transactional-id-prefix
可选(无)String如果消息交付语义被配置为 exactly-once ,则必须设置该值,作为打开的 kafka 事务标识符的前缀。
sink.parallelism可选(无)Integer定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。

特性

消息键(Key)与消息体(Value)的格式

Kafka 消息的消息键和消息体部分都可以使用某种格式来序列化或反序列化成二进制数据。

消息体格式

Kafka 的消息键是可选的,以下语句将使用消息体格式读取和写入消息,但不使用消息键格式。
format选项与value.format意义相同。所有的格式配置使用格式识别符作为前缀。

CREATE TABLE KafkaTable (
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING
) WITH (
    'connector' = 'kafka',
    ...
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
)

消息体格式将配置为以下的数据类型:

ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>

上述数据类型表示,这三个字段值将用于构建消息体内容。

消息键和消息体格式

以下示例展示了如何同时配置和使用消息键和消息体格式。格式配置使用keyvalue加上格式识别符作为前缀。

CREATE TABLE KafkaTable (
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING
) WITH (
'connector' = 'kafka',
    ...
    
    'key.format' = 'json',
    'key.json.ignore-parse-errors' = 'true',
    'key.fields' = 'user_id;item_id',
    
    'value.format' = 'json',
    'value.json.fail-on-missing-field' = 'false',
    'value.fields-include' = 'ALL'
)

消息键格式包含了在key.fields中列出的字段(使用’;'分隔)和字段顺序。 因此将配置为以下的数据类型:

ROW<`user_id` BIGINT, `item_id` BIGINT>

由于消息体格式配置为**‘value.fields-include’ = ‘ALL’**,所以消息键字段也会出现在消息体格式的数据类型中:

ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>

重名的格式字段

如果消息键字段和消息体字段重名,则连接器无法根据表结构信息将这些列区分开。
key.fields-prefix配置项可以在表结构中为每个消息键字段名指定共同的前缀,以和消息值字段名区分开,并在配置消息键格式的时候保留原名。

以下示例展示了在消息键和消息体中同时包含version字段的情况:

CREATE TABLE KafkaTable (
    `k_version` INT,
    `k_user_id` BIGINT,
    `k_item_id` BIGINT,
    `version` INT,
    `behavior` STRING
) WITH (
    'connector' = 'kafka',
    ...
    
    'key.format' = 'json',
    'key.fields-prefix' = 'k_',
    'key.fields' = 'k_version;k_user_id;k_item_id',
    
    'value.format' = 'json',
    'value.fields-include' = 'EXCEPT_KEY'
)

消息体格式必须配置为EXCEPT_KEY模式。格式将被配置为以下的数据类型:

消息键格式:

ROW<`version` INT, `user_id` BIGINT, `item_id` BIGINT>

消息体格式:

ROW<`version` INT, `behavior` STRING>

Topic 和 Partition 的探测

topictopic-pattern配置项决定了 source 消费的 topic 或 topic 的匹配规则。topic配置项可接受使用分号间隔的 topic 列表,例如topic-1;topic-2
topic-pattern配置项使用正则表达式来探测匹配的 topic。例如topic-pattern设置为test-topic-[0-9],则在作业启动时,所有匹配该正则表达式的 topic(以test-topic-开头,以一位数字结尾)都将被 consumer 订阅。

为允许 consumer 在作业启动之后探测到动态创建的 topic,请将scan.topic-partition-discovery.interval配置为一个非负值。这将使 consumer 能够探测匹配名称规则的 topic 和新的 partition。

请参阅Kafka DataStream 连接器
文档以获取更多关于 topic 和 partition 探测的信息。

注意 topic 列表和 topic 匹配规则只适用于 source。对于 sink 端,Flink 目前只支持单一 topic。

起始消费偏移量

scan.startup.mode配置项决定了 Kafka consumer 的启动模式。有效值为:

  • group-offsets:从 Zookeeper/Kafka 中某个指定的消费组已提交的偏移量开始。
  • earliest-offset:从可能的最早偏移量开始。
  • latest-offset:从最新偏移量开始。
  • timestamp:从每个 partition 指定的时间戳开始。
  • specific-offsets:从每个 partition 指定的偏移量开始。

默认值group-offsets表示从 Zookeeper/Kafka 中最近一次已提交的偏移量开始消费。

如果使用了timestamp,必须使用另外一个配置项scan.startup.timestamp-millis来指定一个从格林尼治标准时间 1970 年 1 月 1 日 00:00:00.000 开始计算的毫秒单位时间戳作为起始时间。

如果使用了specific-offsets,必须使用另外一个配置项scan.startup.specific-offsets来为每个 partition 指定起始偏移量,
例如,选项值partition:0,offset:42;partition:1,offset:300表示 partition0从偏移量42开始,partition1从偏移量300开始。

CDC 变更日志(Changelog) Source

Flink 原生支持使用 Kafka 作为 CDC 变更日志(changelog) source。
如果 Kafka topic 中的消息是通过变更数据捕获(CDC)工具从其他数据库捕获的变更事件,则可以使用 CDC 格式将消息解析为 Flink SQL 系统中的插入(INSERT)、更新(UPDATE)、删除(DELETE)消息。

在许多情况下,变更日志(changelog) source 都是非常有用的功能,例如将数据库中的增量数据同步到其他系统,审核日志,数据库的物化视图,时态表关联数据库表的更改历史等。

Flink 提供了几种 CDC 格式:

  • debezium
  • canal
  • maxwell

Sink 分区

配置项sink.partitioner指定了从 Flink 分区到 Kafka 分区的映射关系。
默认情况下,Flink 使用Kafka 默认分区器来对消息进行分区。默认分区器对没有消息键的消息使用粘性分区策略(sticky partition strategy)进行分区,对含有消息键的消息使用 murmur2 哈希算法计算分区。

为了控制数据行到分区的路由,也可以提供自定义的 sink 分区器。fixed 分区器会将同一个 Flink 分区中的消息写入同一个 Kafka 分区,从而减少网络连接的开销。

一致性保证

默认情况下,如果查询在启用 checkpoint 模式下执行时,Kafka sink 按照至少一次(at-lease-once)语义保证将数据写入到 Kafka topic 中。

当 Flink checkpoint 启用时,kafka连接器可以提供精确一次(exactly-once)的语义保证。

除了启用 Flink checkpoint,还可以通过传入对应的sink.semantic选项来选择三种不同的运行模式:

  • none:Flink 不保证任何语义。已经写出的记录可能会丢失或重复。
  • at-least-once(默认设置):保证没有记录会丢失(但可能会重复)。
  • exactly-once:使用 Kafka 事务提供精确一次(exactly-once)语义。当使用事务向 Kafka 写入数据时,
    请将所有从 Kafka 中消费记录的应用中的消费者isolation.level配置项设置成实际所需的值(read_committedread_uncommitted,后者为默认值)。

安全

从 flink-1.14.x 开始支持。

为了开启包括加密和认证的安全配置,你只需要在 table option (也就是 create table with 中的设置)中设置以 properties. 开头的安全配置项即可。
下面的代码片段展示如何配置使用 PLAIN 作为 SASL 机制以及提供 JAAS 配置:

CREATE TABLE KafkaTable (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
    'connector' = 'kafka',
    ...
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
)

下面是更复杂的案例,使用 SASL_SSL 作为安全协议,并且使用 SCRAM-SHA-256 作为 SASL 机制:

CREATE TABLE KafkaTable (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
    'connector' = 'kafka',
    ...
    'properties.security.protocol' = 'SASL_SSL',
    -- SSL 配置
    -- 配置服务器提供的 truststore CA 的路径
    'properties.ssl.truststore.location' = '/path/to/kafka.client.truststore.jks',
    'properties.ssl.truststore.password' = 'test1234',
    -- 如果要求客户端要求身份认证,则需要配置 keystore (私钥) 路径。
    'properties.ssl.keystore.location' = '/path/to/kafka.client.keystore.jks',
    'properties.ssl.keystore.password' = 'test1234',
    -- SASL 配置
    -- 设置 SASL 机制为 SCRAM-SHA-256。
    'properties.sasl.mechanism' = 'SCRAM-SHA-256',
    -- 配置 JAAS 
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
)

请注意,如果你迁移了 kafka 客户端的依赖 jar 包,则登录模块 sasl.jaas.config 的 class path 可能会不一样,因此你需要重写 JAR 中该模块的 class path。
比如,如果你使用的是 SQL 客户端 JAR ,该 JAR 包已经将 kafka 客户端依赖迁移到了 org.apache.flink.kafka.shaded.org.apache.kafka
所以普通登录模块的路径应该是 org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule

获取更多安全配置的扩展细节,请参考 apache kafka 文档中的安全章节。

Source 按分区 Watermark

Flink 对于 Kafka 支持发送按分区的 watermark。Watermark 在 Kafka consumer 中生成。 按分区 watermark 的合并方式和在流 shuffle 时合并 Watermark 的方式一致。
Source 输出的 watermark 由读取的分区中最小的 watermark 决定。如果 topic 中的某些分区闲置,watermark 生成器将不会向前推进。
可以在表配置中设置table.exec.source.idle-timeout选项来避免上述问题。

请参阅Kafka watermark 策略以获取更多细节。

数据类型映射

Kafka 将消息键值以二进制进行存储,因此 Kafka 并不存在 schema 或数据类型。
Kafka 消息使用格式配置进行序列化和反序列化,例如 csvjsonavro。 因此,数据类型映射取决于使用的格式。请参阅格式页面以获取更多细节。

Upsert Kafka

介绍

支持:

  • Scan Source: Unbounded
  • Sink: Streaming Upsert Mode

Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。

作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。
更准确地说,如果有这个 key,则数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果不存在相应的 key,则该更新被视为 INSERT
用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都会被覆盖。
另外,value 为空的消息将会被视作为 DELETE 消息。

作为 sink,upsert-kafka 连接器可以消费 changelog 流。
它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。
Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

依赖

为了使用Upsert Kafka连接器,以下依赖项需要在使用自动化构建工具(如Maven或SBT)的项目和SQL客户端导入。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.0</version>
</dependency>

注意自己所使用的 kafka 和 scala 版本。

完整示例

下面的示例展示了如何创建和使用 Upsert Kafka 表:

CREATE TABLE pageviews_per_region (
    user_region STRING,
    pv BIGINT,
    uv BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED -- 定义主键,主键字段可以有多个,这些字段共同组成key的字段
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'pageviews_per_region',
    'properties.bootstrap.servers' = '...',
    'key.format' = 'avro',
    'value.format' = 'avro'
);

CREATE TABLE pageviews (
    user_id BIGINT,
    page_id BIGINT,
    view_time TIMESTAMP,
    user_region STRING,
    WATERMARK FOR viewtime AS view_time - INTERVAL '2' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'page_views',
    'properties.bootstrap.servers' = '...',
    'format' = 'json'
);

-- 计算 pv、uv 并插入到 upsert-kafka sink
INSERT INTO pageviews_per_region
SELECT
    user_region,
    COUNT(*),
    COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

注意确保在 DDL 中定义主键。

可用的元数据

查看常规kafka连接器以获取可以用元数据信息。

连接器参数

参数是否必选默认值数据类型描述
connector必选(none)String指定要使用的连接器,Upsert Kafka 连接器使用:upsert-kafka
topic必选(none)String用于读取和写入的 Kafka topic 名称。
properties.bootstrap.servers必选(none)String以逗号分隔的 Kafka brokers 列表。
properties.*可选(none)String该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 properties. 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过设置 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如key.deserializervalue.deserializer 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。
key.format必选(none)String用于对 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 语法指定。支持的格式包括 csvjsonavro。请参考格式页面以获取更多详细信息和格式参数。
key.fields-prefix可选(none)String为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 key.fields 都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 value.fields-include 配置为 EXCEPT_KEY。比如,key字段和value字段重名,就需要配置该参数以区分哪个字段属于key,哪个字段属于value。
value.format必选(none)String用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 csvjsonavro。请参考格式页面以获取更多详细信息和格式参数。
value.fields-include必选‘ALL’String控制哪些字段应该出现在 value 中。可取值:
ALL:消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。
EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。
sink.parallelism可选(none)Integer定义 upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。
sink.buffer-flush.max-rows可选0Integer缓存刷新前,最多能缓存多少条记录。当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送不必要的删除消息。可以通过设置为 0 来禁用它。默认不开启。注意,如果要开启 sink 缓存,需要同时设置 sink.buffer-flush.max-rowssink.buffer-flush.interval 两个选项为大于零的值。
sink.buffer-flush.interval可选0Duration缓存刷新的间隔时间,超过该时间后异步线程将刷新缓存数据。当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送不必要的删除消息。 可以通过设置为 0 来禁用它。默认不开启。注意,如果要开启 sink 缓存,需要同时设置 sink.buffer-flush.max-rowssink.buffer-flush.interval 两个选项为大于零的值。

特性

键(key)和值(value)的格式

查看常规kafka连接器以获取键和值的格式的详细信息。注意,该连接器要求必须同时指定键和值的格式,键字段通过主键 PRIMARY KEY 约束语法派生。

下面的例子展示如何同时指定和配置键和值的格式。格式选项通过key或者value为前缀来作为格式的选项标识符。

CREATE TABLE KafkaTable (
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    ...
    
    'key.format' = 'json',
    'key.json.ignore-parse-errors' = 'true',
    
    'value.format' = 'json',
    'value.json.fail-on-missing-field' = 'false',
    'value.fields-include' = 'EXCEPT_KEY'
)

主键约束

Upsert Kafka 始终以 upsert 方式工作,并且需要在 DDL 中定义主键。
在具有相同主键值的消息按序存储在同一个分区的前提下,在 changelog source 定义主键,意味着在物化后的 changelog 上主键具有唯一性。定义的主键将决定哪些字段出现在 Kafka 消息的 key 中。

一致性保证

默认情况下,如果启用 checkpoint,Upsert Kafka sink 会保证至少一次将数据插入 Kafka topic。
这意味着,Flink 可以将具有相同 key 的重复记录写入 Kafka topic。
但由于该连接器以 upsert 的模式工作,该连接器作为 source 读入时,可以确保具有相同主键值下仅最后一条消息会生效。因此,upsert-kafka 连接器可以像 HBase sink 一样实现幂等写入。

为每个分区生成相应的 watermark

Flink 支持根据 Upsert Kafka 的每个分区的数据特性发送相应的 watermark。
当使用这个特性的时候,watermark 是在 Kafka consumer 内部生成的。合并每个分区生成的 watermark 的方式和 stream shuffle 的方式是一致的。
数据源产生的 watermark 是取决于该 consumer 负责的所有分区中当前最小的 watermark。
如果该 consumer 负责的部分分区是闲置的,则整体的 watermark 并不会前进。在这种情况下,可以通过设置合适的 table.exec.source.idle-timeout 来缓解这个问题。

如想获得更多细节,请查阅 Kafka 水印策略.

数据类型映射

Upsert Kafka 用字节存储消息的 key 和 value,因此没有 schema 或数据类型。
消息按格式进行序列化和反序列化,例如:csvjsonavro。因此数据类型映射表由指定的格式确定。请参考格式页面以获取更多详细信息。

JDBC

介绍

支持:

  • Scan Source: Bounded
  • Lookup Source: Sync Mode
  • Sink: Batch Sink:
  • Streaming Append & Upsert Mode

JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。本文档描述了针对关系型数据库如何通过建立 JDBC 连接器来执行 SQL 查询。

如果在 DDL 中定义了主键,则JDBC sink 将以 upsert 模式与外部系统交换 UPDATE/DELETE 消息;否则,它将以 append 模式与外部系统交换消息且不支持消费 UPDATE/DELETE 消息。

依赖

为了使用JDBC连接器,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.0</version>
</dependency>

注意自己使用的 flink 和 scala 版本。

JDBC 连接器并不是发布版的一部分,需要自己手动添加依赖。

在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下:

DriverGroup IdArtifact IdJAR
MySQLmysqlmysql-connector-java下载
从flink-1.15.x开始支持
Oracle
com.oracle.database.jdbcojdbc8下载
PostgreSQLorg.postgresqlpostgresql下载
Derbyorg.apache.derbyderby下载

当前,JDBC 连接器和驱动不在 Flink 二进制发布包中,请参阅这里了解在集群上执行时何连接它们。

创建 JDBC 表

JDBC table 可以按如下定义:

-- 在 Flink SQL 中注册一张 MySQL 表 'users'
CREATE TABLE MyUserTable (
    id BIGINT,
    name STRING,
    age INT,
    status BOOLEAN,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'users'
);

-- 从另一张表 "T" 将数据写入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;

-- 查看 JDBC 表中的数据
SELECT id, name, age, status FROM MyUserTable;

-- JDBC 表在时态表关联中作为维表
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;

连接器参数

参数要求从 flink-1.15.x 开始支持
是否可传递
默认值类型描述
connector必填(none)String指定使用什么类型的连接器,这里应该是jdbc
url必填(none)StringJDBC 数据库 url
table-name必填(none)String连接到 JDBC 表的名称。
driver可选(none)String用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。
username可选(none)StringJDBC 用户名。如果指定了 usernamepassword 中的任一参数,则两者必须都被指定。
password可选(none)StringJDBC 密码。
connection.max-retry-timeout可选60sDuration最大重试超时时间,以秒为单位且不应该小于 1 秒。
scan.partition.column可选(none)String用于将输入数据进行分区的列名。请参阅下面的分区扫描部分了解更多详情。
scan.partition.num可选(none)Integer分区数。
scan.partition.lower-bound可选(none)Integer第一个分区的最小值。
scan.partition.upper-bound可选(none)Integer最后一个分区的最大值。
scan.fetch-size可选0Integer每次循环读取时应该从数据库中获取的行数。如果指定的值为 0,则该配置项会被忽略。
scan.auto-commit可选trueBoolean在 JDBC 驱动程序上设置 auto-commit 标志, 它决定了每个语句是否在事务中自动提交。有些 JDBC 驱动程序,特别是 Postgres,可能需要将此设置为 false 以便流化结果。
lookup.cache.max-rows可选(none)Integerlookup cache 的最大行数,若超过该值,则最老的行记录将会过期。 默认情况下,lookup cache 是未开启的。请参阅下面的 Lookup Cache 部分了解更多详情。
lookup.cache.ttl可选(none)Durationlookup cache 中每一行记录的最大存活时间,若超过该时间,则最老的行记录将会过期。 默认情况下,lookup cache 是未开启的。请参阅下面的 Lookup Cache 部分了解更多详情。
从flink-1.15.x开始支持
lookup.cache.caching-missing-key
可选trueBoolean是否缓存未命中的 key ,默认为 true
lookup.max-retries可选3Integer查询数据库失败的最大重试时间。
sink.buffer-flush.max-rows可选100Integerflush 前缓存记录的最大值,可以设置为 0 来禁用它。
sink.buffer-flush.interval可选1sDurationflush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 0 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 sink.buffer-flush.max-rows 设置为 0 并配置适当的 flush 时间间隔。
sink.max-retries可选3Integer写入记录到数据库失败后的最大重试次数。
sink.parallelism可选(none)Integer用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。

特性

键处理

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作。

upsert 模式下,Flink 将根据主键判断是插入新行还是更新已存在的行,这种方式可以确保幂等性。
为了确保输出结果是符合预期的,推荐为表定义主键并且确保主键是底层数据库中表的唯一键或主键。
在 append 模式下,Flink 会把所有记录解释为 INSERT 消息,如果违反了底层数据库中主键或者唯一约束,INSERT 插入可能会失败。

有关 PRIMARY KEY 语法的更多详细信息,请参见 CREATE TABLE DDL

分区扫描

为了在并行 Source task 实例中加速读取数据,Flink 为 JDBC table 提供了分区扫描的特性。

如果下述分区扫描参数中的任一项被指定,则下述所有的分区扫描参数必须都被指定。
这些参数描述了在多个 task 并行读取数据时如何对表进行分区。 scan.partition.column 必须是相关表中的数字、日期或时间戳列。
注意,scan.partition.lower-boundscan.partition.upper-bound 用于决定分区的起始位置和结束位置,以过滤表中的数据。
如果是批处理作业,也可以在提交 flink 作业之前获取最大值和最小值。

  • scan.partition.column:用于进行分区的列名。
  • scan.partition.num:分区数。
  • scan.partition.lower-bound:第一个分区的最小值。
  • scan.partition.upper-bound:最后一个分区的最大值。

注意

最大值和最小值设置,对于数据库中的时间和日期字段,该值应该使用毫秒值,而不是字符串。

flink会根据分区数对整个分区范围进行切分,然后将切分后的所有区间分配给所有source并行度。
flink生成的查询sql语句最后面会添加 where 分区字段过滤条件,使用 between 关键字。

Lookup Cache

JDBC 连接器可以在时态表关联中作为一个可 lookup 的 source (又称为维表),当前只支持同步的查找模式。

默认情况下,lookup cache 是未启用的,可以通过设置 lookup.cache.max-rowslookup.cache.ttl 参数来启用。

lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性能。
默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。
Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。
当缓存命中最大缓存行 lookup.cache.max-rows 或当行超过最大存活时间 lookup.cache.ttl 时,缓存中最老的行将被设置为已过期。
缓存中的记录可能不是最新的,用户可以将 lookup.cache.ttl 设置为一个更小的值以获得更新的刷新数据,但这可能会增加发送到数据库的请求数。所以要做好吞吐量和正确性之间的平衡。

flink-1.15.x:默认情况下,flink 会缓存主键查询为空的结果,你可以设置参数 lookup.cache.caching-missing-keyfalse 来改变这个行为。

幂等写入

如果在 DDL 中定义了主键,JDBC sink 将使用 upsert 语义而不是普通的 INSERT 语义。upsert 语义指的是如果数据主键值在数据库中已存在,则更新现有行;如果不存在,则插入新行,这种方式确保了幂等性。

如果出现故障,Flink 作业会从上次成功的 checkpoint 恢复并重新处理,但这可能导致在恢复过程中重复处理消息。强烈推荐使用 upsert 模式,因为如果需要重复处理记录,它有助于避免违反数据库主键约束和产生重复数据。

除了故障恢复场景外,数据源(kafka topic)也可能随着时间的推移自然地包含多个具有相同主键的记录,这使得 upsert 模式是符合用户期待的。

upsert 没有标准的语法,下表描述了不同数据库的 DML 语法:

DatabaseUpsert Grammar
MySQLINSERT … ON DUPLICATE KEY UPDATE …
从flink-1.15.x开始支持
Oracle
MERGE INTO … USING (…) ON (…)
WHEN MATCHED THEN UPDATE SET (…)
WHEN NOT MATCHED THEN INSERT (…)
VALUES (…)
PostgreSQLINSERT … ON CONFLICT … DO UPDATE SET …

Postgres 数据库作为 Catalog

注意,该特性只支持到 flink-1.14.x ,在 flink-1.15.x 中,将 postgremysql 合并到一起实现了。

JdbcCatalog 允许用户通过 JDBC 协议将 Flink 连接到关系数据库。

目前,PostgresCatalog 是 JDBC Catalog 的唯一实现,PostgresCatalog 只支持有限的 Catalog 方法,包括:

// Postgres Catalog 支持的方法
PostgresCatalog.databaseExists(String databaseName)
PostgresCatalog.listDatabases()
PostgresCatalog.getDatabase(String databaseName)
PostgresCatalog.listTables(String databaseName)
PostgresCatalog.getTable(ObjectPath tablePath)
PostgresCatalog.tableExists(ObjectPath tablePath)

其他的 Catalog 方法现在还是不支持的。

PostgresCatalog 的使用

请参阅 Dependencies 部分了解如何配置 JDBC 连接器和 Postgres 驱动。

Postgres catalog 支持以下参数:

  • name:必填,catalog 的名称。
  • default-database:必填,默认要连接的数据库。
  • username:必填,Postgres 账户的用户名。
  • password:必填,账户的密码。
  • base-url:必填,应该符合 jdbc:postgresql://<ip>:<port> 的格式,同时这里不应该包含数据库名。

SQL

CREATE CATALOG mypg WITH(
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);

USE CATALOG mypg;

PostgresSQL 元空间映射

除了数据库之外,postgresSQL 还有一个额外的命名空间 schema。一个 Postgres 实例可以拥有多个数据库,每个数据库可以拥有多个 schema,其中一个 schema 默认名为 public,每个 schema 可以包含多张表。
在 Flink 中,当查询由 Postgres catalog 注册的表时,用户可以使用 schema_name.table_name 或只写 table_name,其中 schema_name 是可选的,默认值为 public

因此,Flink Catalog 和 Postgres 之间的元空间映射如下:

Flink Catalog 元空间结构Postgres 元空间结构
catalog 名称 (只能在flink中定义)N/A
database 名称database 名称
table 名称[schema_name.]table_name

Flink 中的 Postgres 表的完整路径应该是:

<catalog>.<db>.`<schema.table>`

如果指定了 schema,请注意需要转义<schema.table>,也就是使用转义字符 ` 将其括起来。

这里提供了一些访问 Postgres 表的例子:

-- 扫描 'public' schema(即默认 schema)中的 'test_table' 表,schema 名称可以省略
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

-- 扫描 'custom_schema' schema 中的 'test_table2' 表,
-- 自定义 schema 不能省略,并且必须与表一起转义。
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;

JDBC Catalog

从 flink-1.15.x 开始支持,并且将 postgre 和 mysql 合并到了一起实现。

JdbcCatalog 允许用户使用 flink 通过 JDBC 协议去连接关系型数据库。

目前已经有两个 JDBC catalog 的实现,Postgres CatalogMySQL Catalog 。他们支持下面的 catalog 函数,其他函数目前还不支持。

// Postgres 和 MySQL Catalog 支持的函数
databaseExists(String databaseName);
listDatabases();
getDatabase(String databaseName);
listTables(String databaseName);
getTable(ObjectPath tablePath);
tableExists(ObjectPath tablePath);

其他的 catalog 函数目前还不支持。

使用 JDBC catalog

该章节描述怎么创建并使用 Postgres Catalog 或 MySQL Catalog 。怎么添加 JDBC 连接器和相关的驱动,请参考上面的依赖章节。

JDBC catalog 支持下面的选项配置:

  • name:必选,catalog的名称。
  • default-database:必选,连快将诶的默认数据库。
  • username:必选,Postgres/MySQL 账户的用户名。
  • password:必选,账户的密码。
  • base-url:必选,不要包含数据库名称。
    • 对于 Postgres Catalog ,该参数应该为:jdbc:postgresql://<ip>:<port>
    • 对于 MySQL Catalog ,该参数应该为:jdbc:mysql://<ip>:<port>

SQL:

CREATE CATALOG my_catalog WITH(
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);

USE CATALOG my_catalog;

JDBC Catalog for PostgreSQL

PostgreSQL 元空间映射。

PostgreSQL 基于数据库有一个额外的命名空间作为 schema 。一个 Postgres 示例可以有多个数据库,每个数据可以有多个 schema ,默认的 schema 名为 public ,每个 schema 可以有多张表。
在 flink 中,当查询注册到 Postgres catalog 中的表时,用户可以使用 schema_name.table_name 或者是只使用 table_name 。schema_name 是可选的,默认为 public

在 flink catalog 和 Postgres 之间的元空间映射如下:

Flink Catalog 元空间结构Postgres 元空间结构
catalog name (只能在 flink 中定义)N/A
database namedatabase name
table name[schema_name.]table_name

Flink 中的 Postgres 表的完整路径应该是:

<catalog>.<db>.`<schema.table>`

如果指定了 schema,请注意需要转义<schema.table>,也就是使用转义字符 ` 将其括起来。

下面是一些访问 Postgres 表的案例:

-- 浏览名为 'public' 的 schema 下的 'test_table' 表,使用默认的 schema,schema 名称可以被省略。
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

-- 浏览名为 'custom_schema' 的 schema 下的 'test_table2' 表,自定义 schema 不能被省略,而且必须和表一起被转义。
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;

JDBC Catalog for MySQL

MySQL 元空间映射。

MySQL 实例中的数据库和注册到 MySQL catalog 中的数据库有相同的映射级别。一个 MySQL 实例可以有多个数据库,每个数据库可以有多张表。
在 flink 中,当查询注册到 MySQL catalog 中的表时,用户可以使用 database.table_name 或者只指定 table_name 。默认的数据库名为创建 MySQL Catalog 时指定的默认数据库。

flink Catalog 和 MySQL Catalog 之间的元空间映射关系如下:

Flink Catalog Metaspace StructureMySQL Metaspace Structure
catalog name (只能在 flink 中定义)N/A
database namedatabase name
table nametable_name

flink 中的 MySQL 表的完全路径应该为:

`<catalog>`.`<db>`.`<table>`

下面是一些访问 MySQL 表的案例:

-- 浏览 'test_table' 表,默认数据库为 'mydb'
SELECT * FROM mysql_catalog.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

-- 浏览指定数据库下的 'test_table' 表。
SELECT * FROM mysql_catalog.given_database.test_table2;
SELECT * FROM given_database.test_table2;

数据类型映射

Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQLPostgresSQLDerby 等。
其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。

MySQL typePostgreSQL typeFlink SQL type
TINYINTTINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT2
SMALLSERIAL
SERIAL2
SMALLINT
INT
MEDIUMINT
SMALLINT UNSIGNED
INTEGER
SERIAL
INT
BIGINT
INT UNSIGNED
BIGINT
BIGSERIAL
BIGINT
BIGINT UNSIGNEDDECIMAL(20, 0)
BIGINTBIGINTBIGINT
FLOATREAL
FLOAT4
FLOAT
DOUBLE
DOUBLE PRECISION
FLOAT8
DOUBLE PRECISION
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN
TINYINT(1)
BOOLEANBOOLEAN
DATEDATEDATE
TIME [§]TIME [§] [WITHOUT TIMEZONE]TIME [§] [WITHOUT TIMEZONE]
DATETIME [§]TIMESTAMP [§] [WITHOUT TIMEZONE]TIMESTAMP [§] [WITHOUT TIMEZONE]
CHAR(n)

VARCHAR(n)
TEXT
CHAR(n)
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT
STRING
BINARY
VARBINARY
BLOB
BYTEABYTES
ARRAYARRAY

Elasticsearch

介绍

支持:

  • Sink: Batch
  • Sink: Streaming Append & Upsert Mode

Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。

连接器可以工作在 upsert 模式下,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。

如果 DDL 中没有定义主键,则连接器只能工作在 append 模式,只能与外部系统交换 INSERT 消息。

依赖

为了使用Elasticsearch连接器,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的。

6.x

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
  <version>1.13.0</version>
</dependency>	

7.x and later versions

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
  <version>1.13.0</version>
</dependency>

注意自己使用的 flink 和 scala 版本。

创建 Elasticsearch 表

以下示例展示如何创建 Elasticsearch sink 表:

CREATE TABLE myUserTable (
    user_id STRING,
    user_name STRING
    uv BIGINT,
    pv BIGINT,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'users'
);

连接器参数

参数是否必选从 flink-1.15.x 开始支持
是否可传递
默认值数据类型描述
connector必选(none)String指定要使用的连接器,有效值为:
elasticsearch-6:连接到 Elasticsearch 6.x 的集群。
elasticsearch-7:连接到 Elasticsearch 7.x 及更高版本的集群。
hosts必选(none)String要连接到的一台或多台 Elasticsearch 主机,例如 http://host_name:9092;http://host_name:9093
index必选(none)StringElasticsearch 中每条记录的索引。可以是一个静态索引(例如 myIndex)或一个动态索引(例如 index-{log_ts|yyyy-MM-dd})。 更多详细信息,请参见下面的动态索引部分。
document-type6.x 版本中必选6.x:是(none)StringElasticsearch 文档类型。在 elasticsearch-7 中不再需要。
document-id.key-delimiter可选_String复合键的分隔符(默认为**_),例如,指定为 ∗ ∗ 将导致文档 I D 为 ∗ ∗ K E Y 1 **将导致文档 ID 为**KEY1 将导致文档IDKEY1KEY2$KEY3**。
username可选(none)String用于连接 Elasticsearch 实例的用户名。请注意,Elasticsearch 没有预绑定安全特性,但你可以通过如下指南启用它来保护 Elasticsearch 集群。
password可选(none)String用于连接 Elasticsearch 实例的密码。如果配置了username,则此选项也必须配置为非空字符串。
failure-handler可选failString对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为:
fail:如果请求失败并因此导致作业失败,则抛出异常。
ignore:忽略失败并放弃请求。
retry-rejected:重新添加由于队列容量饱和而失败的请求。
自定义类名称:使用 ActionRequestFailureHandler 的子类进行失败处理。
sink.flush-on-checkpoint可选trueBoolean是否在 checkpoint 时执行 flush。禁用后,在 checkpoint 时 sink 将不会等待所有的 pending 请求被 Elasticsearch 确认。因此,sink 不会为请求的 at-least-once 交付提供任何有力保证。
sink.bulk-flush.max-actions可选1000Integer每个批量请求的最大缓冲操作数。 可以设置为0来禁用它。
sink.bulk-flush.max-size可选2mbMemorySize每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。 可以设置为0来禁用它。
sink.bulk-flush.interval可选1sDurationflush 缓冲操作的间隔。 可以设置为0来禁用它。注意,sink.bulk-flush.max-sizesink.bulk-flush.max-actions都设置为0的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。
sink.bulk-flush.backoff.strategy可选DISABLEDString指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为:
DISABLED:不执行重试,即第一次请求错误后失败。
CONSTANT:以指定的重试延迟时间间隔来进行重试。
EXPONENTIAL:先等待回退延迟,然后在重试之间指数递增延迟时间。
sink.bulk-flush.backoff.max-retries可选flink-1.13.x:8
flink-1.15.x:(none)
Integer最大回退重试次数。
sink.bulk-flush.backoff.delay可选flink-1.13.x:50ms
flink-1.15.x:(none)
Duration每次回退尝试之间的延迟。对于 CONSTANT 回退策略,该值是每次重试之间的延迟。对于 EXPONENTIAL 回退策略,该值是初始的延迟。
在 flink-1.15.x 中被删除
connection.max-retry-timeout
可选(none)Duration最大重试超时时间。
connection.path-prefix可选(none)String添加到每个 REST 通信中的前缀字符串,例如,**/v
从 Flink-1.15.x 开始支持
connection.request-timeout
可选(none)Duration请求连接的超时时间,单位:毫秒,数值必须大于等于0。设置为0,表示超时时间无限大。
从 Flink-1.15.x 开始支持
connection.timeout
可选(none)Duration建立连接使用的超时时间,单位:毫秒,数值必须大于等于0。设置为0,表示超时时间无限大。
从 Flink-1.15.x 开始支持
socket.timeout
可选(none)Durationsocket 等待数据的超时时间(SO_TIMEOUT),换句话说,两个连续数据包之间不活跃的最大时间,数值必须大于等于0。设置为0,表示超时时间无限大。
format可选jsonStringElasticsearch 连接器支持指定格式。该格式必须生成一个有效的 json 文档。 默认使用内置的 json 格式。更多详细信息,请参阅 JSON Format 页面。

特性

Key 处理

Elasticsearch sink 可以根据是否定义了主键来确定是在 upsert 模式还是 append 模式下工作。
如果定义了主键,Elasticsearch sink 将以 upsert 模式工作,该模式可以消费包含 UPDATE/DELETE 的消息。 如果未定义主键,Elasticsearch sink 将以 append 模式工作,该模式只能消费包含 INSERT 的消息。

在 Elasticsearch 连接器中,主键用于计算 Elasticsearch 的文档 id,文档 id 为最多 512 字节且不包含空格的字符串。
Elasticsearch 连接器通过使用 document-id.key-delimiter 指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档 ID 字符串。
某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,BYTESROWARRAYMAP 等。 如果未指定主键,Elasticsearch 将自动生成文档 id。

有关 PRIMARY KEY 语法的更多详细信息,请参见 CREATE TABLE DDL

动态索引

Elasticsearch sink 同时支持静态索引和动态索引。

如果想使用静态索引,则 index 选项值应为纯字符串,例如 myusers,所有记录都将被写入到 myusers 索引中。

如果想使用动态索引,你可以使用 {field_name} 来引用记录中的字段值来动态生成目标索引。
你也可以使用 {field_name|date_format_string}TIMESTAMP/DATE/TIME 类型的字段值转换为 date_format_string 指定的格式。

date_format_string 与 Java 的 DateTimeFormatter 兼容。
例如,如果选项值设置为 myusers-{log_ts|yyyy-MM-dd},则 log_ts 字段值为 2020-03-27 12:25:55 的记录将被写入到 myusers-2020-03-27 索引中。

数据类型映射

Elasticsearch 将文档存储在 JSON 字符串中。因此数据类型映射介于 Flink 数据类型和 JSON 数据类型之间。
Flink 为 Elasticsearch 连接器使用内置的 json 格式。更多类型映射的详细信息,请参阅 JSON Format 页面。

FileSystem

介绍

该连接器通过Flink FileSystem abstraction提供对文件系统中分区文件的访问。

文件系统连接器被包含在flink中,不需要添加额外的依赖即可使用。从文件系统中读写行数据,需要相对应的定义格式。

文件系统连接器允许从本地或分布式文件系统读取或写入数据,下面是一个文件系统表的定义:

CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  part_name1 INT,
  part_name2 STRING
)
PARTITIONED BY (part_name1, part_name2)
WITH (
  'connector' = 'filesystem',                   -- 必填: 指定连接器名称
  'path' = 'file:///path/to/whatever',          -- 必填: 目录路径
  'format' = '...',                             -- 必填: 文件系统连接器要求指定一个format格式化
  'partition.default-name' = '...',             -- 可选: 如果动态分区字段值为null/空字符串,则使用指定的默认分区名称
  'sink.shuffle-by-partition.enable' = '...',   --可选:在sink阶段开启对动态分区文件数据的shuffle,开启之后可以减少写出文件的数量,但是有可能造成数据倾斜。默认为false。
  ...
);

请确保包含了flink文件系统需要的依赖。

文件系统的流处理source:

flink-1.13.x:文件系统的流处理source还在开发中。未来,社区将会增加对流处理的支持,比如分区和目录监控。
flink-1.15.x:文件系统的流处理source已经实现。

该文件系统连接器和以前传统的文件系统连接器有很多不同:path参数指定的是目录而不是一个文件,而且你无法获取指定路径中你声明的一个可读文件。

分区文件

文件系统分区支持使用标准的hive format格式,而且,它不要求分区被预注册在表的catalog中。分区通过目录结构来进行发现和推断。比如,下面基于目录的表分区将会被推断为包含日期和小时分区。

path
└── datetime=2019-08-25
    └── hour=11
        ├── part-0.parquet
        ├── part-1.parquet
    └── hour=12
        ├── part-0.parquet
└── datetime=2019-08-26
    └── hour=6
        ├── part-0.parquet

文件系统表同时支持插入和覆盖。查看INSERT Statement章节。当使用insert overwrite覆盖一个分区表时,只有相关联的分区被覆盖,而不是整张表。

文件format

文件系统连接器支持多种format格式:

  • CSV: RFC-4180. 未压缩
  • JSON: 注意,文件系统的JSON格式并不是标准的JSON文件,而是未压缩的newline delimited JSON。
  • Avro: Apache Avro. 支持通过配置avro.codec来支持压缩。
  • Parquet: Apache Parquet. 兼容Hive.
  • Orc: Apache Orc. 兼容Hive.
  • Debezium-JSON: debezium-json.
  • Canal-JSON: canal-json.
  • Raw: raw.

Source

从 flink-1.14.x 开始支持。

file system 连接器在单个表中可以被用于读取单个文件,或者是整个目录。

当使用目录作为 source 路径时,目录中的文件并没有定义好的读取顺序。

目录监控

从 flink-1.15.x 开始支持。

默认情况下,file system 连接器是有界的,该连接器只会读取一次配置的目录,然后关闭它。

你可以通过配置 option source.monitor-interval 选项配置持续的目录监控:

Key默认值类型描述
source.monitor-interval(none)Durationsource 检查新文件的时间间隔,该数值必须大于0。每个文件都会使用他们自己的路径作为唯一标识符,并且在被发现后处理一次。已经被处理过的文件集合会在整个 source 的生命周期内被保存到 state 中,因此他们和 source state 一起被持久化到 checkpoint 和 savepoint 中。
更小的时间间隔意味着文件会更快被发现,但是会对文件系统或对象存储进行更频繁的文件列出或目录遍历。如果没有配置该选项,则提供的路径将只会被扫描一次,此时该 source 将会是有界的。

可用元数据

从 flink-1.15.x 开始支持。

下面的连接器元数据可以通过被定义为表的元数据字段来访问,所有的元数据都是只读的。

Key数据类型描述
file.pathSTRING NOT NULL输入文件的路径
file.nameSTRING NOT NULL文件名称,他是距离文件路径根目录最远的元素。
file.sizeBIGINT NOT NULL文件的字节数。
file.modification-timeTIMESTAMP_LTZ(3) NOT NULL文件的修改时间。

下面的代码片段展示了 CREATE TABLE 案例如何访问元数据属性:

CREATE TABLE MyUserTableWithFilepath (
    column_name1 INT,
    column_name2 STRING,
    `file.path` STRING NOT NULL METADATA
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///path/to/whatever',
    'format' = 'json'
)

Streaming Sink

文件系统连接器基于Streaming File Sink 写入记录到文件以支持文件系统连接器流式写入。行编码格式支持csvjson。块编码格式支持parquetorcavro

可以通过sql直接写入,插入流数据到不分区的表中。如果是分区表,可以配置分区关联操作。具体查看下面的分区提交章节。

滚动策略

数据通过分区目录会被切分为多个文件。每个分区将包含其对应sink子任务接收到数据之后写入的至少一个文件,正在处理的文件将会根据配置的滚动策略来关闭并成为分区中的一个文件。
文件的滚动策略基于大小、文件可以被打开的最大超时时间间隔来配置。

Key默认值从 flink-1.15.x 开始支持
是否可被传递
类型描述
sink.rolling-policy.file-size128MBMemorySize滚动之前文件的最大大小。
sink.rolling-policy.rollover-interval30 minDuration被滚动之前,一个文件可以保持打开的最大时间间隔(默认为30分钟,以避免产生很多小文件)。通过 sink.rolling-policy.check-interval 选项来控制检查的频率。
sink.rolling-policy.check-interval1 minDuration滚动策略的检查时间间隔。该选项基于 sink.rolling-policy.rollover-interval 选项来控制检查文件是否可以被滚动。

注:对于块格式(parquetorcavro),滚动策略将会根据checkpoint间隔来控制大小和他们的数量,checkpoint决定文件的写入完成。

注:对于行格式(csvjson),如果想查看文件是否在文件系统中存在,并且不想等待过长的时间,
则可以在连接器配置 sink.rolling-policy.file-sizesink.rolling-policy.rollover-interval
并且在flink-conf.yaml中设置 execution.checkpointing.interval 参数。

对于其他的格式(avroorc),可以只在flink-conf.yaml中配置execution.checkpointing.interval参数。

文件压缩

文件系统sink支持文件压缩,该特性允许应用程序设置更小的checkpoint间隔,而不会产生很多的文件。

Key默认值从 flink-1.15.x 开始支持
是否可被传递
描述
auto-compactionfalse是否在流slink中开启自动压缩。数据将会被写入临时文件。checkpoint完成之后,通过checkpoint生成的临时文件将会被压缩。临时文件在被压缩之前是不可见的。
compaction.file-size(none)压缩的目标文件大小,默认值为滚动文件大小。

如果开启,文件压缩将会基于目标文件大小合并多个小文件为大文件。在生产生运行文件压缩时,需要注意以下问题:

  • 只有单个checkpoint中的文件可以被合并,因此,至少有和checkpoint次数相同的文件被生成。
  • 文件在被合并之前是不可见的,因此文件可见时间为:checkpoint间隔+压缩时间
  • 如果压缩运行时间过长,则将会造成任务的反压,并且增加checkpoint的时间。

分区提交

通常来说,写入分区之后通知下游应用程序是非常必要的。比如:增加分区信息到hive的元数据,或者是在分区目录中写入一个 _SUCCESS 文件。
文件系统sink连接器提供了分区提交特性,以允许配置自定义策略。提交行为基于合并的触发器和策略。

Trigger触发器:分区提交的时间可以通过水印或处理时间来确定。

Policy策略:如何提交一个分区,内奸策略支持通过success文件和元数据提交,也可以自定义实现策略。比如触发hive的指标分区,或者是和并小文件等等。

注:分区提交只在动态分区插入时起作用。

分区提交触发器

定义何时提交分区,提供分区提交触发器:

Key默认值从 flink-1.15.x 开始支持
是否可被传递
类型描述
sink.partition-commit.triggerprocess-timeString分区提交触发的类型:
process-time:基于机器时间,既不需要分区时间提取,也不需要水印生成。一旦当前系统时间超过了分区创建时的系统时间加上指定的delay延迟就会提交分区。
partition-time:基于分区字段值提取的时间,要求生成水印。当水印超过了分区值提取的时间加上delay延迟时提交水印。
sink.partition-commit.delay0 sDuration分区在延迟时间到达之前不会提交。如果是按天分区,则应该是1 d,如果是按小时分区,则应该是1 h
sink.partition-commit.watermark-time-zoneUTCString转换long类型的水印值为TIMESTAMP类型是使用的时区,转换之后的水印时间戳将被用于和分区时间计算,以决定分区是否应该被提交。
该选项只有在 sink.partition-commit.trigger 选项设置为 partition-time 时起作用。如果该选项没有被正确配置,比如source的rowtime被定义为TIMESTAMP_LTZ字段,但是该选项没有配置,则用户将会延迟几小时之后看到提交的分区。
默认值为UTC,这意味着水印需要被定义为TIMESTAMP字段,或者是不被定义。如果水印被定义为TIMESTAMP_LTZ字段,则水印时区为会话时区。该选项值可以是完全名称,比如America/Los_Angeles,或者是自定义的时区id,比如GMT+08:00

有两种触发器类型:

  • 第一个是分区的处理时间,既不要求分区时间提取,也不要求水印生成。该触发器根据分区的创建时间和当前系统时间触发分区提交。该触发器更常用,但不是很精确。比如,数据延迟或失败,将会导致不成熟的分区提交。
  • 第二个是根据水印和从分区中提取的时间来触发分区提交。该触发器要求任务有水印生成,并且分区根据时间来划分,比如按小时或按天分区。

如果想要下游尽快看到新分区,而不管数据写入是否完成:

  • ‘sink.partition-commit.trigger’=‘process-time’ (默认值)
  • ‘sink.partition-commit.delay’=‘0s’ (默认值),分区一旦写入数据,将会立即提交。注:分区可能会被提交多次。

如果想要下游在数据写入完成之后看到分区,并且job任务有水印生成,则可以通过分区值来提取时间:

  • ‘sink.partition-commit.trigger’=‘partition-time’
  • ‘sink.partition-commit.delay’=‘1h’ (如果分区为小时分区,则使用 1h,取决于分区时间类型)这是提交分区更准确的方式。它将尝试在数据写入完成之后再提交分区。

如果想要下游在数据写入完成之后看到分区,但是没有水印,或者是无法从分区值提取时间:

  • ‘ink.partition-commit.trigger’=‘process-time’ (默认值)
  • ‘sink.partition-commit.delay’=‘1h’ (如果分区为小时分区,则使用 1h,取决于分区时间类型)尝试准确的提交分区,但是迟到的数据或者是失败将会导致不成熟的分区提交。

迟到数据处理:支持写入分区的记录将会被写入已经提交的分区,并且该分区提交将会被再次触发。

分区时间提取

时间提取器定义定分区值提取时间的方式。

Key从 flink-1.15.x 开始支持
要求
从 flink-1.15.x 开始支持
是否可被传递
默认值类型描述
partition.time-extractor.kind可选defaultString指定从分区值提取时间的提取类型。支持 defaultcustom 。默认情况下,可以配置时间戳的模式,自定义的话,应该配置提取器的class类。
partition.time-extractor.class可选(none)String实现了PartitionTimeExtractor接口的提取器类。
partition.time-extractor.timestamp-pattern可选(none)Stringdefault 类型的分区时间提取方式允许用户指定分区字段时间戳的模式。默认支持从第一个属性匹配 yyyy-mm-dd hh:mm:ss 模式。如果时间戳应该从单个分区属性 dt 中提取,则可以配置为: d t ∗ ∗ 。如果时间戳应该从多个分区属性中提取,可以使用 ∗ ∗ y e a r ∗ ∗ 、 ∗ ∗ m o n t h ∗ ∗ 、 ∗ ∗ d a y ∗ ∗ 、 ∗ ∗ h o u r ∗ ∗ ,配置为: ∗ ∗ dt** 。如果时间戳应该从多个分区属性中提取,可以使用 **year** 、 **month** 、 **day** 、 **hour** ,配置为: ** dt。如果时间戳应该从多个分区属性中提取,可以使用yearmonthdayhour,配置为:year- m o n t h − month- monthday h o u r : 00 : 00 ∗ ∗ 。如果时间戳可以从两个分区属性 ∗ ∗ d t ∗ ∗ 和 ∗ ∗ h o u r ∗ ∗ 提取,则可以配置为: ∗ ∗ hour:00:00** 。如果时间戳可以从两个分区属性 **dt** 和 **hour** 提取,则可以配置为: ** hour:00:00。如果时间戳可以从两个分区属性dthour提取,则可以配置为:dt $hour:00:00
从 flink-1.15.x 开始支持
partition.time-extractor.timestamp-formatter
可选yyyy-MM-dd HH:mm:ssString转化分区时间戳字符串值为时间戳对象的 formatter 格式,分区时间戳字符串值通过 partition.time-extractor.timestamp-pattern 选项来定义。比如,分区时间戳通过多个分区字段提取:yearmonthday,你可以配置 partition.time-extractor.timestamp-pattern 选项为 y e a r year yearmonth$day ,然后配置 partition.time-extractor.timestamp-formatter 选项为 yyyyMMdd 。默认的 formatter 为:yyyy-MM-dd HH:mm:ss
timestamp-formatter 和 java 的 DateTimeFormatter 兼容。

默认提取器基于分区属性和时间戳默认组成。也可以通过实现 PartitionTimeExtractor 接口来完全自定义分区提取器。

public class HourPartTimeExtractor implements PartitionTimeExtractor {
    
    @Override
    public LocalDateTime extract(List<String> keys, List<String> values) {
    
        String dt = values.get(0);
        String hour = values.get(1);
        return Timestamp.valueOf(dt + " " + hour + ":00:00").toLocalDateTime();
    }
}

分区提交策略

分区提交策略定义分区提交时执行哪些操作:

  • 第一个是元数据,只有hive表支持元数据策略,文件系统通过目录结构管理分区。
  • 第二个是success文件,在分区对一个的目录下写一个空文件。
Key从 flink-1.15.x 开始
要求
从 flink-1.15.x 开始支持
是否可被传递
类型描述
sink.partition-commit.policy.kind可选String指定提交分区并通知下游应用程序,该分区已经完成写入并可进行读取的策略。
metastore:将分区写入元数据。只有hive表支持元数据策略,文件系统通过目录结构来管理分区。
success-file:在目录中增加 _success 文件。这两个方式可以同时配置: metastore,success-file
custom:使用策略类创建一个提交策略。
支持配置多个策略:metastore,success-file
sink.partition-commit.policy.class可选String实现了PartitionCommitPolicy接口的分区提交策略实现类。只在自定义custom提交策略中起作用。
sink.partition-commit.success-file.name可选Stringsuccess-file分区提交的文件名称,默认为: _SUCCESS

也可以向下面一样扩展提交策略实现:

public class AnalysisCommitPolicy implements PartitionCommitPolicy {
    
    private HiveShell hiveShell;

    @Override
    public void commit(Context context) throws Exception {
    
        if (hiveShell == null) {
    
            hiveShell = createHiveShell(context.catalogName());
        }

        hiveShell.execute(String.format(
                "ALTER TABLE %s ADD IF NOT EXISTS PARTITION (%s = '%s') location '%s'",
                context.tableName(),
                context.partitionKeys().get(0),
                context.partitionValues().get(0),
                context.partitionPath()));
        hiveShell.execute(String.format(
                "ANALYZE TABLE %s PARTITION (%s = '%s') COMPUTE STATISTICS FOR COLUMNS",
                context.tableName(),
                context.partitionKeys().get(0),
                context.partitionValues().get(0)));
    }
}

sink并行度

写入文件到外部文件系统的并行度(包括hive),可以通过表的option选项来配置,流模式和批模式都支持这么做。
默认情况下,slink的并行度和上游链在一起的算子并行度一致。如果配置了和上游算子不同的并行度,则写入文件算子的并行度将使用配置的并行度。

Key从 flink-1.15.x 开始
要求
从 flink-1.15.x 开始支持
是否可被传递
类型描述
sink.parallelism可选Integer将文件写入外部文件系统的并行度。数值应该大于0,否则将抛出异常。

注:目前,配置sink并行度只支持上游算子为仅插入INERT-ONLY类型的变更日志模式,否则将抛出异常。

完整案例

下面的例子展示文件系统连接器如何通过流查询从kafka读取数据,然后写入文件系统,并且通过批查询从文件系统中读取写入的数据。

CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);

CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='success-file'
);

-- streaming sql, insert into file system table
INSERT INTO fs_table 
SELECT 
    user_id, 
    order_amount, 
    DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
    DATE_FORMAT(log_ts, 'HH') 
FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';

如果水印定义在TIMESTAMP_LTZ类型的字段上,并且被用于分区提交时间,则sink.partition-commit.watermark-time-zone配置必须设置为会话时间分区,否则分区提交将会晚几个小时。

CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  ts BIGINT, -- 毫秒值
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 在TIMESTAMP_LTZ字段上定义水印
) WITH (...);

CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet',
  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- 表名用户配置的时区为:'Asia/Shanghai'
  'sink.partition-commit.policy.kind'='success-file'
);

-- 流式sql,插入数据到文件系统
INSERT INTO fs_table 
SELECT 
    user_id, 
    order_amount, 
    DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
    DATE_FORMAT(ts_ltz, 'HH') 
FROM kafka_table;

-- 批式sql,查询指定分区下的数据
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';

Hbase

介绍

支持:

  • Scan Source: Bounded
  • Lookup Source: Sync Mode
  • Sink: Batch
  • Sink: Streaming Upsert Mode

HBase 连接器支持读取和写入 HBase 集群。本文档介绍如何使用 HBase 连接器基于 HBase 进行 SQL 查询。

HBase 连接器在 upsert 模式下运行,可以使用 DDL 中定义的主键与外部系统交换更新操作消息。但是主键只能基于 HBase 的 rowkey 字段定义。如果没有声明主键,HBase 连接器默认取 rowkey 作为主键。

依赖

为了使用HBase连接器,要求下面的依赖添加到通过自动构建工具(比如maven和SBT)构建的项目中,或者是SQL的客户端。

1.4.x

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-1.4_2.11</artifactId>
    <version>1.13.2</version>
</dependency>

2.2.x

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
    <version>1.13.2</version>
</dependency>

注意自己使用的 flink 和 scala 版本。

如何使用 HBase 表

所有 HBase 表的列簇必须定义为 ROW 类型,字段名对应列簇名(column family),嵌套的字段名对应列限定符名(column qualifier)。
用户只需在表结构中声明查询中使用的列簇和列限定符。
除了 ROW 类型的列,剩下的原子数据类型字段(比如,STRING, BIGINT)将被识别为 HBase 的 rowkey,一张表中只能声明一个 rowkeyrowkey 字段的名字可以是任意的,如果是保留关键字,需要用反引号。

-- 在 Flink SQL 中注册 HBase 表 "mytable"
CREATE TABLE hTable (
 rowkey INT,
 family1 ROW<q1 INT>,
 family2 ROW<q2 STRING, q3 BIGINT>,
 family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
 'connector' = 'hbase-1.4',
 'table-name' = 'mytable',
 'zookeeper.quorum' = 'localhost:2181'
);

-- 用 ROW(...) 构造函数构造列簇,并往 HBase 表写数据。
-- 假设 "T" 的表结构是 [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
INSERT INTO hTable
SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;

-- 从 HBase 表扫描数据
SELECT rowkey, family1, family3.q4, family3.q6 FROM hTable;

-- temporal join HBase 表,将 HBase 表作为维表
SELECT * FROM myTopic
LEFT JOIN hTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = hTable.rowkey;

连接器参数

参数是否必选从 flink-1.15.x 开始支持
是否可传递
默认值数据类型描述
connector必选(none)String指定使用的连接器, 支持的值如下 :
hbase-1.4: 连接 HBase 1.4.x 集群
hbase-2.2: 连接 HBase 2.2.x 集群
table-name必选(none)String连接的 HBase 表名。
zookeeper.quorum必选(none)StringHBase Zookeeper quorum 信息。
zookeeper.znode.parent可选/hbaseStringHBase 集群的 Zookeeper 根目录。
null-string-literal可选nullString当字符串值为 null 时的存储形式,默认存成 null 字符串。HBase 的 source 和 sink 的编解码将所有数据类型(除字符串外)的 null 值以空字节来存储。
sink.buffer-flush.max-size可选2mbMemorySize写入的参数选项。每次写入请求缓存行的最大大小。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 0 关闭此选项。
sink.buffer-flush.max-rows可选1000Integer写入的参数选项。 每次写入请求缓存的最大行数。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 0 关闭此选项。
sink.buffer-flush.interval可选1sDuration写入的参数选项。刷写缓存行的间隔。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 0 关闭此选项。
注意:sink.buffer-flush.max-sizesink.buffer-flush.max-rows 同时设置为 0,刷写将会异步处理整个缓存行为。
sink.parallelism可选(none)Integer为 HBase sink operator 定义并行度。默认情况下,并行度由框架决定,和链在一起的上游算子一样。
lookup.async可选falseBoolean是否启用异步查找。如果为true,则进行异步查找。注意:异步方式只支持 hbase-2.2 连接器。
lookup.cache.max-rows可选flink-1.13.x:(无)
flink-1.14.x:-1
flink-1.13.x:Integer
flink-1.14.x:Long
查找缓存的最大行数,超过这个值,最旧的行将过期。注意:lookup.cache.max-rowslookup.cache.ttl 必须同时被设置。默认情况下,查找缓存是禁用的。
lookup.cache.ttl可选flink-1.13.x:(无)
flink-1.14.x:0s
Duration查找缓存中每一行的最大生存时间,在这段时间内,最老的行将过期。注意:lookup.cache.max-rowslookup.cache.ttl 必须同时被设置。默认情况下,查找缓存是禁用的。
lookup.max-retries可选3Integer查找数据库失败时的最大重试次数。
properties.*可选(无)String可以设置任意 HBase 的配置项。后缀名必须匹配在 HBase 配置文档 中定义的配置键。
Flink 将移除 properties. 配置键前缀并将变换后的配置键和值传入底层的 HBase 客户端。 例如设置的 'properties.hbase.security.authentication' = 'kerberos' 将会传递kerberos认证参数。

数据类型映射表

HBase 以字节数组存储所有数据。在读和写过程中要序列化和反序列化数据。

Flink 的 HBase 连接器利用 HBase(Hadoop) 的工具类 org.apache.hadoop.hbase.util.Bytes 进行字节数组和 Flink 数据类型转换。

Flink 的 HBase 连接器将所有数据类型(除字符串外)的null 值编码成空字节。对于字符串类型,null 值的字面值由null-string-literal选项值决定。

数据类型映射表如下:

Flink 数据类型HBase 转换
CHAR / VARCHAR / STRINGbyte[] toBytes(String s)
String toString(byte[] b)
BOOLEANbyte[] toBytes(boolean b)
boolean toBoolean(byte[] b)
BINARY / VARBINARY返回 byte[]。
DECIMALbyte[] toBytes(BigDecimal v)
BigDecimal toBigDecimal(byte[] b)
TINYINTnew byte[] { val }
bytes[0] // 只返回第一个字节
SMALLINTbyte[] toBytes(short val)
short toShort(byte[] bytes)
INTbyte[] toBytes(int val)
int toInt(byte[] bytes)
BIGINTbyte[] toBytes(long val)
long toLong(byte[] bytes)
FLOATbyte[] toBytes(float val)
float toFloat(byte[] bytes)
DOUBLEbyte[] toBytes(double val)
double toDouble(byte[] bytes)
DATE从 1970-01-01 00:00:00 UTC 开始的天数,int 值。
TIME从 1970-01-01 00:00:00 UTC 开始天的毫秒数,int 值。
TIMESTAMP从 1970-01-01 00:00:00 UTC 开始的毫秒数,long 值。
ARRAY不支持
MAP / MULTISET不支持
ROW不支持

DataGen

介绍

支持:

  • Scan Source: Bounded
  • Scan Source: UnBounded

DataGen连接器允许通基内存的数据生成来创建表,这对于本地查询,而不是外部系统查询来说是非常有用的,比如kafka。表可以包含Computed Column syntax字段计算语法,以支持更灵活的数据生成。

DataGen是内建连接器,无需添加额外依赖。

使用

默认情况下,DataGen表将创建无限数量的数据行,并且每个字段都是随机值。对于可变大小的类型,比如char/varchar/string/array/map/multiset,可以指定他们的长度。另外也可以通过指定总行数,来创建一个有界表。

flink也提供了序列化生成器,用户可以指定序列的开始和结束之。如果表的某个字段为序列类型,则表将会成为有界表,当第一个字段值生成到他对应的结束值时数据生成结束。

时间类型通常为本地机器的当前系统时间。

CREATE TABLE Orders (
    order_number BIGINT,
    price        DECIMAL(32,2),
    buyer        ROW<first_name STRING, last_name STRING>,
    order_time   TIMESTAMP(3)
) WITH (
  'connector' = 'datagen'
)

通常情况下,数据生成连接器和LINK子句一起使用来模拟物理表。

CREATE TABLE Orders (
    order_number BIGINT,
    price        DECIMAL(32,2),
    buyer        ROW<first_name STRING, last_name STRING>,
    order_time   TIMESTAMP(3)
) WITH (...)

-- 创建一个模拟表
CREATE TEMPORARY TABLE GenOrders
WITH (
    'connector' = 'datagen',
    'number-of-rows' = '10'
)
LIKE Orders (EXCLUDING ALL)

数据类型

类型支持的生成器备注
BOOLEANrandom
CHARrandom / sequence
VARCHARrandom / sequence
STRINGrandom / sequence
DECIMALrandom / sequence
TINYINTrandom / sequence
SMALLINTrandom / sequence
INTrandom / sequence
BIGINTrandom / sequence
FLOATrandom / sequence
DOUBLErandom / sequence
DATErandom通常为本地机器的日期。
TIMErandom通常为本地机器的时间。
TIMESTAMPrandomflink-1.13.x:通常为本地机器的时间戳。
flink-1.5.x:解析为本地机器时间戳过去一段时间的时间戳,最大可以过去的时间可以通过 max-past 选项配置。
TIMESTAMP_LTZrandomflink-1.13.x:通常为本地机器的时间戳。
flink-1.5.x:解析为本地机器时间戳过去一段时间的时间戳,最大可以过去的时间可以通过 max-past 选项配置。
INTERVAL YEAR TO MONTHrandom
INTERVAL DAY TO MONTHrandom
ROWrandom通过随机子属性值生成一个row类型字段值。
ARRAYrandom通过随机entry生成一个数组。
MAPrandom通过随机entry生成一个map表。
MULTISETrandom通过随机entry生成一个multiset

连接器选项

Option是否必须默认值类型描述
connector必须(none)String指定使用那个连接器,这儿应该是:datagen
rows-per-second可选10000Long指定每秒生成的行数以控制发射频率。
number-of-rows可选(none)Long发射的数据总行数。默认情况下,表是无界的。
fields.#.kind可选randomString该**#字段的生成器,可以是sequencerandom**。
fields.#.min可选(该类型最小值)(Type of field)随机生成器的最小值,指针对于数字类型。
fields.#.max可选(该类型最大值)(Type of field)随机生成器的最大值,只针对于数字类型。
从 flink-1.15.x:开始支持
fields.#.max-past
可选0Duration时间戳随机生成器可以过去的最大时间,只用于 timestamp 类型。
fields.#.length可选100Integer生成 char/varchar/string/array/map/multiset 可变长度类型的大小或长度。
fields.#.start可选(none)(Type of field)序列生成器的开始值。
fields.#.end可选(none)(Type of field)序列生成器的结束值。

注:上面选项中的 # ,在实际使用时,要替换为创建的表的某个字段名称,表示对这个字段设置属性。

Print

介绍

支持:

  • Sink

Print 连接器允许将每一行写入标准输出流或者标准错误流。

设计目的:

  • 简单的流作业测试。
  • 对生产调试带来极大便利。

四种 format 选项:

打印内容条件 1条件 2
标识符:任务 ID>输出数据需要提供前缀打印标识符
标识符>输出数据需要提供前缀打印标识符
任务 ID>输出数据不需要提供前缀打印标识符
输出数据不需要提供前缀打印标识符parallelism == 1

输出字符串格式为 $row_kind(f0,f1,f2…),row_kind是一个 RowKind 类型的短字符串,例如:+I(1,1)

Print 连接器是内置的。

注意:在任务运行时使用 Print Sinks 打印记录,需要注意观察任务日志。

创建Print表

CREATE TABLE print_table (
    f0 INT,
    f1 INT,
    f2 STRING,
    f3 DOUBLE
) WITH (
    'connector' = 'print'
)

也可以通过 LIKE 子句 基于已有表的结构去创建新表。

CREATE TABLE print_table WITH ('connector' = 'print')
LIKE source_table (EXCLUDING ALL)

连接器参数

参数是否必选默认值数据类型描述
connector必选(none)String指定要使用的连接器,此处应为 print
print-identifier可选(none)String配置一个标识符作为输出数据的前缀。
standard-error可选falseBoolean如果 format 需要打印为标准错误而不是标准输出,则为 True
sink.parallelism可选(none)Integer为 Print sink 算子定义并行度。默认情况下,并行度由框架决定,和链在一起的上游算子一致。

BlackHole

介绍

支持:

  • Sink: Bounded
  • Sink: UnBounded

BlackHole 连接器允许接收所有输入记录。它被设计用于:

  • 高性能测试。
  • UDF 输出,而不是实质性 sink。

就像类 Unix 操作系统上的 /dev/null

BlackHole 连接器是内置的。

创建 BlackHole 表

CREATE TABLE blackhole_table (
    f0 INT,
    f1 INT,
    f2 STRING,
    f3 DOUBLE
) WITH (
    'connector' = 'blackhole'
);

也可以基于现有模式使用 LIKE 子句 创建。

CREATE TABLE blackhole_table WITH ('connector' = 'blackhole')
LIKE source_table (EXCLUDING ALL)

连接器选项

选项是否必要默认值类型描述
connector必要(none)String指定需要使用的连接器,此处应为 blackhole
原网站

版权声明
本文为[第一片心意]所创,转载请带上原文链接,感谢
https://blog.csdn.net/u012443641/article/details/126128840