当前位置:网站首页>Pyflink writes MySQL examples with JDBC
Pyflink writes MySQL examples with JDBC
2022-07-02 04:58:00 【AG Nanshan】
pyflink use jdbc Connect mysql Example
Get ready :
Software :python3.7 pycharm
oriented flink1.13.5
Install the module :python -m pip install apache-flink==1.13.5
add to mysql Of jdbc jar plug-in unit 、pyflink Of flink-connector-jdbc_2.11 Two plug-ins
To pyflink Modular lib Folder ( Be careful mysql Of jdbc edition 、pyflink The path of 、flink Version of )
${PYTHON_HOME} \Python37\site-packages\pyflink\lib
establish mysql surface
CREATE TABLE `print_table` (
`f0` int(11) DEFAULT NULL,
`f1` int(11) DEFAULT NULL,
`f2` varchar(500) DEFAULT NULL
)`
Specifically python Code
( Be careful jdbc url Of hostname、database and user、password Make changes according to your )
from pyflink.table import EnvironmentSettings, TableEnvironment
# 1. establish TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)
# 2. establish source surface
table_env.execute_sql("""CREATE TABLE source_table ( f0 INT, f1 INT, f2 STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second'='5' ) """)
# 3. establish sink surface
table_env.execute_sql("""CREATE TABLE print_table ( f0 INT, f1 INT, f2 STRING ) WITH ( 'connector' = 'jdbc', 'url'='jdbc:mysql://hostname:3306/test?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC', 'username'='root', 'password'='password', 'table-name' = 'print_table' ) """)
# Or by SQL Query statements to write sink surface :
table_env.execute_sql("insert into print_table select f0,f1,f2 from source_table").wait(60000)
# wait(60000) Of 60000 It's a timeout ,60000 millisecond , namely 60 second , This can be changed according to your own needs
Source of reprint
边栏推荐
- Orthogonal test method and function diagram method for test case design
- 初学爬虫-笔趣阁爬虫
- [bus interface] Axi interface
- AcrelEMS高速公路微电网能效管理平台与智能照明解决方案智慧点亮隧道
- Let正版短信测压开源源码
- Federal learning: dividing non IID samples according to Dirichlet distribution
- C case of communication between server and client based on mqttnet
- A new attribute value must be added to the entity entity class in the code, but there is no corresponding column in the database table
- Typescript function details
- Go Chan's underlying principles
猜你喜欢
![[common error] the DDR type of FPGA device is selected incorrectly](/img/f3/be66bcfafeed581add6d48654dfe34.jpg)
[common error] the DDR type of FPGA device is selected incorrectly

CY7C68013A之keil编译代码

How do I interview for a successful software testing position? If you want to get a high salary, you must see the offer

解析少儿编程中的动手搭建教程

奠定少儿编程成为基础学科的原理

Ognl和EL表达式以及内存马的安全研究

LeetCode-对链表进行插入排序

Summary of database problems

Steam教育的实际问题解决能力

Tawang food industry insight | current situation, consumption data and trend analysis of domestic infant complementary food market
随机推荐
Mysql重点难题(2)汇总
[high speed bus] Introduction to jesd204b
LeetCode-对链表进行插入排序
Virtual machine installation deepin system
Markdown edit syntax
Online incremental migration of DM database
Leetcode- insert and sort the linked list
Starting from the classification of database, I understand the map database
农业生态领域智能机器人的应用
Summary of main account information of zhengdaliu 4
Super detailed pycharm tutorial
社交媒体搜索引擎优化及其重要性
Markdown编辑语法
Mouse events in JS
Mysql database learning
How do I interview for a successful software testing position? If you want to get a high salary, you must see the offer
CY7C68013A之keil编译代码
[graduation season · advanced technology Er] young people have dreams, why are they afraid of hesitation
Embedded-c language-8-character pointer array / large program implementation
Oracle和MySQL的基本区别(入门级)