Flink JDBC Connector: Introduction and WITH Parameters
2022-07-03 10:48:00 【Samooyou】
Introduction
The JDBC connector provides read and write support for common databases such as MySQL, PostgreSQL, and Oracle.
The flink-connector-jdbc connector currently provided by Oceanus has the MySQL and PostgreSQL drivers built in. To connect to other databases such as Oracle, upload the corresponding JDBC driver JAR as a custom package.
Scope of Use
JDBC tables can serve as a source table (Source), either scanned column by column or used as the right-side table of a JOIN (a dimension table). They can also serve as a sink table (Sink), both for append-only (Tuple) data streams and for upsert data streams (which require a primary key).
To consume the change records of a JDBC database as a streaming source, use the built-in CDC data sources. If the built-in CDC sources do not meet your needs, you can also use Debezium, Canal, and similar tools to capture and subscribe to changes in the JDBC database, and then have Oceanus further process those change events. For details, see Message Queue Kafka.
Examples
As a source table for column scanning (Source)
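A minimal sketch of the scan-source DDL, assuming a hypothetical MySQL database; the host, database name, table name, and credentials below are placeholders:

```sql
-- Scan source: Flink reads the whole table (optionally in partitions)
CREATE TABLE `jdbc_source_table` (
  `id`   INT,
  `name` STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/my_database',  -- placeholder URL
  'table-name' = 'my_table',                          -- placeholder table
  'username' = 'user',
  'password' = 'password'
);
```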
As a dimension table source (Source)
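A minimal sketch of a lookup join against a JDBC dimension table; the `orders` stream, its fields, and the dimension table are all hypothetical. The main table must expose a processing-time attribute for the FOR SYSTEM_TIME AS OF clause:

```sql
-- Hypothetical main stream; proc_time is a processing-time attribute
CREATE TABLE `orders` (
  `order_id`  INT,
  `user_id`   INT,
  `proc_time` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- Dimension table backed by JDBC (placeholder URL and credentials)
CREATE TABLE `jdbc_dim_table` (
  `id`        INT,
  `user_name` STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/my_database',
  'table-name' = 'dim_table',
  'username' = 'user',
  'password' = 'password'
);

-- Lookup join: each order row triggers a point query on the dimension table
SELECT o.order_id, d.user_name
FROM orders AS o
JOIN jdbc_dim_table FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.user_id = d.id;
```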
As a data sink (Tuple Sink)
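A minimal sketch of an append-only (Tuple) sink; no primary key is declared, so rows are written with plain INSERT statements. All names are placeholders:

```sql
CREATE TABLE `jdbc_tuple_sink` (
  `id`   INT,
  `name` STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/my_database',
  'table-name' = 'sink_table',
  'username' = 'user',
  'password' = 'password'
);

-- Append-only write from the hypothetical source table above
INSERT INTO jdbc_tuple_sink
SELECT `id`, `name` FROM jdbc_source_table;
```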
As a data sink (Upsert Sink)
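A minimal sketch of an upsert sink; the PRIMARY KEY ... NOT ENFORCED clause marks the key Flink uses for upsert writes, and the underlying database table must define the same primary key. Names are placeholders:

```sql
CREATE TABLE `jdbc_upsert_sink` (
  `id`   INT,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED  -- required for upsert semantics
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/my_database',
  'table-name' = 'sink_table',
  'username' = 'user',
  'password' = 'password'
);
```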
Common WITH Parameters

Parameter | Required | Default | Description
---|---|---|---
connector | Yes | (none) | Must be set to 'jdbc'.
url | Yes | (none) | The JDBC connection URL of the database.
table-name | Yes | (none) | The name of the database table.
driver | No | (none) | The class name of the JDBC driver. If omitted, it is inferred automatically from the url.
username | No | (none) | The database user name. 'username' and 'password' must be used together.
password | No | (none) | The database password.
scan.partition.column | No | (none) | The column used for partitioned scan (Partitioned Scan). It must be of a numeric, date, or timestamp type. See Partitioned Scan below for details.
scan.partition.num | No | (none) | The number of partitions when partitioned scan is enabled.
scan.partition.lower-bound | No | (none) | The minimum value of the first partition when partitioned scan is enabled.
scan.partition.upper-bound | No | (none) | The maximum value of the last partition when partitioned scan is enabled.
scan.fetch-size | No | 0 | The number of rows fetched from the database per batch. The default 0 means rows are read one by one, which yields low throughput.
lookup.cache.max-rows | No | (none) | The maximum number of rows held in the lookup cache (Lookup Cache).
lookup.cache.ttl | No | (none) | The maximum time each record may stay in the lookup cache.
lookup.max-retries | No | 3 | The maximum number of retries when a database query fails.
sink.buffer-flush.max-rows | No | 100 | The maximum number of rows buffered before a batched write is flushed. Setting it to 0 disables output buffering.
sink.buffer-flush.interval | No | 1s | The maximum flush interval for batched writes. If 'sink.buffer-flush.max-rows' is set to '0' and this option is non-zero, fully asynchronous output is enabled: the thread feeding data into the operator and the thread writing from the operator to the database are completely decoupled.
sink.max-retries | No | 3 | The maximum number of retries when a database write fails.
Primary Key Notes
- For append (Tuple) data, the job DDL does not need a primary key, and the JDBC database table does not need to define one either; in fact, defining one is not recommended (otherwise writes may fail on duplicate rows).
- For upsert data, the JDBC database table must define a primary key, and the CREATE TABLE statement in the job DDL must declare the corresponding columns with a PRIMARY KEY ... NOT ENFORCED constraint.
Note:
- For MySQL tables, the upsert function relies on the `INSERT .. ON DUPLICATE KEY UPDATE ..` syntax, which all common MySQL versions support (see the sketch after this list).
- For PostgreSQL tables, the upsert function relies on the `INSERT .. ON CONFLICT .. DO UPDATE SET ..` syntax, which requires PostgreSQL 9.5 or later.
- For Oracle tables, the upsert function relies on the `MERGE INTO .. USING .. ON .. WHEN MATCHED THEN UPDATE .. WHEN NOT MATCHED THEN INSERT ..` syntax, which requires Oracle 9i or later.
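For illustration, the statement the connector issues for a MySQL upsert has roughly this shape (table, columns, and values are hypothetical):

```sql
-- Insert the row, or update it in place when the primary key already exists
INSERT INTO sink_table (id, name)
VALUES (1, 'Alice')
ON DUPLICATE KEY UPDATE name = VALUES(name);
```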
Partitioned Scan
Partitioned scan (Partitioned Scan) speeds up reading a JDBC table with a multi-parallelism Source operator: each subtask reads its own private partition. To use this feature, all four parameters beginning with scan.partition must be specified; otherwise an error is reported.
Note:
The maximum value given by scan.partition.upper-bound and the minimum value given by scan.partition.lower-bound only determine the stride of each partition; they do not filter rows, so they do not affect the number or correctness of the rows that are eventually read.
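A sketch of a scan-source DDL with partitioned scan enabled, reusing the placeholder names from the earlier examples. With these illustrative settings, the id range 0 to 100 is split into 4 partitions, each read by one subtask:

```sql
CREATE TABLE `jdbc_partitioned_source` (
  `id`   INT,
  `name` STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/my_database',
  'table-name' = 'my_table',
  'username' = 'user',
  'password' = 'password',
  -- all four scan.partition options must be set together
  'scan.partition.column' = 'id',
  'scan.partition.num' = '4',
  'scan.partition.lower-bound' = '0',
  'scan.partition.upper-bound' = '100'
);
```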
Read cache
Flink Provides read cache (Lookup Cache) function , It can improve the performance of dimension table reading . At present, the implementation of this cache is synchronous , Default not enabled ( Every request will read the database , Throughput is very low ), Must be set manually lookup.cache.max-rows
and lookup.cache.ttl
Two parameters to enable this function .
Note:
If the cache TTL is too long or the cache holds too many entries, the job may keep serving stale cached rows after the database has been updated. For jobs that are sensitive to database changes, use the cache with caution.
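A sketch of a dimension-table DDL with the lookup cache enabled; the cache sizes below are illustrative, and the other values are placeholders as before:

```sql
CREATE TABLE `jdbc_dim_table_cached` (
  `id`        INT,
  `user_name` STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/my_database',
  'table-name' = 'dim_table',
  'username' = 'user',
  'password' = 'password',
  -- both options must be set, or the cache stays disabled
  'lookup.cache.max-rows' = '1000',
  'lookup.cache.ttl' = '60s'
);
```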
Batch Write Optimization
Setting the two sink.buffer-flush parameters enables batched writes to the database. It is recommended to also pass the matching parameters of the underlying database driver for a better batching effect (see the example after this list); otherwise the driver may still write rows one by one, which is inefficient.
- For MySQL, append the rewriteBatchedStatements=true parameter to the url connection string.
- For PostgreSQL, append the reWriteBatchedInserts=true parameter to the url connection string.
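A sketch of a batched MySQL sink that combines the driver-side URL parameter with the connector's flush options; all names and values are placeholders:

```sql
CREATE TABLE `mysql_batch_sink` (
  `id`   INT,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  -- rewriteBatchedStatements=true lets the MySQL driver batch on the wire
  'url' = 'jdbc:mysql://localhost:3306/my_database?rewriteBatchedStatements=true',
  'table-name' = 'sink_table',
  'username' = 'user',
  'password' = 'password',
  'sink.buffer-flush.max-rows' = '200',
  'sink.buffer-flush.interval' = '2s'
);
-- For PostgreSQL, use reWriteBatchedInserts=true instead, e.g.
-- 'url' = 'jdbc:postgresql://localhost:5432/my_database?reWriteBatchedInserts=true'
```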
Reference: JDBC | Apache Flink