当前位置:网站首页>Flink SQL(三) 连接到外部系统System和JDBC
Flink SQL(三) 连接到外部系统System和JDBC
2022-07-26 13:51:00 【ambitfly】
文件系统
另一类非常常见的外部系统就是文件系统(File System)了。Flink 提供了文件系统的连 接器,支持从本地或者分布式的文件系统中读写数据。这个连接器是内置在 Flink 中的,所以使用它并不需要额外引入依赖。
下面是一个连接到文件系统的示例:
CREATE TABLE MyTable (
column_name1 INT,
column_name2 STRING,
...
part_name1 INT,
part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
'connector' = 'filesystem', -- 连接器类型
'path' = '...', -- 文件路径
'format' = '...' -- 文件格式
)
这里在 WITH 前使用了 PARTITIONED BY 对数据进行了分区操作。文件系统连接器支持 对分区文件的访问。
JDBC
关系型数据表本身就是 SQL 最初应用的地方,所以我们也会希望能直接向关系型数据库中读写表数据。Flink 提供的 JDBC 连接器可以通过 JDBC 驱动程序(driver)向任意的关系型 数据库读写数据,比如 MySQL、PostgreSQL、Derby 等。
作为 TableSink 向数据库写入数据时,运行的模式取决于创建表的 DDL 是否定义了主键 (primary key)。如果有主键,那么 JDBC 连接器就将以更新插入(Upsert)模式运行,可以向外部数据库发送按照指定键(key)的更新(UPDATE)和删除(DELETE)操作,如果没有定义主键,那么就将在追加(Append)模式下运行,不支持更新和删除操作。
引入依赖
想要在 Flink 程序中使用 JDBC 连接器,需要引入如下依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>此外,为了连接到特定的数据库,我们还用引入相关的驱动器依赖,比如 MySQL:
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency>创建 JDBC 表
创建 JDBC 表的方法与前面 Upsert Kafka 大同小异。下面是一个具体示例:
-- 创建一张连接到 MySQL 的 表 CREATE TABLE MyTable ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase', 'table-name' = 'users' ); -- 将另一张表 T 的数据写入到 MyTable 表中 INSERT INTO MyTable SELECT id, name, age, status FROM T; 这里创建表的 DDL 中定义了主键,所以数据会以 Upsert 模式写入到 MySQL 表中;而到 MySQL 的连接,是通过 WITH 子句中的 url 定义的。要注意写入 MySQL 中真正的表名称是 users,而 MyTable 是注册在 Flink 表环境中的表。
边栏推荐
- Intercept the coordinate points (four point coordinates of the face frame) face image from the marked XML file and save it in the specified folder
- [oauth2] VII. Wechat oauth2 authorized login
- Redis learning notes
- Completable future practical usage
- The last time I heard about eBay, or the last time
- Synchronization mechanism of go (sync.mutex)
- GDB common commands
- Pytorch学习笔记(一)安装与常用函数的使用
- Segmentation fault (core dumped)
- 消息的订阅和发布
猜你喜欢

Docker integrates the redis sentinel mode (one master, two slave and three sentinels)

Time complexity and space complexity

php使用sqlserver

Frisbee, 2022 "black red" top stream

Brief introduction of reflection mechanism

上一次听到易趣,还是上一次

JS download files, filesaver.js export txt and Excel files

gdb常用命令

GDB common commands

MySql的DDL和DML和DQL的基本语法
随机推荐
【Oauth2】五、OAuth2LoginAuthenticationFilter
白帽子揭秘:互联网千亿黑产吓退马斯克
Ultimate doll 2.0 | cloud native delivery package
MySql的DDL和DML和DQL的基本语法
Why does WPS refuse advertising?
"Intermediate and advanced test questions": what is the implementation principle of mvcc?
Rotation of 2D conversion, transform origin of 2D conversion center point and scale of 2D conversion
JS object assignment problem
LCL three-phase PWM rectifier (inverter)
Pytoch learning notes (III) use, modification, training (cpu/gpu) and verification of the model
Digital collections accelerate the breaking of the circle and help the industry find new opportunities
404 pages and routing hooks
【Oauth2】七、微信OAuth2授权登录
Comparison between SIGMOD function and softmax function
The.Net webapi uses groupname to group controllers to render the swagger UI
重押海外:阿里、京东、顺丰再拼“内力”
Brief introduction of reflection mechanism
In 2022, we "sent away" so many Internet products in only one month
The last time I heard about eBay, or the last time
【OAuth2】八、OAuth2登录的配置逻辑-OAuth2LoginConfigurer和OAuth2ClientConfigurer