Flink Postgres CDC
2022-07-29 01:21:00 【hyunbar】


1、Flink JDBC Write to Postgres
Add the dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
Code
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink_JDBC_PG {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        // Datagen source table that generates random rows
        String source = "CREATE TABLE student (\n" +
                "  id INT,\n" +
                "  age INT,\n" +
                "  name STRING,\n" +
                "  create_time TIMESTAMP(3),\n" +
                "  WATERMARK FOR create_time AS create_time\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second' = '20',\n" +
                "  'fields.id.kind' = 'random',\n" +
                "  'fields.id.min' = '1',\n" +
                "  'fields.id.max' = '100',\n" +
                "  'fields.age.kind' = 'random',\n" +
                "  'fields.age.min' = '1',\n" +
                "  'fields.age.max' = '100',\n" +
                "  'fields.name.kind' = 'random',\n" +
                "  'fields.name.length' = '3'\n" +
                ")";
        tableEnvironment.executeSql(source);

        // JDBC sink table that writes (upserts on the primary key) into the Postgres table "student"
        String sink_sql = "CREATE TABLE sink_student (\n" +
                "  id INT,\n" +
                "  age INT,\n" +
                "  name STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:postgresql://bigdata:5432/postgres',\n" +
                "  'table-name' = 'student',\n" +
                "  'username' = 'postgres',\n" +
                "  'password' = '123456'\n" +
                ")";
        tableEnvironment.executeSql(sink_sql);

        // Write the generated rows into Postgres and also print them
        String result = "insert into sink_student select id, age, name from student";
        tableEnvironment.executeSql(result);
        tableEnvironment.executeSql("select * from student").print();
    }
}
The Postgres table must be created in advance and must have a primary key.
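For reference, a minimal sketch of such a table, assuming the same columns as the sink_student definition above (the VARCHAR length is illustrative; adapt names and types to your own schema):

CREATE TABLE student (
    id   INT PRIMARY KEY,  -- primary key needed so the JDBC connector can upsert
    age  INT,
    name VARCHAR(10)       -- length chosen arbitrarily for this sketch
);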
2、Configure Postgres for logical decoding (WAL)
2.1 Edit the configuration file postgresql.conf
# Change the WAL level to logical
wal_level = logical  # minimal, replica, or logical
# Increase the maximum number of replication slots (default 10); flink-cdc uses one slot per table by default
max_replication_slots = 20  # max number of replication slots
# Increase the maximum number of WAL sender processes (default 10); keep it in line with the slot setting above
max_wal_senders = 20  # max number of walsender processes
# Drop replication connections that stay inactive longer than this timeout (default 60s); a somewhat larger value is safer
wal_sender_timeout = 180s  # in milliseconds; 0 disables

Reload the configuration or restart postgres
[root@bigdata ~]# docker exec -it 7b2d8a96ef4c /bin/bash
root@7b2d8a96ef4c:/# su postgres
postgres@7b2d8a96ef4c:/$ pg_ctl reload
server signaled
View parameters
select *
from pg_settings
where name in ('wal_level',
'max_replication_slots',
'max_wal_senders',
'wal_sender_timeout');

2.2 Grant the user replication privileges
-- Grant the user the REPLICATION attribute
ALTER ROLE postgres REPLICATION;
-- Grant the user connect privileges on the database
GRANT CONNECT ON DATABASE postgres TO postgres;
-- Grant the user SELECT on all tables in the public schema of the current database
GRANT SELECT ON ALL TABLES IN SCHEMA public TO postgres;
2.3 Publish the tables
-- Stop existing publications from publishing all tables (optional)
UPDATE pg_publication SET puballtables = false WHERE pubname IS NOT NULL;
-- View the publication settings
SELECT * FROM pg_publication;
-- View which tables are published
SELECT * FROM pg_publication_tables;
-- Create a publication covering all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Add a single table to the publication
ALTER PUBLICATION dbz_publication ADD TABLE student;

2.4 Change the table's replica identity so UPDATE and DELETE carry the old values
-- Set the replica identity to FULL so UPDATE and DELETE events include the previous row values
ALTER TABLE student REPLICA IDENTITY FULL;
-- Check the replica identity ('f' means FULL, i.e. the setting took effect)
SELECT relreplident FROM pg_class WHERE relname = 'student';

2.5 Commonly used Postgres commands
-- Reload the configuration without restarting the Postgres cluster
pg_ctl reload
-- Create a user
CREATE USER root WITH PASSWORD '123456';
-- Grant the user insert/update/select/delete privileges
GRANT INSERT, UPDATE, SELECT, DELETE ON ALL TABLES IN SCHEMA public TO postgres;
-- Grant the user the REPLICATION attribute
ALTER ROLE postgres REPLICATION;
-- Grant the user connect privileges on the database
GRANT CONNECT ON DATABASE postgres TO postgres;
-- Grant the user SELECT on all tables in the public schema of the current database
GRANT SELECT ON ALL TABLES IN SCHEMA public TO postgres;
-- Stop existing publications from publishing all tables (optional)
UPDATE pg_publication SET puballtables = false WHERE pubname IS NOT NULL;
-- View the publication settings
SELECT * FROM pg_publication;
-- View which tables are published
SELECT * FROM pg_publication_tables;
-- Create a publication covering all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Add a single table to the publication
ALTER PUBLICATION dbz_publication ADD TABLE student;
-- Set the replica identity to FULL so UPDATE and DELETE events include the previous row values
ALTER TABLE student REPLICA IDENTITY FULL;
-- Check the replica identity ('f' means FULL, i.e. the setting took effect)
SELECT relreplident FROM pg_class WHERE relname = 'student';
-- Drop a replication slot
SELECT PG_DROP_REPLICATION_SLOT('my_slot');
-- Query the number of connections per user
SELECT usename, COUNT(*) FROM pg_stat_activity GROUP BY usename ORDER BY COUNT(*) DESC;
-- Set the maximum number of connections for a user
ALTER ROLE postgres CONNECTION LIMIT 200;
3、Flink Streaming Postgres CDC
3.1 Add the dependency
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-postgres-cdc</artifactId>
    <version>2.1.0</version>
</dependency>
3.2 Code
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class FlinkStream_CDC_PG {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Postgres CDC source: captures changes of public.student through the pgoutput plugin
        SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
                .hostname("bigdata")
                .port(5432)
                .database("postgres")
                .schemaList("public")
                .tableList("public.student")
                .username("postgres")
                .password("123456")
                .slotName("sink_student_cdc1")
                .decodingPluginName("pgoutput")
                .deserializer(new JsonDebeziumDeserializationSchema()) // emit change events as JSON strings
                .build();

        DataStreamSource<String> streamSource = env.addSource(sourceFunction);
        streamSource.print();
        env.execute();
    }
}

4、Flink SQL Postgres CDC
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkSql_CDC_PG {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(settings);
        // Checkpointing is required for the CDC source to move from the snapshot phase to the incremental phase
        tableEnvironment.getConfig().getConfiguration().setString("execution.checkpointing.interval", "3s");

        // Postgres CDC source table
        String pg_sql = "CREATE TABLE sink_student_cdc1 (\n" +
                "  id INT,\n" +
                "  age INT,\n" +
                "  name STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'postgres-cdc',\n" +
                "  'hostname' = '106.52.242.238',\n" +
                "  'port' = '5432',\n" +
                "  'username' = 'postgres',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'postgres',\n" +
                "  'schema-name' = 'public',\n" +
                "  'table-name' = 'student',\n" +
                "  'decoding.plugin.name' = 'pgoutput',\n" +
                "  'slot.name' = 'sink_student_cdc1',\n" +
                "  'debezium.snapshot.mode' = 'exported'\n" +
                ")";
        tableEnvironment.executeSql(pg_sql);
        tableEnvironment.executeSql("select * from sink_student_cdc1").print();
    }
}

5、Problem summary
Q1: PSQLException: ERROR: replication slot “flink” is active for PID 974
A replication slot can only be used by one connector at a time, so slot.name must be unique: give each table (each CDC source) its own slot, for example:
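For illustration, a minimal Flink SQL sketch of two CDC source tables, each declaring its own slot (the teacher table, hostnames and credentials are assumptions reused from the examples above):

-- Sketch: each source table gets its own, unique replication slot
CREATE TABLE student_cdc (
    id INT, age INT, name STRING
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'bigdata', 'port' = '5432',
    'username' = 'postgres', 'password' = '123456',
    'database-name' = 'postgres', 'schema-name' = 'public',
    'table-name' = 'student',
    'decoding.plugin.name' = 'pgoutput',
    'slot.name' = 'student_slot'
);

CREATE TABLE teacher_cdc (
    id INT, name STRING
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'bigdata', 'port' = '5432',
    'username' = 'postgres', 'password' = '123456',
    'database-name' = 'postgres', 'schema-name' = 'public',
    'table-name' = 'teacher',
    'decoding.plugin.name' = 'pgoutput',
    'slot.name' = 'teacher_slot'  -- different slot from student_cdc
);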
Q2: With the CDC 2.x version only the full snapshot data is read; no incremental (WAL) data arrives
CDC 2.0 supports the lock-free snapshot algorithm and parallel reading. To guarantee the ordering of full snapshot data plus incremental data it relies on Flink's checkpoint mechanism, so checkpointing must be enabled for the job. In a SQL job:
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
In a DataStream job:
env.enableCheckpointing(3000);
Also make sure the table's replica identity is set to FULL (see section 2.4) so that UPDATE and DELETE events carry the previous values.
Q3: The job fails with Replication slot “xxxx” is active
- Manually run the following in Postgres:
select pg_drop_replication_slot('rep_slot');
- Or add the following option to the pg source table's WITH clause:
'debezium.slot.drop.on.stop' = 'true', which automatically cleans up the slot after the job stops (see the sketch below)
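For illustration, a sketch of the section 4 source table with this option added (best suited to dev/test setups, since dropping the slot discards its WAL position):

CREATE TABLE sink_student_cdc1 (
    id INT, age INT, name STRING
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = '106.52.242.238', 'port' = '5432',
    'username' = 'postgres', 'password' = '123456',
    'database-name' = 'postgres', 'schema-name' = 'public',
    'table-name' = 'student',
    'decoding.plugin.name' = 'pgoutput',
    'slot.name' = 'sink_student_cdc1',
    'debezium.snapshot.mode' = 'exported',
    'debezium.slot.drop.on.stop' = 'true'  -- slot is removed when the job stops
);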
Q4: When should flink-sql-connector-xxx.jar be used, when flink-connector-xxx.jar, and what is the difference between the two?
Dependency management for each connector in the Flink CDC project follows the same convention as the connectors in the Flink project itself.
flink-sql-connector-xx is a fat (uber) jar: in addition to the connector code, all of the connector's third-party dependencies are shaded into it. It is intended for SQL jobs; users only need to drop the fat jar into the lib directory.
flink-connector-xx contains only the connector code without its dependencies. It is intended for DataStream jobs; users manage the required third-party dependencies themselves and must handle conflicting dependencies with exclude/shade.
Q5: decoding.plugin.name: could not access file "decoderbufs": No such file or directory
Choose the decoding plugin according to what is actually installed on the Postgres server. The supported plugins are:
decoderbufs (the default)
wal2json
wal2json_rds
wal2json_streaming
wal2json_rds_streaming
pgoutput
Q6: Slot naming
Slot names must conform to PostgreSQL replication slot naming rules, which state: “Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.”
Q7: snapshot.mode
initial - the connector performs a snapshot only when no offset has been recorded for the logical server name.
always - the connector performs a snapshot every time it starts.
never - the connector never performs a snapshot. When configured this way, its startup behavior is as follows: if an LSN was previously stored in the Kafka offsets topic, the connector continues streaming changes from that position; if no LSN is stored, it starts streaming changes from the point in time at which the PostgreSQL logical replication slot was created on the server. The never mode is only useful when you know that all the data you care about is still present in the WAL.
initial_only - the connector performs the initial snapshot and then stops, without processing any subsequent changes.
exported - the connector performs the snapshot based on the point in time when the replication slot was created. This is an excellent way to perform the snapshot in a lock-free manner.
