Flink Postgres CDC
2022-07-29 01:21:00 【hyunbar】


1、Flink JDBC Write to Postgres
Add the dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
Code
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink_JDBC_PG {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        // Source: the built-in datagen connector generates random rows for testing.
        String source = "CREATE TABLE student (\n" +
                " id INT,\n" +
                " age INT,\n" +
                " name STRING,\n" +
                " create_time TIMESTAMP(3),\n" +
                " WATERMARK FOR create_time AS create_time\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second' = '20',\n" +
                " 'fields.id.kind' = 'random',\n" +
                " 'fields.id.min' = '1',\n" +
                " 'fields.id.max' = '100',\n" +
                " 'fields.age.kind' = 'random',\n" +
                " 'fields.age.min' = '1',\n" +
                " 'fields.age.max' = '100',\n" +
                " 'fields.name.kind' = 'random',\n" +
                " 'fields.name.length' = '3'\n" +
                ")";
        tableEnvironment.executeSql(source);

        // Sink: JDBC connector writing to Postgres; the primary key enables upsert writes.
        String sink_sql = "CREATE TABLE sink_student (\n" +
                " id INT,\n" +
                " age INT,\n" +
                " name STRING,\n" +
                " PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:postgresql://bigdata:5432/postgres',\n" +
                " 'table-name' = 'student',\n" +
                " 'username' = 'postgres',\n" +
                " 'password' = '123456'\n" +
                ")";
        tableEnvironment.executeSql(sink_sql);

        // Continuously copy the generated rows into Postgres, then print the source for inspection.
        String result = "insert into sink_student select id, age, name from student";
        tableEnvironment.executeSql(result);
        tableEnvironment.executeSql("select * from student").print();
    }
}
The Postgres table must be created in advance and must have a primary key.
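For example, a minimal sketch of the matching DDL (the column types here are assumptions; adjust them to your data):
-- Target table for the JDBC sink; the primary key is what allows upsert writes.
CREATE TABLE student (
    id   INT PRIMARY KEY,
    age  INT,
    name VARCHAR(10)
);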
2、Configure Postgres logical replication
2.1 Edit the configuration file postgresql.conf
# Change the WAL level to logical
wal_level = logical # minimal, replica, or logical
# Raise the maximum number of replication slots (default 10); Flink CDC uses one slot per table by default
max_replication_slots = 20 # max number of replication slots
# Raise the maximum number of WAL sender processes (default 10); keep this in line with max_replication_slots
max_wal_senders = 20 # max number of walsender processes
# Drop replication connections that stay inactive longer than this timeout; a larger value is safer (default 60s)
wal_sender_timeout = 180s # in milliseconds; 0 disables

Reload the configuration or restart Postgres:
[root@bigdata ~]# docker exec -it 7b2d8a96ef4c /bin/bash
root@7b2d8a96ef4c:/# su postgres
postgres@7b2d8a96ef4c:/$ pg_ctl reload
server signaled
Verify the parameters:
select *
from pg_settings
where name in ('wal_level',
'max_replication_slots',
'max_wal_senders',
'wal_sender_timeout');

2.2 Grant the user replication privileges
-- Grant the user the REPLICATION privilege
ALTER ROLE postgres REPLICATION;
-- Grant the user CONNECT on the database
GRANT CONNECT ON DATABASE postgres TO postgres;
-- Grant the user SELECT on all tables in the public schema of the current database
GRANT SELECT ON ALL TABLES IN SCHEMA public TO postgres;
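To confirm the grants took effect, a quick sanity check against the standard pg_roles catalog view:
-- rolreplication should now be true for the role
SELECT rolname, rolreplication, rolcanlogin FROM pg_roles WHERE rolname = 'postgres';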
2.3 Publish the tables
-- Turn off the "all tables" flag on existing publications (optional)
UPDATE pg_publication SET puballtables = false WHERE pubname IS NOT NULL;
-- View publications
SELECT * FROM pg_publication;
-- See which tables are published
SELECT * FROM pg_publication_tables;
-- Create a publication covering all tables (dbz_publication is the default name Debezium looks for)
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Or add a single table to the publication
ALTER PUBLICATION dbz_publication ADD TABLE student;

2.4 Set the table's replica identity so update and delete events carry the old values
-- REPLICA IDENTITY FULL makes update and delete events include the previous row values
ALTER TABLE student REPLICA IDENTITY FULL;
-- Check the replica identity ('f' means FULL, i.e. the change took effect)
SELECT relreplident FROM pg_class WHERE relname = 'student';

2.5 Commonly used Postgres commands
-- Reload the configuration without restarting the Postgres cluster
pg_ctl reload
-- Create a user
CREATE USER root WITH PASSWORD '123456';
-- Grant the user insert/update/select/delete privileges on all tables in the public schema
GRANT INSERT, UPDATE, SELECT, DELETE ON ALL TABLES IN SCHEMA public TO postgres;
-- Grant the user the REPLICATION privilege
ALTER ROLE postgres REPLICATION;
-- Grant the user CONNECT on the database
GRANT CONNECT ON DATABASE postgres TO postgres;
-- Grant the user SELECT on all tables in the public schema
GRANT SELECT ON ALL TABLES IN SCHEMA public TO postgres;
-- Turn off the "all tables" flag on existing publications
UPDATE pg_publication SET puballtables = false WHERE pubname IS NOT NULL;
-- View publications
SELECT * FROM pg_publication;
-- See which tables are published
SELECT * FROM pg_publication_tables;
-- Create a publication covering all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Add a single table to the publication
ALTER PUBLICATION dbz_publication ADD TABLE student;
-- REPLICA IDENTITY FULL makes update and delete events include the previous row values
ALTER TABLE student REPLICA IDENTITY FULL;
-- Check the replica identity ('f' means FULL)
SELECT relreplident FROM pg_class WHERE relname = 'student';
-- Drop a replication slot
SELECT PG_DROP_REPLICATION_SLOT('my_slot');
-- Count connections per user
SELECT usename, COUNT(*) FROM pg_stat_activity GROUP BY usename ORDER BY COUNT(*) DESC;
-- Set a user's maximum number of connections
ALTER ROLE postgres CONNECTION LIMIT 200;
3、Flink Streaming Postgres CDC
3.1 Add the dependency
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-postgres-cdc</artifactId>
    <version>2.1.0</version>
</dependency>
3.2 Code
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class FlinkStream_CDC_PG {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required for full + incremental reads (see Q2 below).
        env.enableCheckpointing(3000);
        SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
                .hostname("bigdata")
                .port(5432)
                .database("postgres")
                .schemaList("public")
                .tableList("public.student") // schema-qualified table name
                .username("postgres")
                .password("123456")
                .slotName("sink_student_cdc1") // must be unique per job (see Q1 below)
                .decodingPluginName("pgoutput")
                .deserializer(new JsonDebeziumDeserializationSchema()) // emit change events as JSON strings
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);
        streamSource.print();
        env.execute();
    }
}

4、Flink SQL Postgres CDC
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkSql_CDC_PG {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(settings);
        // Checkpointing is required for full + incremental reads (see Q2 below).
        tableEnvironment.getConfig().getConfiguration().setString("execution.checkpointing.interval", "3s");
        String pg_sql = "CREATE TABLE sink_student_cdc1 (\n" +
                " id INT,\n" +
                " age INT,\n" +
                " name STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'postgres-cdc',\n" +
                " 'hostname' = '106.52.242.238',\n" +
                " 'port' = '5432',\n" +
                " 'username' = 'postgres',\n" +
                " 'password' = '123456',\n" +
                " 'database-name' = 'postgres',\n" +
                " 'schema-name' = 'public',\n" +
                " 'table-name' = 'student',\n" +
                " 'decoding.plugin.name' = 'pgoutput',\n" +
                " 'slot.name' = 'sink_student_cdc1',\n" +
                " 'debezium.snapshot.mode' = 'exported'\n" +
                ")";
        tableEnvironment.executeSql(pg_sql);
        tableEnvironment.executeSql("select * from sink_student_cdc1").print();
    }
}

5、Problem summary
Q1: PSQLException: ERROR: replication slot “flink” is active for PID 974
Each slot.name can be used by only one job at a time; give every table/job its own slot.
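If a crashed job left the slot held, you can check which backend is holding it and, if that backend is stale, terminate it. A minimal sketch (run as a superuser; 974 is the PID from the error message above):
-- List the slots and the backend PID currently holding each one
SELECT slot_name, active, active_pid FROM pg_replication_slots;
-- Free the slot by terminating the stale holder
SELECT pg_terminate_backend(974);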
Q2: With CDC 2.x, only the full snapshot is read and no incremental (WAL) change data arrives
CDC 2.0 supports lock-free snapshotting and parallel reading. To guarantee the ordering of full + incremental data, it relies on Flink's checkpoint mechanism, so the job must have checkpointing enabled. Configuration in a SQL job:
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
Configuration in a DataStream job:
env.enableCheckpointing(3000);
For Postgres CDC, also set the table's replica identity to FULL (section 2.4) so update and delete events carry the previous values.
Q3: The job fails with Replication slot “xxxx” is active
- Drop the slot manually in Postgres:
select pg_drop_replication_slot('rep_slot');
- Or add the following option to the pg source's WITH parameters so the slot is cleaned up automatically when the job stops:
'debezium.slot.drop.on.stop' = 'true'
Q4: When to use flink-sql-connector-xxx.jar versus flink-connector-xxx.jar, and what is the difference?
Dependency management for each connector in the Flink CDC project mirrors that of the connectors in the Flink project itself.
flink-sql-connector-xx is a fat (uber) jar: besides the connector code, all of the connector's third-party dependencies are shaded into it. It is intended for SQL jobs; users only need to drop the fat jar into the lib directory.
flink-connector-xx contains only the connector code, without its dependencies. It is intended for DataStream jobs, where users manage the third-party dependencies themselves and must handle conflicting dependencies with exclude/shade as needed.
Q5: decoding.plugin.name: could not access file “decoderbufs”: No such file or directory
Set this option to a plugin that is actually installed on the Postgres server. The supported plugins are:
decoderbufs (the default)
wal2json
wal2json_rds
wal2json_streaming
wal2json_rds_streaming
pgoutput
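One practical way to check whether a given plugin is available on the server is to create, then immediately drop, a throwaway logical slot with it (the slot name below is just a probe, not part of the job setup):
-- Fails with the "could not access file" error if the plugin is not installed
SELECT * FROM pg_create_logical_replication_slot('plugin_probe', 'pgoutput');
-- Clean up the probe slot
SELECT pg_drop_replication_slot('plugin_probe');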
Q6: Slot naming
Slot names must conform to PostgreSQL replication slot naming rules, which state: “Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.”
Q7: snapshot.mode
initial - The connector performs a snapshot only when no offset has been recorded for the logical server name.
always - The connector performs a snapshot every time it starts.
never - The connector never performs snapshots. With this configuration, if an LSN was previously stored in the Kafka offsets topic, the connector resumes streaming changes from that position; if no LSN is stored, it starts streaming from the point at which the Postgres logical replication slot was created on the server. The never mode is only useful when you know that all data of interest is still reflected in the WAL.
initial_only - The connector performs the initial snapshot and then stops, without processing any subsequent changes.
exported - The connector performs the snapshot based on the point in time at which the replication slot was created. This is an excellent way to take a snapshot in a lock-free manner.
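As an illustration, here is a minimal sketch of the section-4 table redefined with 'never' mode, so the job skips the snapshot and streams only new WAL changes (the table and slot names are hypothetical; host and credentials are the placeholders used earlier in this post):
CREATE TABLE student_stream_only (
  id INT,
  age INT,
  name STRING
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'bigdata',
  'port' = '5432',
  'username' = 'postgres',
  'password' = '123456',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'student',
  'decoding.plugin.name' = 'pgoutput',
  'slot.name' = 'student_stream_only',
  'debezium.snapshot.mode' = 'never'
);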
