Flink parsing (VI): savepoints
2022-07-06 17:11:00 【Stray_ Lambs】
Contents
Should I assign IDs to all operators in my job?
What happens if I add a new stateful operator to my job?
What happens if stateful operators are deleted from the job?
What happens if I reorder stateful operators in my job?
What happens if I add, delete, or reorder stateless operators in my job?
What happens when I change the parallelism of the program during recovery?
Can I move savepoint files on stable storage?
Savepoints
Flink's savepoints differ from checkpoints, as mentioned in my earlier post on Flink's recovery mechanism, but that post did not go into much detail, so this article follows the official documentation to look at savepoints specifically.
A savepoint is a consistent image of the execution state of a streaming job, created through Flink's checkpointing mechanism. In plain terms it is essentially the same kind of snapshot file as a checkpoint, except that the snapshot is triggered manually. Checkpoints are mainly used for failure recovery, while savepoints are used to stop and restart, fork, or update a Flink job, for example when upgrading to a new version or deliberately stopping a job.
A savepoint consists of two parts: a directory of (usually large) binary files on stable storage (e.g. HDFS, S3) and a (relatively small) metadata file. The files on stable storage hold the data image of the job's execution state (the snapshot), while the metadata file mainly contains, in the form of relative paths, pointers to the savepoint's files on stable storage.
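For intuition, a savepoint directory on stable storage looks roughly like this (the savepoint ID and file names below are made up for illustration):

/flink/savepoints/savepoint-1b2c3d-9f8e7a/
    _metadata      # small metadata file, points at the state files via relative paths
    2d0f4e0a...    # large binary files holding the operators' state
    7a91bc3e...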
In fact the code that implements savepoints and checkpoints is largely the same, and the format they produce is identical, so why two names? Because there are some conceptual differences between them (as well as differences such as RocksDB's incremental checkpoints, and possibly more in the future).
Conceptually, a savepoint is somewhat like a backup of a traditional database, while a checkpoint is more like its recovery log.
The purpose of checkpoints is to provide a recovery mechanism for jobs that fail unexpectedly. They are managed by Flink (created, owned, and deleted) without human interaction and are transparent to the user; when a job finishes, Flink normally deletes its checkpoints automatically unless they are explicitly configured to be retained.
Savepoints put more emphasis on portability and support for job changes. They are usually created, owned, and deleted by the user. Their use cases are planned, manual backup and restore, typically when upgrading the Flink version, adjusting the user logic, changing the parallelism, or doing red/blue deployments. Savepoints continue to exist after the job has terminated.
Another difference is that, starting with version 1.11.0, Flink lets you move (or copy) a savepoint directory to an arbitrary location and then restore from it, whereas checkpoint files cannot be moved around freely, because a checkpoint contains absolute paths to some of its files.
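As a rough illustration, relocating a self-contained savepoint and restoring from the new location could look like the following (the paths and the savepoint directory name are hypothetical):

$ hdfs dfs -mv hdfs:///flink/savepoints/savepoint-1b2c3d-9f8e7a hdfs:///archive/savepoints/savepoint-1b2c3d-9f8e7a
$ bin/flink run -s hdfs:///archive/savepoints/savepoint-1b2c3d-9f8e7a my-job.jar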
However, there are two situations in which moving the savepoint directory is not supported:
- If entropy injection is enabled: in this case the savepoint directory does not contain all of the data files, because the injected paths scatter them across different locations. Without a common root directory, the savepoint has to contain absolute paths, which makes relocating the savepoint directory impossible.
- The job contains task-owned state, such as a GenericWriteAheadLog sink.
Also note that if the MemoryStateBackend is used, the directory may contain no data files at all, because in this mode the metadata and the savepoint data are all stored in the _metadata file.
Note: it is not recommended to move or delete the last savepoint of a running job, because this may interfere with failure recovery. Savepoints have side effects on exactly-once sinks: to guarantee exactly-once semantics, if no checkpoint has completed since the last savepoint, that savepoint will be used for recovery.
Assigning operator IDs
When writing your job code, it is strongly recommended to assign each operator an ID manually via uid(String). These IDs are later used to restore the state of each operator, ensuring that your savepoints stay consistent with your code. A sample is shown below.
env
    // Stateful source (e.g. Kafka) with ID
    .addSource(new StatefulSource())
    .uid("source-id") // ID for the source operator
    .shuffle()
    // Stateful mapper with ID
    .map(new StatefulMapper())
    .uid("mapper-id") // ID for the mapper
    // Stateless printing sink
    .print(); // Auto-generated ID
The reason is that if you do not specify IDs, Flink generates them automatically. As long as those auto-generated IDs never change there is no problem, and the savepoint can be restored. But if, after taking a savepoint, you add or remove code and change the structure of the program, the IDs will change. As a simple example, suppose the original pipeline is map->filter->keyBy and its operators are auto-assigned IDs 1, 2, 3. After taking a savepoint we change the code to map->keyBy->sum; the auto-generated IDs will again be 1, 2, 3, but if we now restore from the savepoint, the state will be mapped to the wrong operators.
The IDs that Flink generates depend on the structure of the program and are very sensitive to program changes, so it is strongly recommended to assign operator IDs manually to keep savepoint restores consistent.
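As a minimal sketch of the recommended fix (StatefulSource, StatefulMapper, and the uid strings are placeholders, mirroring the sample above), explicit uids on the stateful operators keep the state mapping stable even if the stateless parts of the pipeline change later:

// Version 1 of the pipeline: source -> map -> filter -> keyBy -> reduce -> print
env.addSource(new StatefulSource())
    .uid("source-id")               // stateful source keeps its offsets under "source-id"
    .map(new StatefulMapper())
    .uid("mapper-id")               // stateful mapper keeps its state under "mapper-id"
    .filter(value -> value != null) // stateless, uid optional
    .keyBy(value -> value)
    .reduce((a, b) -> b)
    .uid("reduce-id")               // built-in reduce state stays under "reduce-id"
    .print();
// If the stateless filter is later removed, state for "source-id", "mapper-id" and
// "reduce-id" still maps back correctly when restoring from an old savepoint.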
Savepoint operations
Configuring savepoints
You can configure a default savepoint directory via state.savepoints.dir. When a savepoint is triggered, this directory is used to store it. You can override the default by passing a custom target directory to the trigger command (see the targetDirectory parameter). If no default is configured and no custom target directory is specified, triggering a savepoint will fail.
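For example, the default savepoint directory could be set in flink-conf.yaml like this (the HDFS path is only an illustration):

# flink-conf.yaml
state.savepoints.dir: hdfs:///flink/savepoints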
Note: the target directory must be a location accessible to both the JobManager(s) and TaskManager(s), for example a directory on a distributed file system.
Triggering a savepoint
When a savepoint is triggered, a new savepoint directory is created, and the data and metadata are stored in it.
# This triggers a savepoint for the job with ID :jobId and returns the path of the created savepoint. You need this path to restore from or delete the savepoint.
$ bin/flink savepoint :jobId [:targetDirectory]
# This triggers a savepoint for the job with ID :jobId and YARN application ID :yarnAppId, and returns the path of the created savepoint.
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
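Depending on your Flink version, a savepoint can also be taken as part of stopping or cancelling the job; roughly as follows (verify the exact flags for your version):

# Stop the job gracefully and take a final savepoint (Flink >= 1.9)
$ bin/flink stop --savepointPath [:targetDirectory] :jobId
# Cancel the job with a savepoint (older, now deprecated syntax)
$ bin/flink cancel -s [:targetDirectory] :jobId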
Resuming from a savepoint
$ bin/flink run -s :savepointPath [:runArgs]
This submits the job and specifies the savepoint to resume from. You can give either the path of the savepoint directory or of the _metadata file.
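For example, both of the following point at the same (hypothetical) savepoint:

$ bin/flink run -s hdfs:///flink/savepoints/savepoint-1b2c3d-9f8e7a my-job.jar
$ bin/flink run -s hdfs:///flink/savepoints/savepoint-1b2c3d-9f8e7a/_metadata my-job.jar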
By default, the resume operation tries to map all the state of the savepoint back to the program being restored. If an operator has been removed, you can skip state that cannot be mapped to the new program with the --allowNonRestoredState (short: -n) option:
$ bin/flink run -s :savepointPath -n [:runArgs]
Deleting a savepoint
$ bin/flink savepoint -d :savepointPath
This deletes the savepoint stored under :savepointPath.
Please note that a savepoint can also be deleted manually with ordinary file system operations without affecting other savepoints or checkpoints (remember, each savepoint is self-contained). Before Flink 1.2, this was a more tedious task that had to be performed with the savepoint command above.
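For instance, on HDFS a manual deletion could be as simple as this (the path is hypothetical):

$ hdfs dfs -rm -r hdfs:///flink/savepoints/savepoint-1b2c3d-9f8e7a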
F.A.Q
Should I assign IDs to all operators in my job?
As a rule of thumb, yes. Strictly speaking, it is sufficient to assign IDs via the uid method only to the stateful operators in your job: the savepoint contains state only for those stateful operators, and stateless operators are not part of the savepoint.
In practice it is recommended to assign IDs to all operators, because some of Flink's built-in operators (such as window operators) are also stateful, and it is not always obvious which built-in operators are stateful and which are not. If you are absolutely certain that an operator is stateless, you can skip the uid method.
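As a rough sketch (the input stream, key selector, window size, and uid string are made up for illustration), a windowed aggregation is one of those built-in stateful operators, so giving it a uid is worthwhile:

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

events
    .keyBy(e -> e.userId)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .reduce((a, b) -> b)          // the window operator holds per-window state
    .uid("per-user-1m-window");   // explicit ID so the window state maps back on restore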
What happens if I add a new stateful operator to my job?
When you add a new operator to the job, it is initialized without any state. The savepoint contains the state of every stateful operator; stateless operators are simply not part of the savepoint. The new operator behaves like a stateless operator at startup.
What happens if stateful operators are deleted from the job?
By default, restoring from a savepoint tries to assign all of its state to the new job. If a stateful operator has been deleted, you cannot restore from the savepoint.
You can allow removing stateful operators by setting the --allowNonRestoredState (short: -n) option on the run command:
$ bin/flink run -s :savepointPath -n [:runArgs]
What happens if I reorder stateful operators in my job?
If these operators have assigned IDs, they will be restored as usual.
If IDs were not assigned, the auto-generated IDs of the stateful operators are likely to change after the reordering, which will prevent you from restoring from the previous savepoint.
What happens if I add, delete, or reorder stateless operators in my job?
If you have assigned IDs to your stateful operators, the stateless operators will not affect restoring from the savepoint.
If IDs were not assigned, the auto-generated IDs of the stateful operators are likely to change after such a change, which will prevent you from restoring from the previous savepoint.
What happens when I change the parallelism of the program during recovery?
If the savepoint was triggered with Flink >= 1.2.0 and the job does not use deprecated state APIs such as Checkpointed, you can simply restore the program from the savepoint and specify a new parallelism.
If you are restoring from a savepoint triggered with Flink < 1.2.0, or the job uses APIs that are now deprecated, you first have to migrate the job and the savepoint to Flink >= 1.2.0 before you can change the parallelism. See the guide on upgrading jobs and Flink versions.
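For example, the new parallelism can be passed with the -p option when resuming (the value 8 and :savepointPath are placeholders):

$ bin/flink run -s :savepointPath -p 8 [:runArgs]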
Can I move savepoint files on stable storage?
The quick answer is yes: starting from Flink 1.11.0, savepoints are self-contained, so you can relocate the savepoint files and restore from them as needed.