当前位置:网站首页>flink-sql所有表格式format

flink-sql所有表格式format

2022-08-04 05:27:00 第一片心意

版本说明

本文档介绍的各种flink sql的语法基于flink-1.13.x,flink版本低于1.13.x的用户,在sql运行出错误时,需要自行去flink官网查看对应版本的语法支持。

另外,flink新版本支持的语法,文档中会进行特殊标注,说明对应语法在 flink 哪个版本开始支持,但凡是没有特殊标注的,均支持flink-1.13.x及以上版本。

所有格式

Flink提供了一组可以与表连接器一起使用的表格式。表格式是一种存储格式,定义如何将二进制数据映射到表字段。

Flink支持以下格式:

格式连接器
CSVApache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem
JSONApache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem, Elasticsearch
Apache AvroApache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem
Confluent AvroApache Kafka, Upsert Kafka
Debezium CDCApache Kafka, Filesystem
Canal CDCApache Kafka, Filesystem
Maxwell CDCApache Kafka, Filesystem
OGG CDCApache Kafka, Filesystem(从 flink-1.15.x 开始支持)
Apache ParquetFilesystem
Apache ORCFilesystem
RawApache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem

CSV

说明

支持:

  • Format: Serialization Schema 序列化格式
  • Format: Deserialization Schema 反序列化格式

CSV格式允许基于CSV schema读写CSV格式的数据。目前,CSV schema来源于表schema定义。

依赖

为了使用CSV格式,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.13.0</version>
</dependency>

注意自己使用的 flink 版本。

使用CSV格式

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true',
    'csv.allow-comments' = 'true'
)

Format参数

选项要求是否可以被转发
从 flink-1.15.x 开始支持
默认值类型描述
format必选(none)String指定使用哪种格式,这儿应该是 csv
csv.field-delimiter可选,String字段值分隔符号(默认为英文逗号**,),必须是单个字符。
可以使用反斜杠来指定特殊字符,比如
\t代表制表符。
也可以在纯SQL中使用unicode编码来指定,比如:
‘csv.field-delimiter’ = U&‘\0001’**,表示0x01字符。
csv.disable-quote-character可选falseBoolean禁用用于封闭字段值的引号符号(默认为false)。如果为true,必须设置csv.quote-character选项。
csv.quote-character可选"String封闭字段值的引号符号(默认为英文双引号**"**)。
csv.allow-comments可选falseBoolean忽略以**#**开头的注释行(默认禁用)。如果启动用,确认同时忽略转换错误,以允许出现空行数据。
csv.ignore-parse-errors可选falseBoolean跳过转换错误的属性和数据行,而不是失败。如果出现错误,字段值将设置为null
csv.array-element-delimiter可选;String数组元素分隔符号(默认为英文分号**;**)。
csv.escape-character可选(none)String用于转义字段值的转移符号(默认禁用)。
csv.null-literal可选(none)String将null字符串作为NULL赋给对应字段值(默认禁用)。

数据类型匹配

目前,CSV schema总是派生于表schema。目前还不支持直接显式定义CSV schema。

Flink CSV格式使用jackson databind API解析和生成CSV字符串。

Flink类型到CSV类型的类型映射如下表所示。

Flink SQL typeCSV type
CHAR / VARCHAR / STRINGstring
BOOLEANboolean
BINARY / VARBINARYbase64 编码的字符串
DECIMALnumber
TINYINTnumber
SMALLINTnumber
INTnumber
BIGINTnumber
FLOATnumber
DOUBLEnumber
DATEdate 格式的字符串
TIMEtime 格式的字符串
TIMESTAMPdate-time 格式的字符串
INTERVALnumber
ARRAYarray
ROWobject

JSON

说明

支持:

  • Format: Serialization Schema 序列化格式
  • Format: Deserialization Schema 反序列化格式

JSON格式允许基于JSON schema读写JSON格式的数据。目前,JSON schema派生于表schema。

JSON format 支持仅追加流,除非是你使用的连接器明确支持 retract流 和/或 upsert流,比如 Upsert Kafka 连接器。
如果你需要将数据写入 retract流 和/或 upsert流,建议你使用 CDC format,比如 Debezium JSONCannal JSON

依赖

为了使用Json格式,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.13.0</version>
</dependency> 

注意自己使用的 flink 版本。

使用JSON格式

下面是一个使用Kafka连接器和JSON格式创建表的示例。

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
)

Format参数

选项要求是否可以被转发
从 flink-1.15.x 开始支持
默认值类型描述
format必选(none)String指定使用哪种格式,这儿必须是 json
json.fail-on-missing-field可选falseBoolean如果丢失了schema中指定的属性,是否发生失败。
json.ignore-parse-errors可选falseBoolean如果转化错误,直接跳过该属性和行,而不是发生失败。该类型的错误,属性会被设置为null
json.timestamp-format.standard可选‘SQL’String声明输入和输出的时间戳格式。当前支持的格式为SQL 以及 ISO-8601
可选参数 SQL 将会以 yyyy-MM-dd HH:mm:ss.s{precision} 的格式解析时间戳, 例如 2020-12-30 12:13:14.123 ,且会以相同的格式输出。
可选参数 ISO-8601 将会以 yyyy-MM-ddTHH:mm:ss.s{precision} 的格式解析输入时间戳, 例如 2020-12-30T12:13:14.123 ,且会以相同的格式输出。
json.map-null-key.mode可选‘FAIL’String指定匹配数据时序列化键为null的处理模式。目前支持:FAILDROPLITERAL
FAIL:遇到null键匹配时抛出异常。
DROP:遇到null键匹配时丢弃数据。
LITERAL:替换null键为字符串字面量。字符串字面量通过 json.map-null-key.literal 选项定义。
json.map-null-key.literal可选‘null’String当设置 json.map-null-key.mode 选项为 LITERAL 时,指定代替null键的字符串字面量。
如果设置为 null ,则表的schema字段名null就会和实际JSON数据中的 null 键进行匹配;
如果设置为 null-key ,则表的schema字段名null-key就会和实际JSON数据中的 null 键进行匹配。
json.encode.decimal-as-plain-number可选falseBoolean编码所有的数字为普通数字而不是科学计数法数字。
默认情况改下,数据可能会使用科学计数法,比如:0.000000027会被默认编码为2.7E-8。如果设置这个选项为true,则会编码为0.000000027

数据类型匹配

目前,JSON schema总是派生于表schema。目前还不支持直接显式定义JSON schema。

Flink JSON格式使用jackson databind API解析和生成JSON字符串。

下表列出了从Flink类型到JSON类型的类型映射。

Flink SQL typeJSON type
CHAR / VARCHAR / STRINGstring
BOOLEANboolean
BINARY / VARBINARYbase64 编码的字符串
DECIMALnumber
TINYINTnumber
SMALLINTnumber
INTnumber
BIGINTnumber
FLOATnumber
DOUBLEnumber
DATEdate 格式的字符串
TIMEtime 格式的字符串
TIMESTAMPdate-time 格式的字符串
TIMESTAMP_WITH_LOCAL_TIME_ZONEdate-time 格式的字符串,使用 UTC 时区
INTERVALnumber
ARRAYarray
MAP / MULTISETobject
ROWobject

AVRO

说明

支持:

  • Format: Serialization Schema 序列化格式
  • Format: Deserialization Schema 反序列化格式

Apache Avro格式允许基于Avro schema读写Avro格式的数据。目前,Avro schema派生于表schema。

依赖

为了使用Avro格式,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-avro</artifactId>
    <version>1.13.0</version>
</dependency> 

注意自己所使用的 flink 的版本。

使用AVRO格式

下面是一个使用Kafka连接器和Avro格式创建表的例子。

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'avro'
)

Format参数

选项要求是否可以被转发
从 flink-1.15.x 开始支持
默认值类型描述
format必选(none)String指定使用哪种格式,这儿应该是 avro
avro.codec可选(none)String只用于Filesystem文件系统,指定avro的压缩格式。默认没有压缩。可用的枚举有:deflatesnappybzip2xz

数据类型匹配

目前,Avro schema总是派生于表schema。目前还不支持直接显式定义Avro schema。下表列出了从Flink类型到Avro类型的类型映射。

Flink SQL 类型Avro 类型Avro 逻辑类型
CHAR / VARCHAR / STRINGstring
BOOLEANboolean
BINARY / VARBINARYbytes
DECIMALfixeddecimal
TINYINTint
SMALLINTint
INTint
BIGINTlong
FLOATfloat
DOUBLEdouble
DATEintdate
TIMEinttime-millis
TIMESTAMPlongtimestamp-millis
ARRAYarray
MAP
(key 必须是 string/char/varchar 类型)
map
MULTISET
(元素必须是 string/char/varchar 类型)
map
ROWrecord

除了上面列出的类型外,Flink还支持读写可空类型。Flink将可为空的类型映射到Avro联合(某值,null),其中某值是从Flink类型转换而来的Avro类型。

有关Avro类型的更多信息,可以参考Avro规范

Confluent Avro

说明

支持:

  • Format: Serialization Schema 序列化格式
  • Format: Deserialization Schema 反序列化格式

Avro Schema Registry(avro-confluent)格式允许你读取被io.confluent.kafka.serializers.KafkaAvroSerializer序列化的记录, 并写入可以被io.confluent.kafka.serializers.KafkaAvroDeserializer反序列化读取的记录。

当读取(反序列化)这种格式的数据时,根据数据中的schema版本id从配置的Confluent schema Registry中获取Avro写入schema,同时从表schema推断读取schema。

当用这种格式写入(序列化)一条数据时,Avro schema将从表schema推断出用于检索的schema id:

  • flink-1.13.x:主要通过avro-confluent.schema-registry.subject配置的主题名进行查找。
  • flink-1.14.x:主要通过avro-confluent.subject配置的主题名进行查找。

Avro Schema Registry格式只能与Apache Kafka SQL连接器或Upsert Kafka SQL连接器结合使用。

依赖

为了使用Avro Schema Registry格式,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-confluent-registry</artifactId>
    <version>1.13.0</version>
</dependency>

注意自己使用的 flink 版本。

使用Avro-Confluent格式

使用原始UTF-8字符串作为Kafka键以及在Schema Registry中注册的Avro记录作为Kafka值注册的表:

flink-1.13.x:

CREATE TABLE user_created (
    -- -- 一个映射到kafka原生UTF-8字符串key的字段
    the_kafka_key STRING,
    -- 一些Avro属性字段作为kafka value
    id STRING,
    name STRING,
    email STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events_example1',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- UTF-8字符串作为kafka key,使用“the_kafka_key”表字段
    'key.format' = 'raw',
    'key.fields' = 'the_kafka_key',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://localhost:8082',
    'value.fields-include' = 'EXCEPT_KEY'
)

flink-1.14.x:

CREATE TABLE user_created (
    -- -- 一个映射到kafka原生UTF-8字符串key的字段
    the_kafka_key STRING,
    -- 一些Avro属性字段作为kafka value
    id STRING,
    name STRING,
    email STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events_example1',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- UTF-8字符串作为kafka key,使用“the_kafka_key”表字段
    'key.format' = 'raw',
    'key.fields' = 'the_kafka_key',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://localhost:8082',
    'value.fields-include' = 'EXCEPT_KEY'
)

我们可以如下方式将数据写入kafka表:

INSERT INTO user_created
SELECT
    -- 赋值user id字段值作为kafka key
    id as the_kafka_key,
    -- 所有字段值
    id, name, email
FROM some_table

Kafka键和值都在Schema Registry中注册为Avro record:

flink-1.13.x:

CREATE TABLE user_created (
    -- 一个映射到“id” avro属性字段作为kafka key
    kafka_key_id STRING,
    -- 一些映射到avro属性字段作为kafka value
    id STRING,
    name STRING,
    email STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events_example2',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- 注意:由于哈希分区的存在,Kafka key上下文中的schema演化几乎不可能向后或向前兼容。
    'key.format' = 'avro-confluent',
    'key.avro-confluent.schema-registry.url' = 'http://localhost:8082',
    -- 在这个例子中,我们希望Kafka key和value的Avro类型都包含字段'id' => 在与Kafka key字段相关联的表字段名前添加一个前缀,以避免冲突
    'key.fields-prefix' = 'kafka_key_',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://localhost:8082',
    'value.fields-include' = 'EXCEPT_KEY',
    -- 从flink 1.13版本开始,subject有默认值,尽管可以被覆盖
    'key.avro-confluent.schema-registry.subject' = 'user_events_example2-key2',
    'value.avro-confluent.schema-registry.subject' = 'user_events_example2-value2'
)

flink-1.14.x:

CREATE TABLE user_created (
    -- 一个映射到“id” avro属性字段作为kafka key
    kafka_key_id STRING,
    -- 一些映射到avro属性字段作为kafka value
    id STRING,
    name STRING,
    email STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events_example2',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- 注意:由于哈希分区的存在,Kafka key上下文中的schema演化几乎不可能向后或向前兼容。
    'key.format' = 'avro-confluent',
    'key.avro-confluent.url' = 'http://localhost:8082',
    -- 在这个例子中,我们希望Kafka key和value的Avro类型都包含字段'id' => 在与Kafka key字段相关联的表字段名前添加一个前缀,以避免冲突
    'key.fields-prefix' = 'kafka_key_',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://localhost:8082',
    'value.fields-include' = 'EXCEPT_KEY',
    -- 从flink 1.13版本开始,subject有默认值,尽管可以被覆盖
    'key.avro-confluent.subject' = 'user_events_example2-key2',
    'value.avro-confluent.subject' = 'user_events_example2-value2'
)

使用upsert-kafka连接器的表示例,其中Kafka value在Schema Registry中注册为Avro记录:

flink-1.13.x:

CREATE TABLE user_created (
    -- 一个映射到kafka原生UTF-8字符串key的字段
    kafka_key_id STRING,
    -- 一些映射到avro属性的字段作为kafka value
    id STRING,
    name STRING,
    email STRING,
    -- upsert-kafka连接器要求有一个主键来定义upsert行为
    PRIMARY KEY (kafka_key_id) NOT ENFORCED

) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'user_events_example3',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- UTF-8字符串作为kafka key
    -- 在这个案例中不指定'key.fields',因为它由表的主键指定
    'key.format' = 'raw',
    -- In this example, we want the Avro types of both the Kafka key and value to contain the field 'id'
    -- 在这个例子中,我们希望Kafka key和value的Avro类型都包含字段'id' => 在与Kafka key字段相关联的表字段名前添加一个前缀,以避免冲突
    'key.fields-prefix' = 'kafka_key_',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://localhost:8082',
    'value.fields-include' = 'EXCEPT_KEY'
)

flink-1.14.x:

CREATE TABLE user_created (
    -- 一个映射到kafka原生UTF-8字符串key的字段
    kafka_key_id STRING,
    -- 一些映射到avro属性的字段作为kafka value
    id STRING,
    name STRING,
    email STRING,
    -- upsert-kafka连接器要求有一个主键来定义upsert行为
    PRIMARY KEY (kafka_key_id) NOT ENFORCED

) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'user_events_example3',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- UTF-8字符串作为kafka key
    -- 在这个案例中不指定'key.fields',因为它由表的主键指定
    'key.format' = 'raw',
    -- In this example, we want the Avro types of both the Kafka key and value to contain the field 'id'
    -- 在这个例子中,我们希望Kafka key和value的Avro类型都包含字段'id' => 在与Kafka key字段相关联的表字段名前添加一个前缀,以避免冲突
    'key.fields-prefix' = 'kafka_key_',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://localhost:8082',
    'value.fields-include' = 'EXCEPT_KEY'
)

Format参数

选项要求是否可以被转发 从 flink-1.15.x 开始支持默认值类型描述
format必选(none)String指定使用哪种模式,这儿应该是 avro-confluent
avro-confluent.basic-auth.credentials-source可选(none)Stringschema注册的基础认证证书资
avro-confluent.basic-auth.user-info可选(none)Stringschema注册的基础认证用户信息
avro-confluent.bearer-auth.credentials-source可选(none)Stringschema注册的持有者认证证书源
avro-confluent.bearer-auth.token可选(none)Stringschema注册的持有者认证令牌 token 源
从 flink-1.14.x 开始支持 avro-confluent.properties可选(node)Map转发到下面 schema 注册的属性 map 表,这对于没有通过Flink配置选项正式公开的选项很有用,但是 Flink 选项拥有更高的优先级。
avro-confluent.ssl.keystore.location可选(none)StringSSL秘钥库文件存储位置
avro-confluent.ssl.keystore.password可选(none)StringSSL秘钥库密码
avro-confluent.ssl.truststore.location可选(none)StringSSL truststore的文件存储位置
avro-confluent.ssl.truststore.password可选(none)StringSSL truststore的密码
flink-1.13.x:avro-confluent.schema-registry.subject flink-1.14.x:avro-confluent.subject可选(none)StringConfluent模式注册中心主题,在该主题下注册此格式在序列化期间使用的schema。默认情况下,kafkaupsert-kafka 连接器使用 <topic_name>-value<topic_name>-key 作为默认主题名。但对于其他连接器(例如: filesystem ),当用作接收器时,subject选项是必需的。
flink-1.13.x:avro-confluent.schema-registry.url flink-1.14.x:avro-confluent.url必选(none)String用于获取/注册Confluent Schema Registry schema的URL

数据类型匹配

目前,Apache Flink总是使用表schema在反序列化期间派生Avro读取schema,在序列化期间派生Avro写入schema。

目前还不支持直接显式定义Avro模式。 Avro和Flink数据类型之间的映射请参见Apache Avro Format

除了上面列出的类型外,Flink还支持读写可空类型。Flink将可为空的类型映射到Avro联合(某值,null),其中某值是从Flink类型转换而来的Avro类型。

有关Avro类型的更多信息,可以参考Avro规范

Debezium

说明

支持:

  • Changelog-Data-Capture CDC
  • Format Format: Serialization 序列化格式
  • Schema Format: Deserialization Schema 反序列化格式

Debezium 是一个 CDC(Changelog Data Capture,变更数据捕获)工具,可以把来自 MySQLPostgreSQLOracleMicrosoft SQL Server
和许多其他数据库的更改实时流传输到 Kafka 中。
Debezium 为变更日志提供了统一的格式结构,并支持使用 JSONApache Avro 序列化消息。

Flink 支持将 Debezium JSONAvro 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,这个特性非常有用,例如

  • 将增量数据从数据库同步到其他系统
  • 日志审计
  • 数据库的实时物化视图
  • 关联维度数据库的变更历史,等等

Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Debezium 格式的 JSONAvro 消息,输出到 Kafka 等存储中。
但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFOREUPDATE_AFTER 合并为一条 UPDATE 消息。
因此,Flink 将 UPDATE_BEFOREUPDATE_AFTER 分别编码为 DELETEINSERT 类型的 Debezium 消息。

依赖

Debezium Avro

为了使用Debezium格式,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-confluent-registry</artifactId>
    <version>1.13.0</version>
</dependency>

Debezium Json

为了使用Debezium格式,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.13.0</version>
</dependency>

注意自己使用的 flink 版本。

注意: 请参考 Debezium 文档,
了解如何设置 Debezium Kafka Connect 用来将变更日志同步到 Kafka 主题。

使用 Debezium Format

Debezium 为变更日志提供了统一的格式,下面是一个 JSON 格式的从 MySQL product 表捕获的更新操作的简单示例:

{
    
  "before": {
    
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "source": {
    
    ...
  },
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null
}

注意: 参考 Debezium 文档,了解每个字段的含义。

MySQL product表有4列(id、name、description、weight)。上面的 JSON 消息是 products 表上的一条更新事件,
其中 id = 111 的行的 weight 值从 5.18 更改为 5.15。假设此消息已同步到 Kafka 主题 products_binlog 中,则可以使用以下 DDL 来读取此主题并解析更改事件。

CREATE TABLE topic_products (
    -- schema 与 MySQL 的 products 表完全相同
    id BIGINT,
    name STRING,
    description STRING,
    weight DECIMAL(10, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'products_binlog',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    -- 使用 'debezium-json' format 来解析 Debezium 的 JSON 消息
    -- 如果 Debezium 用 Avro 编码消息,请使用 'debezium-avro-confluent'
    'format' = 'debezium-json'  -- 如果 Debezium 用 Avro 编码消息,请使用 'debezium-avro-confluent'
)

在某些情况下,用户在设置 Debezium Kafka Connect 时,可能会开启 Kafka 的配置 value.converter.schemas.enable ,用来在消息体中包含 schema 信息。
然后,Debezium JSON 消息可能如下所示:

{
    
  "schema": {
    
    ...
  },
  "payload": {
    
    "before": {
    
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.18
    },
    "after": {
    
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.15
    },
    "source": {
    
      ...
    },
    "op": "u",
    "ts_ms": 1589362330904,
    "transaction": null
  }
}

为了解析这类消息,需要在上述 DDL WITH 子句中添加选项 ‘debezium-json.schema-include’ = ‘true’(默认为 false)。
建议不要包含 schema 的描述,因为这样会使消息变得非常冗长,并降低解析性能。

在将主题注册为 Flink 表之后,可以将 Debezium 消息用作变更日志源。

-- MySQL "products" 的实时物化视图
-- 计算相同产品的最新平均重量
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- 将 MySQL "products" 表的所有数据和增量更改同步到
-- Elasticsearch "products" 索引,供将来查找
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

可用的元数据

以下format元数据可以在表定义中作为只读虚拟(VIRTUAL)列

注意:只有在对应的连接器可以传递 format 元数据时,format 元数据属性才可用。目前,只有 kafka 连接器可以暴露元数据属性到他的 value format。

数据类型描述
schemaSTRING NULLpayload中JSON格式的schema。如果Debezium数据中不包含schema,则返回NULL。
ingestion-timestampTIMESTAMP_LTZ(3) NULL连接器处理时间的时间戳。和Debezium数据中的ts_ms属性一致。
source.timestampTIMESTAMP_LTZ(3) NULLsource系统创建事件的时间戳。和Debezium数据中的source.ts_ms属性一致。
source.databaseSTRING NULL原始数据库名称。和Debezium数据中的source.db属性一致。
source.schemaSTRING NULL原始数据库的schema。和Debezium数据中的source.schema属性一致。
source.tableSTRING NULL原始数据库表名。和Debezium数据中的 source.collection 属性一致。
source.propertiesMAP<STRING, STRING> NULLsource源的属性表。和Debezium数据中的source属性一致。

下面的例子展示了如何在Kafka中访问Debezium元数据字段:

CREATE TABLE KafkaTable (
    origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
    event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
    origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
    origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
    origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'debezium-json'
);

format参数

Flink 提供了 debezium-avro-confluentdebezium-json 两种 format 来解析 Debezium 生成的 JSON 格式和 Avro 格式的消息。
请使用 debezium-avro-confluent 来解析 Debezium 的 Avro 消息,使用 debezium-json 来解析 Debezium 的 JSON 消息。

Debezium Avro

参数是否必选默认值类型描述
format必选(none)String指定使用哪个format,这儿应该是 debezium-avro-confluent
debezium-avro-confluent.basic-auth.credentials-source可选(none)StringBasic auth credentials source for Schema Registry
debezium-avro-confluent.basic-auth.user-info可选(none)StringBasic auth user info for schema registry
debezium-avro-confluent.bearer-auth.credentials-source可选(none)StringBearer auth credentials source for Schema Registry
debezium-avro-confluent.bearer-auth.token可选(none)StringBearer auth token for Schema Registry
从 flink-1.14.x 开始支持
debezium-avro-confluent.properties
可选(none)Map转发到下面 schema 注册的属性 map 表,这对于没有通过Flink配置选项正式公开的选项很有用,但是 Flink 选项拥有更高的优先级。
debezium-avro-confluent.ssl.keystore.location可选(none)StringLocation / File of SSL keystore
debezium-avro-confluent.ssl.keystore.password可选(none)StringPassword for SSL keystore
debezium-avro-confluent.ssl.truststore.location可选(none)StringLocation / File of SSL truststore
debezium-avro-confluent.ssl.truststore.password可选(none)StringPassword for SSL truststore
flink-1.13.x:debezium-avro-confluent.schema-registry.subject
flink-1.14.x:debezium-avro-confluent.subject
可选(none)StringConfluent模式注册中心主题,在该主题下注册此格式在序列化期间使用的schema。
默认情况下,kafkaupsert-kafka连接器使用<topic_name>-value<topic_name>-key作为默认主题名。但对于其他连接器(例如:filesystem),当用作接收器时,subject选项是必需的。
flink-1.14.x:debezium-avro-confluent.schema-registry.url
flink-1.14.x:debezium-avro-confluent.url
必选(none)StringConfluent Schema Registry 获取/注册 schema 的URL.

Debezium Json

参数是否必选默认值类型描述
format必选(none)String指定要使用的格式,此处应为 debezium-json
debezium-json.schema-include可选falseBoolean设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 value.converter.schemas.enable 以在消息中包含 schema。此选项表明 Debezium JSON 消息是否包含 schema。
debezium-json.ignore-parse-errors可选falseBoolean当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。
debezium-json.timestamp-format.standard可选‘SQL’String声明输入和输出的时间戳格式。当前支持的格式为 SQL 以及 ‘ISO-8601’。
可选参数 SQL 将会以 yyyy-MM-dd HH:mm:ss.s{precision} 的格式解析时间戳, 例如 ‘2020-12-30 12:13:14.123’,且会以相同的格式输出。
可选参数 ISO-8601 将会以 yyyy-MM-ddTHH:mm:ss.s{precision} 的格式解析输入时间戳, 例如 ‘2020-12-30T12:13:14.123’ ,且会以相同的格式输出。
debezium-json.map-null-key.mode选填‘FAIL’String指定处理 Map 中 key 值为空的方法. 当前支持的值有 FAIL , DROPLITERAL
FAIL 如果遇到 Map 中 key 值为空的数据,将抛出异常。
DROP 将丢弃 Map 中 key 值为空的数据项。
LITERAL 将使用字符串常量来替换 Map 中的空 key 值。
字符串常量的值由 debezium-json.map-null-key.literal 定义。
debezium-json.map-null-key.literal选填‘null’String当 ‘debezium-json.map-null-key.mode’ 是 LITERAL 的时候,指定字符串常量替换 Map中的空 key 值。
debezium-json.encode.decimal-as-plain-number选填falseBoolean将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027

Canal

说明

支持:

  • Changelog-Data-Capture CDC
  • Format Format: Serialization 序列化格式
  • Schema Format: Deserialization Schema 反序列化格式

Canal 是一个 CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将 MySQL 变更传输到其他系统。
Canal 为变更日志提供了统一的数据格式,并支持使用 JSONprotobuf 序列化消息(Canal 默认使用 protobuf)。

Flink 支持将 Canal 的 JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,这个特性非常有用,例如

  • 将增量数据从数据库同步到其他系统
  • 日志审计
  • 数据库的实时物化视图
  • 关联维度数据库的变更历史,等等

Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Canal 格式的 JSON 消息,输出到 Kafka 等存储中。

但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFOREUPDATE_AFTER 合并为一条 UPDATE 消息。
因此,Flink 将 UPDATE_BEFOREUPDATE_AFTER 分别编码为 DELETEINSERT

注意:未来会支持 Canal protobuf 类型消息的解析以及输出 Canal 格式的消息。

依赖

为了使用Canal格式,使用自动化构建工具(如Maven或SBT)的项目和使用SQL JAR包的SQL Client都需要以下依赖项。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.13.0</version>
</dependency>

注意自己使用的 flink 版本。

注意:有关如何部署 Canal 以将变更日志同步到消息队列,请参阅 Canal 文档。

使用 Canal Format

{
    
  "data": [
    {
    
      "id": "111",
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": "5.18"
    }
  ],
  "database": "inventory",
  "es": 1589373560000,
  "id": 9,
  "isDdl": false,
  "mysqlType": {
    
    "id": "INTEGER",
    "name": "VARCHAR(255)",
    "description": "VARCHAR(512)",
    "weight": "FLOAT"
  },
  "old": [
    {
    
      "weight": "5.15"
    }
  ],
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {
    
    "id": 4,
    "name": 12,
    "description": 12,
    "weight": 7
  },
  "table": "products",
  "ts": 1589373560798,
  "type": "UPDATE"
}

注意:有关各个字段的含义,请参阅 Canal 文档。

MySQL products 表有4列(id,name,description 和 weight)。

上面的 JSON 消息是 products 表上的一个更新事件,表示 id = 111 的行数据上 weight 字段值从5.15变更成为 5.18
假设消息已经同步到了一个 Kafka 主题:products_binlog,那么就可以使用以下DDL来从这个主题消费消息并解析变更事件。

CREATE TABLE topic_products (
    -- 元数据与 MySQL "products" 表完全相同
    id BIGINT,
    name STRING,
    description STRING,
    weight DECIMAL(10, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'products_binlog',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'canal-json'  -- 使用 canal-json 格式
)

将 Kafka 主题注册成 Flink 表之后,就可以将 Canal 消息用作变更日志源。

-- 关于MySQL "products" 表的实时物化视图
-- 计算相同产品的最新平均重量
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- 将 MySQL "products" 表的所有数据和增量更改同步到
-- Elasticsearch "products" 索引以供将来搜索
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

可用的元数据

以下format元数据可以在表定义中作为只读虚拟(VIRTUAL)列

注意:只有在对应的连接器可以传递 format 元数据时,format 元数据属性才可用。目前,只有 kafka 连接器可以暴露元数据属性到他的 value format。

数据类型描述
databaseSTRING NULL原始数据库名。和Canal数据中的database属性一致。
tableSTRING NULL原始数据库表名。和Canal数据中的table属性一致。
sql-typeMAP<STRING, INT> NULLSQL type的map表。和Canal数据中的sqlType属性一致。
pk-namesARRAY<STRING> NULL主键名称的数组。和Canal数据中的pkNames属性一致。
ingestion-timestampTIMESTAMP_LTZ(3) NULL连接器处理事件的时间戳。和Canal数据中的ts属性一致。

下面的例子展示了如何在Kafka中访问Canal元数据字段:

CREATE TABLE KafkaTable (
    origin_database STRING METADATA FROM 'value.database' VIRTUAL,
    origin_table STRING METADATA FROM 'value.table' VIRTUAL,
    origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
    origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
    origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'canal-json'
);

Format 参数

选项要求默认类型描述
format必选(none)String指定要使用的格式,此处应为 canal-json
canal-json.ignore-parse-errors可选falseBoolean当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
canal-json.timestamp-format.standard可选‘SQL’String指定输入和输出时间戳格式。当前支持的值是 SQLISO-8601
SQL:将解析 yyyy-MM-dd HH:mm:ss.s{precision} 格式的输入时间戳,例如 ‘2020-12-30 12:13:14.123’,并以相同格式输出时间戳。
ISO-8601:将解析 yyyy-MM-ddTHH:mm:ss.s{precision} 格式的输入时间戳,例如 ‘2020-12-30T12:13:14.123’,并以相同的格式输出时间戳。
canal-json.map-null-key.mode可选‘FAIL’String指定处理 Map 中 key 值为空的方法. 当前支持的值有 FAIL, DROPLITERAL
FAIL:如果遇到 Map 中 key 值为空的数据,将抛出异常。
DROP:将丢弃 Map 中 key 值为空的数据项。
LITERAL:将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 canal-json.map-null-key.literal 定义。
canal-json.map-null-key.literal可选‘null’Stringcanal-json.map-null-key.modeLITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。
canal-json.encode.decimal-as-plain-number可选falseBoolean将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027
canal-json.database.include可选(none)String一个可选的正则表达式,通过正则匹配 Canal 记录中的 database 元字段,仅读取指定数据库的 changelog 记录。正则字符串与 Java 的 Pattern 兼容。
canal-json.table.include可选(none)String一个可选的正则表达式,通过正则匹配 Canal 记录中的 table 元字段,仅读取指定表的 changelog 记录。正则字符串与 Java 的 Pattern 兼容。

注意事项

重复的变更事件

在正常的操作环境下,Canal 应用能以 exactly-once 的语义投递每条变更事件。然而,当有故障发生时,Canal 应用只能保证 at-least-once 的投递语义。
这也意味着,在非正常情况下,Canal 可能会投递重复的变更事件到消息队列中,当 Flink 从消息队列中消费的时候就会得到重复的事件。
这可能会导致 Flink 查询的运行得到错误的结果或者非预期的异常。

因此,建议在这种情况下,建议在这种情况下,将作业参数 table.exec.source.cdc-events-duplicate 设置成 true,并在该 source 上定义 PRIMARY KEY
框架会生成一个额外的有状态算子,使用该 primary key 来对变更事件去重并生成一个规范化的 changelog 流。

数据类型映射

目前,Canal Format 使用 JSON Format 进行序列化和反序列化。 有关数据类型映射的更多详细信息,请参阅 JSON Format 文档。

Maxwell

说明

支持:

  • Changelog-Data-Capture Format CDC
  • Format: Serialization Schema 序列化格式
  • Format: Deserialization Schema 反序列化格式

Maxwell是一个CDC (Changelog变更数据捕捉)工具,可以实时从MySQL流到Kafka, Kinesis和其他流连接器。Maxwell为变更日志提供了统一的数据格式,并支持使用 JSON 序列化消息。

Flink支持将Maxwell JSON消息解释为INSERT/UPDATE/DELETE消息到Flink SQL系统中。在许多情况下,这个特性是很有用的,例如

  • 将增量数据从数据库同步到其他系统
  • 日志审计
  • 数据库的实时物化视图
  • 关联维度数据库的变更历史,等等

Flink还支持将Flink SQL中的INSERT/UPDATE/DELETE消息编码为Maxwell JSON消息,并发送到Kafka等外部系统。 但是,目前Flink还不能将UPDATE_BEFOREUPDATE_AFTER合并成一个单独的UPDATE消息。因此,Flink将UPDATE_BEFOREUDPATE_AFTER编码为DELETEINSERT Maxwell消息。

依赖

为了使用Maxwell格式,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.13.0</version>
</dependency>

注意自己使用的 flink 版本。

注意:关于如何用Maxwell JSON同步changelog流到Kafka主题,请参考Maxwell文档

使用Maxwell格式

Maxwell为changelog流提供了统一的格式,下面是一个简单的例子,用于从JSON格式的MySQL products表中获取更新操作。

{
    
  "database": "test",
  "table": "e",
  "type": "insert",
  "ts": 1477053217,
  "xid": 23396,
  "commit": true,
  "position": "master.000006:800911",
  "server_id": 23042,
  "thread_id": 108,
  "primary_key": [
    1,
    "2016-10-21 05:33:37.523000"
  ],
  "primary_key_columns": [
    "id",
    "c"
  ],
  "data": {
    
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "old": {
    
    "weight": 5.18
  }
}

注意:关于每个字段的含义,请参考Maxwell文档

MySQL products表有4列id, name, description 和weight)。 上面的JSON消息是products表上的更新更改事件,其中id = 111行的weight值从5.18更改为5.15。 假设这个消息同步到Kafka主题products_binlog,则可以使用下面的DDL来消费这个主题并解释变化事件。

CREATE TABLE topic_products (
    -- schema和MySQL的"products"表完全一致
    id BIGINT,
    name STRING,
    description STRING,
    weight DECIMAL(10, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'products_binlog',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'maxwell-json'
)

将主题注册为Flink表之后,就可以将Maxwell消息作为更改日志源使用了。

-- 关于MySQL "products" 表的实时物化视图
-- 计算相同产品的最新平均重量
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- 将 MySQL "products" 表的所有数据和增量更改同步到 Elasticsearch "products" 索引以供将来搜索
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

可用元数据

从 flink-1.14.x 开始支持。

下面的 format 元数据可以在表定义的只读虚拟(VIRTUAL)列中使用。

注意:只有在对应的连接器可以传递 format 元数据时,format 元数据属性才可用。目前,只有 kafka 连接器可以暴露元数据属性到他的 value format。

Key数据类型描述
databaseSTRING NULL原始数据库名称,如果可用,则对应于 Maxwell 数据中的database字段。
tableSTRING NULL原始数据库表名称,如果可用,则对应于 Maxwell 数据中的table字段。
primary-key-columnsARRAY<STRING> NULL主键名称数组,如果可用,则对应于 Maxwell 数据中的primary_key_columns属性。
ingestion-timestampTIMESTAMP_LTZ(3) NULL连接器处理事件的时间戳。如果可用,则对应于 Maxwell 数据中的 ts 属性。

下面的案例展示如果访问 kafka 中 Maxwell 元数据属性:

CREATE TABLE KafkaTable (
    origin_database STRING METADATA FROM 'value.database' VIRTUAL,
    origin_table STRING METADATA FROM 'value.table' VIRTUAL,
    origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
    origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'maxwell-json'
);

Format参数

选项要求默认类型描述
format必选(none)String指定要使用的格式,此处应为 maxwell-json
maxwell-json.ignore-parse-errors可选falseBoolean当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。
maxwell-json.timestamp-format.standard可选‘SQL’String指定输入和输出时间戳格式。当前支持的值是 SQLISO-8601SQL:将解析 yyyy-MM-dd HH:mm:ss.s{precision} 格式的输入时间戳,例如 ‘2020-12-30 12:13:14.123’,并以相同格式输出时间戳。 ISO-8601:将解析 yyyy-MM-ddTHH:mm:ss.s{precision} 格式的输入时间戳,例如 ‘2020-12-30T12:13:14.123’,并以相同的格式输出时间戳。
maxwell-json.map-null-key.mode可选‘FAIL’String指定处理 Map 中 key 值为空的方法. 当前支持的值有 FAIL, DROPLITERALFAIL:如果遇到 Map 中 key 值为空的数据,将抛出异常。 DROP:将丢弃 Map 中 key 值为空的数据项。 LITERAL: 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 canal-json.map-null-key.literal 定义。
maxwell-json.map-null-key.literal可选‘null’Stringcanal-json.map-null-key.modeLITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。
maxwell-json.encode.decimal-as-plain-number可选falseBoolean将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027

Ogg

说明

从 flink-1.15.x 开始支持。

支持:

  • Changelog-Data-Capture Format CDC
  • Format: Serialization Schema 序列化格式
  • Format: Deserialization Schema 反序列化格式

Oracle GoldenGate(简称 ogg)
是一个提供实时数据转化平台的管理服务,使用复制的方式保证数据高可用以及实时分析。
消费者可以设计、执行功能、并且监控他们的数据副本和流式数据处理方案,而无需收集或管理计算环境。
Ogg 对 changelog 数据提供了一个 format schema ,并且提供了 JSON 格式的序列化数据。

Flink 支持在 Flink SQL 系统中解析 Ogg JSON 数据为 INSERT/UPDATE/DELETE 数据,在很多情况下,这个特性是非常有用的,比如:

  • 从数据库同步增量数据到其他系统
  • 日志审计
  • 在数据库中实时物化视图
  • 时态连接数据库表的变更历史等等

Flink 也支持在 Flink SQL 中编码 INSERT/UPDATE/DELETE 消息为 Ogg JSON 消息,并且发射到其他系统,比如 kafka 。
然而,Flink 目前还不能合并 UPDATE_BEFOREUPDATE_AFTER 为单个 UPDATE 消息。因此,Flink 会将 UPDATE_BEFOREUPDATE_AFTER 编码为 DELETEINSERT Ogg 消息。

依赖

Ogg Json

为了使用Ogg 格式,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.15.0</version>
</dependency>

注意自己的 flink 版本。

注:请参考 Ogg Kafka 处理 文档
来了解怎么设置 Ogg Kafka 处理器来同步 changelog 数据到 kafka 主题。

使用 Ogg 格式

Ogg 对 changelog 提供了统一的 format,下面是一个简单的案例,展示了从 Oracle PRODUCTS 表中捕捉更新操作数据为 JSON 格式:

{
    
  "before": {
    
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "op_type": "U",
  "op_ts": "2020-05-13 15:40:06.000000",
  "current_ts": "2020-05-13 15:40:07.000000",
  "primary_keys": [
    "id"
  ],
  "pos": "00000000000000000000143",
  "table": "PRODUCTS"
}

注:请参考 Debezium 文档来了解每个属性的含义。

Oracle PRODUCTS 表有4个字段 (id, name, description, weight),上面的 JSON 数据是 PRODUCTS 表的一个更新变更事件,
id 为 111 的 weight 值从 5.18 变成了 5.15。假设这个数据同步到了 kafka 的 products_ogg 主题,然后我们就可以使用下面的 DDL 语句消费这个主题,并解析这个变更事件。

CREATE TABLE topic_products (
    -- schema和 oracle 的 "products" 表是完全相同的
    id BIGINT,
    name STRING,
    description STRING,
    weight DECIMAL(10, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'products_ogg',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'ogg-json'
)

将主题注册为 flink 表之后,就可以将 Ogg 消息作为 changelog source 来消费了。

-- Oracle "PRODUCTS" 表的实时物化视图,该视图计算了镶贴工产品最新的平均重量
SELECT name, AVG(weight)
FROM topic_products
GROUP BY name;

-- 同步 Oracle "PRODUCTS" 表的所有数据和增量变更数据到 Elasticsearch 的 "products" 索引中,以便将来进行搜索
INSERT INTO elasticsearch_products
SELECT *
FROM topic_products;

可用元数据

下面的元数据可以暴露到表定义的只读虚拟(VIRTUAL)字段中。

注意:只有在对应的连接器可以传递 format 元数据时,format 元数据属性才可用。目前,只有 kafka 连接器可以暴露元数据属性到他的 value format。

Key数据类型描述
tableSTRING NULL表的全限定名称。表的权限名称格式为:CATALOG NAME.SCHEMA NAME.TABLE NAME
primary-keysARRAY<STRING> NULL源表主键字段名称数组,如果 includePrimaryKeys 配置设置为 true ,则主键属性值只包含在 JSON 格式的输出数据中。
ingestion-timestampTIMESTAMP_LTZ(6) NULL连接器处理事件的时间戳,对应 Ogg 数据中的 current_ts 属性。
event-timestampTIMESTAMP_LTZ(6) NULLsource 系统创建事件的时间戳,对应于 Ogg 数据中的 op_ts 属性。

下面的案例展示如何访问 kafka 中 Ogg 元数据属性:

CREATE TABLE KafkaTable (
    origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
    event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
    origin_table STRING METADATA FROM 'value.table' VIRTUAL,
    primary_keys ARRAY<STRING> METADATA FROM 'value.primary-keys' VIRTUAL,
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'ogg-json'
);

Format 参数

参数要求默认值类型描述
format必选(none)String指定使用的 format ,这儿应该为:ogg-json
ogg-json.ignore-parse-errors可选falseBoolean跳过转化失败的属性和行而不是失败,如果遇到错误,属性值将被设置为 null
ogg-json.timestamp-format.standard可选‘SQL’String指定输入和输出时间戳的格式,目前支持的值为:SQLISO-8601
SQL:转化时间戳为 yyyy-MM-dd HH:mm:ss.s{precision} 格式,比如: ‘2020-12-30 12:13:14.123’。
ISO-8601:转化时间戳为 yyyy-MM-ddTHH:mm:ss.s{precision} 格式,比如:‘2020-12-30T12:13:14.123’。
ogg-json.map-null-key.mode可选‘FAIL’String指定 map 类型数据遇到 key 值为 null 时的处理方式。目前支持的值为:FAILDROPLITERAL
FAIL:遇到 map 数据中的 key 为 null 时抛出异常。
DROP:删除 map 数据中的 key 为 null 的 entry。
LITERAL:替换 key 为 null 值的字符串字面量。字符串字面量值通过 ogg-json.map-null-key.literal 选项定义。
ogg-json.map-null-key.literal可选‘null’String指定当 ogg-json.map-null-key.mode 选项值为 LITERAL 时,要替换为的字符串字面量值。

数据类型映射

目前,Ogg 格式使用 JSON 格式来序列化和反序列化。
请参考 JSON 格式文档来获取更多有关数据类型映射的细节。

Parquet

说明

支持:

  • Format: Serialization Schema 序列化格式
  • Format: Deserialization Schema 反序列化格式

Apache Parquet 格式允许读写 Parquet 数据.

依赖

为了使用Parquet格式,使用自动化构建工具(如Maven或SBT)的项目和使用SQL JAR包的SQL Client都需要以下依赖项。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_2.11</artifactId>
    <version>1.13.0</version>
</dependency>

注意自己使用的 flink 和 scala 版本。

使用Parquet格式

以下为用 Filesystem 连接器和 Parquet 格式创建表的示例:

CREATE TABLE user_behavior (
user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    dt STRING
) PARTITIONED BY (dt) WITH (
    'connector' = 'filesystem',
    'path' = '/tmp/user_behavior',
    'format' = 'parquet'
)

Format 参数

参数是否必须默认值类型描述
format必选(none)String指定使用的格式,此处应为 parquet
parquet.utc-timezone可选falseBoolean使用 UTC 时区或本地时区在纪元时间和 LocalDateTime 之间进行转换。Hive 0.x/1.x/2.x 使用本地时区,但 Hive 3.x 使用 UTC 时区。

Parquet 格式也支持 ParquetOutputFormat 的配置。 例如, 可以配置 parquet.compression=GZIP 来开启 gzip 压缩。

数据类型映射

目前,Parquet 格式类型映射与 Apache Hive 兼容,但与 Apache Spark 有所不同:

  • Timestamp:不参考精度,直接映射 timestamp 类型至 int96。
  • Decimal:根据精度,映射 decimal 类型至固定长度字节的数组。

下表列举了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。

Flink 数据类型Parquet 类型Parquet 逻辑类型
CHAR / VARCHAR / STRINGBINARYUTF8
BOOLEANBOOLEAN
BINARY / VARBINARYBINARY
DECIMALFIXED_LEN_BYTE_ARRAYDECIMAL
TINYINTINT32INT_8
SMALLINTINT32INT_16
INTINT32
BIGINTINT64
FLOATFLOAT
DOUBLEDOUBLE
DATEINT32DATE
TIMEINT32TIME_MILLIS
TIMESTAMPINT96
从 flink-1.15.x 开始支持
ARRAY
LIST
从 flink-1.15.x 开始支持
MAP
MAP
从 flink-1.15.x 开始支持
ROW
STRUCT

注意

flink-1.13.x:暂不支持复合数据类型(Array、Map 与 Row)。
flink-1.15.x:复合数据类型(Array、Map 与 Row)只支持写,还不支持读。

Orc

说明

支持:

  • Format: Serialization Schema 序列化格式
  • Format: Deserialization Schema 反序列化格式

Apache Orc Format 允许读写 ORC 数据。

依赖

为了使用ORC格式,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc_2.11</artifactId>
    <version>1.13.0</version>
</dependency>

注意自己使用的 flink 和 scala 版本。

使用Orc格式

下面是一个用 Filesystem connector 和 Orc format 创建表的例子

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    dt STRING
) PARTITIONED BY (dt) WITH (
    'connector' = 'filesystem',
    'path' = '/tmp/user_behavior',
    'format' = 'orc'
)

Format 参数

参数是否必选默认值类型描述
format必选(none)String指定要使用的格式,这里应该是 orc

Orc 格式也支持来源于 Table properties 的表属性。 举个例子,你可以设置 orc.compress=SNAPPY 来允许spappy压缩。

数据类型映射

Orc 格式类型的映射和 Apache Hive 是兼容的。下面的表格列出了 Flink 类型的数据和 Orc 类型的数据的映射关系。

Flink 数据类型Orc 物理类型Orc 逻辑类型
CHARbytesCHAR
VARCHARbytesVARCHAR
STRINGbytesSTRING
BOOLEANlongBOOLEAN
BYTESbytesBINARY
DECIMALdecimalDECIMAL
TINYINTlongBYTE
SMALLINTlongSHORT
INTlongINT
BIGINTlongLONG
FLOATdoubleFLOAT
DOUBLEdoubleDOUBLE
DATElongDATE
TIMESTAMPtimestampTIMESTAMP
从 flink-1.14.x 开始支持
ARRAY
-LIST
从 flink-1.14.x 开始支持
MAP
-MAP
从 flink-1.14.x 开始支持
ROW
-STRUCT

flink-1.13.x:复合数据类型:Array、Map、Row还不支持。
flink-1.14.x:开始支持复合数据类型。

Raw

说明

支持:

  • Format: Serialization Schema 序列化格式
  • Format: Deserialization Schema 反序列化格式

Raw format 允许读写原始(基于字节)数据作为单个列的值。

注意: 这种格式会将 null 值编码成 byte[] 类型的 null,因此在 upsert-kafka 中使用时可能会有限制,因为 upsert-kafka 将 null 值视为删除消息(在key上删除)。
因此,如果该字段可能有 null 值,我们建议避免使用 upsert-kafka 连接器和 raw format 作为 value.format。

Raw format 连接器是内置的,不需要添加额外的连接器依赖。

使用Raw格式

比如在 Kafka 中有以下原始日志数据,希望使用 Flink SQL 读取和分析此类数据。

47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0" 200 5316 "https://domain.com/?p=1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75"

下面的代码创建了一张表,使用 raw format 以 UTF-8 编码的形式从中读取(也可以写入)底层的 Kafka topic 主题数据为字符串:

CREATE TABLE nginx_log (
    log STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'nginx_log',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'raw'
)

将原始数据读取为纯字符串,之后使用用户自定义函数将其分为多个字段进行进一步分析。例如示例中的 my_split。

SELECT t.hostname, t.datetime, t.url, t.browser, ...
FROM
    (
    SELECT my_split(log) as t FROM nginx_log
    )
;

相对应的,也可以将一个字符串类型的列以 UTF-8 编码的形式写入 Kafka topic。

Format 参数

参数是否必选默认值类型描述
format必选(none)String指定要使用的格式, 这里应该是 raw
raw.charset可选UTF-8String指定字符集来编码文本字符串。
raw.endianness可选big-endianString指定字节序来编码数字值的字节。有效值为 big-endianlittle-endian 。更多细节可查阅字节序

数据类型映射

下表详细说明了这种格式支持的 SQL 类型,包括用于编码和解码的序列化类和反序列化类的详细信息。

Flink SQL 类型
CHAR / VARCHAR / STRINGUTF-8(默认)编码的文本字符串。
编码字符集可以通过 raw.charset 进行配置。
BINARY / VARBINARY / BYTES字节序列本身。
BOOLEAN表示布尔值的单个字节,0表示 false, 1 表示 true。
TINYINT有符号数字值的单个字节。
SMALLINT采用big-endian(默认)编码的两个字节。
字节序可以通过 raw.endianness 配置。
INT采用 big-endian (默认)编码的四个字节。
字节序可以通过 raw.endianness 配置。
BIGINT采用 big-endian (默认)编码的八个字节。
字节序可以通过 raw.endianness 配置。
FLOAT采用 IEEE 754 格式和 big-endian (默认)编码的四个字节。
字节序可以通过 raw.endianness 配置。
DOUBLE采用 IEEE 754 格式和 big-endian (默认)编码的八个字节。
字节序可以通过 raw.endianness 配置。
RAW通过 RAW 类型的底层 TypeSerializer 序列化的字节序列。
原网站

版权声明
本文为[第一片心意]所创,转载请带上原文链接,感谢
https://blog.csdn.net/u012443641/article/details/126128652