当前位置:网站首页>Pyflink writes MySQL examples with JDBC
Pyflink writes MySQL examples with JDBC
2022-07-02 04:58:00 【AG Nanshan】
pyflink use jdbc Connect mysql Example
Get ready :
Software :python3.7 pycharm
oriented flink1.13.5
Install the module :python -m pip install apache-flink==1.13.5
add to mysql Of jdbc jar plug-in unit 、pyflink Of flink-connector-jdbc_2.11 Two plug-ins
To pyflink Modular lib Folder ( Be careful mysql Of jdbc edition 、pyflink The path of 、flink Version of )
${PYTHON_HOME} \Python37\site-packages\pyflink\lib
establish mysql surface
CREATE TABLE `print_table` (
`f0` int(11) DEFAULT NULL,
`f1` int(11) DEFAULT NULL,
`f2` varchar(500) DEFAULT NULL
)`
Specifically python Code
( Be careful jdbc url Of hostname、database and user、password Make changes according to your )
from pyflink.table import EnvironmentSettings, TableEnvironment
# 1. establish TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)
# 2. establish source surface
table_env.execute_sql("""CREATE TABLE source_table ( f0 INT, f1 INT, f2 STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second'='5' ) """)
# 3. establish sink surface
table_env.execute_sql("""CREATE TABLE print_table ( f0 INT, f1 INT, f2 STRING ) WITH ( 'connector' = 'jdbc', 'url'='jdbc:mysql://hostname:3306/test?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC', 'username'='root', 'password'='password', 'table-name' = 'print_table' ) """)
# Or by SQL Query statements to write sink surface :
table_env.execute_sql("insert into print_table select f0,f1,f2 from source_table").wait(60000)
# wait(60000) Of 60000 It's a timeout ,60000 millisecond , namely 60 second , This can be changed according to your own needs
Source of reprint
边栏推荐
- Cultivate primary and secondary school students' love for educational robots
- Future trend of automated testing ----- self healing technology
- Promise all()
- geotrust ov多域名ssl證書一年兩千一百元包含幾個域名?
- Learn BeanShell before you dare to say you know JMeter
- 6.30年终小结,学生时代结束
- 数据库问题汇总
- 奠定少儿编程成为基础学科的原理
- 解析少儿编程中的动手搭建教程
- 数学问题(数论)试除法做质数的判断、分解质因数,筛质数
猜你喜欢
Mathematical knowledge -- understanding and examples of fast power
Hcip day 17
缓存一致性解决方案——改数据时如何保证缓存和数据库中数据的一致性
Typescript function details
AcrelEMS高速公路微电网能效管理平台与智能照明解决方案智慧点亮隧道
Simple and practical accounting software, so that accounts can be checked
Leetcode- insert and sort the linked list
Markdown编辑语法
How to modify data file path in DM database
Vmware安装win10报错:operating system not found
随机推荐
Design and implementation of general interface open platform - (44) log processing of API services
Application of intelligent robot in agricultural ecology
C # picture display occupancy problem
Steam教育的实际问题解决能力
Getting started with pytest ----- confitest Application of PY
Introduction to Luogu 3 [circular structure] problem list solution
Application d'un robot intelligent dans le domaine de l'agroécologie
One step implementation of yolox helmet detection (combined with oak intelligent depth camera)
Markdown编辑语法
Comp 250 parsing
Keil compilation code of CY7C68013A
Online incremental migration of DM database
Precipitate yourself and stay up late to sort out 100 knowledge points of interface testing professional literacy
社交媒体搜索引擎优化及其重要性
Thinkphp内核工单系统源码商业开源版 多用户+多客服+短信+邮件通知
Go Chan's underlying principles
How to recover deleted data in disk
Lm09 Fisher inverse transform inversion mesh strategy
Cache consistency solution - how to ensure the consistency between the cache and the data in the database when changing data
Acelems Expressway microgrid energy efficiency management platform and intelligent lighting solution intelligent lighting tunnel