当前位置:网站首页>Pyflink writes MySQL examples with JDBC
Pyflink writes MySQL examples with JDBC
2022-07-02 04:58:00 【AG Nanshan】
pyflink use jdbc Connect mysql Example
Get ready :
Software :python3.7 pycharm
oriented flink1.13.5
Install the module :python -m pip install apache-flink==1.13.5
add to mysql Of jdbc jar plug-in unit 、pyflink Of flink-connector-jdbc_2.11 Two plug-ins
To pyflink Modular lib Folder ( Be careful mysql Of jdbc edition 、pyflink The path of 、flink Version of )
${PYTHON_HOME} \Python37\site-packages\pyflink\lib
establish mysql surface
CREATE TABLE `print_table` (
`f0` int(11) DEFAULT NULL,
`f1` int(11) DEFAULT NULL,
`f2` varchar(500) DEFAULT NULL
)`
Specifically python Code
( Be careful jdbc url Of hostname、database and user、password Make changes according to your )
from pyflink.table import EnvironmentSettings, TableEnvironment
# 1. establish TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)
# 2. establish source surface
table_env.execute_sql("""CREATE TABLE source_table ( f0 INT, f1 INT, f2 STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second'='5' ) """)
# 3. establish sink surface
table_env.execute_sql("""CREATE TABLE print_table ( f0 INT, f1 INT, f2 STRING ) WITH ( 'connector' = 'jdbc', 'url'='jdbc:mysql://hostname:3306/test?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC', 'username'='root', 'password'='password', 'table-name' = 'print_table' ) """)
# Or by SQL Query statements to write sink surface :
table_env.execute_sql("insert into print_table select f0,f1,f2 from source_table").wait(60000)
# wait(60000) Of 60000 It's a timeout ,60000 millisecond , namely 60 second , This can be changed according to your own needs
Source of reprint
边栏推荐
- How to configure PostgreSQL 12.9 to allow remote connections
- 案例分享|智慧化的西部机场
- Video cover image setting, put cover images into multiple videos in the simplest way
- 社交媒体搜索引擎优化及其重要性
- 数据库问题汇总
- Let genuine SMS pressure measurement open source code
- Summary of database problems
- Mapping location after kotlin confusion
- 缓存一致性解决方案——改数据时如何保证缓存和数据库中数据的一致性
- How do I interview for a successful software testing position? If you want to get a high salary, you must see the offer
猜你喜欢

Several methods of capturing packets under CS framework

CorelDRAW graphics suite2022 free graphic design software

How to configure PostgreSQL 12.9 to allow remote connections

Orthogonal test method and function diagram method for test case design

Mysql database learning

Mysql表insert中文变?号的问题解决办法

Cultivate primary and secondary school students' love for educational robots

win11安装pytorch-gpu遇到的坑

2022-003arts: recursive routine of binary tree

Promise all()
随机推荐
Embedded-c language-9-makefile/ structure / Consortium
Here comes the chicken soup! Keep this quick guide for data analysts
Mysql表insert中文变?号的问题解决办法
CorelDRAW graphics suite2022 free graphic design software
idea自动导包和自动删包设置
js面试收藏试题1
6.30年终小结,学生时代结束
DJB Hash
Summary of common string processing functions in C language
TypeScript函数详解
I sorted out some basic questions about opencv AI kit.
Video multiple effects production, fade in effect and border background are added at the same time
C# 基于MQTTNet的服务端与客户端通信案例
Application d'un robot intelligent dans le domaine de l'agroécologie
[common error] the DDR type of FPGA device is selected incorrectly
農業生態領域智能機器人的應用
Research on the security of ognl and El expressions and memory horse
[Yu Yue education] autumn 2021 reference materials of Tongji University
缓存一致性解决方案——改数据时如何保证缓存和数据库中数据的一致性
初学爬虫-笔趣阁爬虫