当前位置:网站首页>Pyflink writes MySQL examples with JDBC
Pyflink writes MySQL examples with JDBC
2022-07-02 04:58:00 【AG Nanshan】
pyflink use jdbc Connect mysql Example
Get ready :
Software :python3.7 pycharm
oriented flink1.13.5
Install the module :python -m pip install apache-flink==1.13.5
add to mysql Of jdbc jar plug-in unit 、pyflink Of flink-connector-jdbc_2.11 Two plug-ins
To pyflink Modular lib Folder ( Be careful mysql Of jdbc edition 、pyflink The path of 、flink Version of )
${PYTHON_HOME} \Python37\site-packages\pyflink\lib
establish mysql surface
CREATE TABLE `print_table` (
`f0` int(11) DEFAULT NULL,
`f1` int(11) DEFAULT NULL,
`f2` varchar(500) DEFAULT NULL
)`
Specifically python Code
( Be careful jdbc url Of hostname、database and user、password Make changes according to your )
from pyflink.table import EnvironmentSettings, TableEnvironment
# 1. establish TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)
# 2. establish source surface
table_env.execute_sql("""CREATE TABLE source_table ( f0 INT, f1 INT, f2 STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second'='5' ) """)
# 3. establish sink surface
table_env.execute_sql("""CREATE TABLE print_table ( f0 INT, f1 INT, f2 STRING ) WITH ( 'connector' = 'jdbc', 'url'='jdbc:mysql://hostname:3306/test?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC', 'username'='root', 'password'='password', 'table-name' = 'print_table' ) """)
# Or by SQL Query statements to write sink surface :
table_env.execute_sql("insert into print_table select f0,f1,f2 from source_table").wait(60000)
# wait(60000) Of 60000 It's a timeout ,60000 millisecond , namely 60 second , This can be changed according to your own needs
Source of reprint
边栏推荐
- 【ClickHouse】How to create index for Map Type Column or one key of it?
- Summary of main account information of zhengdaliu 4
- idea自动导包和自动删包设置
- C case of communication between server and client based on mqttnet
- Steam教育的实际问题解决能力
- Alibaba cloud polkit pkexec local rights lifting vulnerability
- 6.30年终小结,学生时代结束
- Record the bug of unity 2020.3.31f1 once
- 面试会问的 Promise.all()
- Social media search engine optimization and its importance
猜你喜欢
06 decorator mode
Analyze the space occupied by the table according to segments, clusters and pages
Thinkphp內核工單系統源碼商業開源版 多用戶+多客服+短信+郵件通知
ThinkPHP kernel work order system source code commercial open source version multi user + multi customer service + SMS + email notification
Detailed process of DC-1 range construction and penetration practice (DC range Series)
Ansible installation and use
Leetcode merge sort linked list
Getting started with pytest -- description of fixture parameters
Virtual machine installation deepin system
Lm09 Fisher inverse transform inversion mesh strategy
随机推荐
汇编语言中的标志位:CF、PF、AF、ZF、SF、TF、IF、DF、OF
National all Chinese Automatic Test Software apifox
fastText文本分类
Leetcode merge sort linked list
Orthogonal test method and function diagram method for test case design
Mathematical knowledge -- understanding and examples of fast power
Interview question: do you know the difference between deep copy and shallow copy? What is a reference copy?
案例分享|智慧化的西部机场
LeetCode-归并排序链表
cs架构下抓包的几种方法
Idea automatic package import and automatic package deletion settings
Rhcsa --- work on the third day
CorelDRAW graphics suite2022 free graphic design software
Pytest learning ----- pytest assertion of interface automation testing
Let genuine SMS pressure measurement open source code
Cultivate primary and secondary school students' love for educational robots
06 装饰(Decorator)模式
MMAP zero copy knowledge point notes
Leetcode basic programming: array
Summary of database problems