当前位置:网站首页>Flink 1.15 implements SQL script to recover data from savepointh
Flink 1.15 implements SQL script to recover data from savepointh
2022-07-27 00:55:00 【One day long -- xuanbin】
flinksql Interface creation table , And write the data to , Reference resources :Flink 1.15 Local cluster deployment Standalone Pattern ( Independent cluster mode )_ One day long -- Hyun Bin's blog -CSDN Blog


After creating the table, execute the following command to automatically create flink job:
INSERT INTO `all_users_sink`(id,user_name,address,phone_number,email,ct_date)
SELECT id,user_name,address,phone_number,email,ct_date FROM user_source;
see job Task list :
./flink-1.15.0/bin/flink list

preservation savepoint
./bin/flink savepoint :jobId [:targetDirectory] -- jobid and savepoint Keep the file directory
./bin/flink savepoint c8fa5a96073d064b0a717590a7029c0d file:///usr/local/flink-1.15.0/flink-savepoints
from savepoint/checkPoint recovery
RESET execution.savepoint.path; -- Reset point Save the path
SET execution.savepoint.path = 'file:///usr/local/flink-1.15.0/flink-savepoints/savepoint-c8fa5a-1161f2626294'; -- Set recovery path

Execute the following command to recover ( Must and create job At the time of the sql Command consistent )
INSERT INTO `all_users_sink`(id,user_name,address,phone_number,email,ct_date)
SELECT id,user_name,address,phone_number,email,ct_date FROM user_source;
If there are any errors reported below ( You can use the following command to ignore the error ):

set 'execution.savepoint.ignore-unclaimed-state' = 'true'; -- Allow skipping savepoint states that cannot be restored
After success, the interface displays :

Cancel the task :
./flink-1.15.0/bin/flink cancel $JOB_ID
./flink-1.15.0/bin/flink cancel c8fa5a96073d064b0a717590a7029c0d
Delete savepoint :
./bin/flink savepoint --dispose Hold point directory $JOB_ID
./bin/flink savepoint --dispose /usr/local/flink-1.15.0/flink-savepoints/savepoint-c8fa5a-1161f2626294 c8fa5a96073d064b0a717590a7029c0d
Stop the job gracefully and create a savepoint :
./bin/flink stop --savepointPath /tmp/flink-savepoints $JOB_ID
./bin/flink stop --savepointPath /usr/local/flink-1.15.0/flink-savepoints 74e0eddfa18eb39c403617f1d573cfdd
flink sql Refer to official documents for commands :SQL Client | Apache Flink
flink adopt jar The job created by the package is restored , Start the job from the savepoint :
Refer to official documentation :Checkpoints | Apache Flink
./bin/flink run --detached --fromSavepoint /usr/local/flink-1.15.0/flink-savepoints/savepoint-74e0ed-e3e4e431bd5d ./examples/streaming/StateMachineExample.jar
边栏推荐
猜你喜欢

DOM day_ 04 (7.12) BOM, open new page (delayed opening), address bar operation, browser information reading, historical operation

Two or three things about redis

flink1.11 sql本地运行demo & 本地webUI可视解决
![[CTF 真题] 2018-网鼎杯-Web-Unfinish](/img/d8/a367c26b51d9dbaf53bf4fe2a13917.png)
[CTF 真题] 2018-网鼎杯-Web-Unfinish
![[leetcode] no duplicate longest string](/img/97/bf8c9b019136ab372ce2c43cddbb2c.jpg)
[leetcode] no duplicate longest string

5_ Linear regression

Flink的容错机制(checkpoint)

MySQL8.0中的隐藏索引和降序索引(新特性)
![[HarekazeCTF2019]encode_and_encode](/img/f5/c06523a1764717bdf2d91f069c9d77.png)
[HarekazeCTF2019]encode_and_encode

14 web vulnerability: types of SQL injection and submission injection
随机推荐
MySQL Part 2
Medical data of more than 4000 people has been exposed for 16 years
基于Flink实时项目:用户行为分析(三:网站总浏览量统计(PV))
Designer mode
Canal introduction
C # conversion of basic data types for entry
Flink Interval Join源码理解
Ansible MySQL installation case record
[BJDCTF2020]EzPHP
DOM day_04(7.12)BOM、打开新页面(延迟打开)、地址栏操作、浏览器信息读取、历史操作
MYSQL分表DDL操作(存储过程)
通过FlinkCDC将MySQL中变更的数据写入到kafka(DataStream方式)
Vector size performance problems
Redisson 工作原理-源码分析
箭头函数详解 2021-04-30
基于Flink实时项目:用户行为分析(二:实时流量统计)
SparkSql之编程方式
Checked status in El checkbox 2021-08-02
重学JSON.stringify
当事务遇上分布式锁