Flink parsing (VI): savepoints
2022-07-06 17:11:00 【Stray_ Lambs】
Contents
Should I assign IDs to all operators in my job?
What happens if I add a new stateful operator to my job?
What happens if I delete a stateful operator from my job?
What happens if I reorder the stateful operators in my job?
What happens if I add, delete, or reorder stateless operators in my job?
What happens when I change the parallelism of my program when restoring?
Can I move savepoint files on stable storage?
Savepoints
Flink's savepoints differ from checkpoints, as I mentioned in my earlier post on Flink's recovery mechanism. That post did not go into much detail, so this one walks through savepoints based on the official documentation.
A savepoint is a consistent image of the execution state of a streaming job, created via Flink's checkpointing mechanism. Put simply, it is a snapshot just like a checkpoint, except that it is triggered manually. Checkpoints are mainly used for failure recovery, whereas savepoints are used to stop and resume, fork, or update a Flink job, for example during a version upgrade or when deliberately stopping a job.
A savepoint consists of two parts: a directory of (usually large) binary files on stable storage (e.g. HDFS, S3), and a (relatively small) metadata file. The files on stable storage are the data image (snapshot) of the job's execution state. The metadata file mainly contains pointers, in the form of relative paths, to all the files on stable storage that belong to the savepoint.
Savepoints and checkpoints share essentially the same code path and produce the same format, so why two names? Because they differ conceptually. (There is also one format exception: incremental checkpoints with RocksDB use a different format, and more such differences may appear in the future.)
Conceptually, the difference between a savepoint and a checkpoint resembles the difference between a backup and the recovery log in a traditional database.
The purpose of checkpoints is to provide a recovery mechanism for jobs that fail unexpectedly. Flink manages their entire lifecycle (creation, ownership, deletion) without user interaction, so they are transparent to users. When a job ends, Flink normally deletes its checkpoints automatically unless they are explicitly configured to be retained.
Savepoints focus more on portability and support for job changes. They are usually created, owned, and deleted by the user. Their use cases are planned, manual backup and restore: upgrading the Flink version, changing user logic, changing parallelism, or red/blue deployments. Savepoints remain after the job ends.
Another difference: starting with Flink 1.11.0, you can move (copy) a savepoint directory anywhere and then restore from it, whereas checkpoints cannot be moved freely because they contain absolute file paths.
However, there are two cases in which moving the savepoint directory is not supported:
- Entropy injection is enabled: in this case the savepoint directory does not contain all the data files, because the injected entropy spreads them across many paths. With no common root directory, the savepoint contains absolute paths, so the directory cannot be moved.
- The job contains task-owned state, such as a GenericWriteAheadSink sink.
If you use MemoryStateBackend, the directory may contain no data files, because in that mode both the metadata and the savepoint data are stored in the _metadata file.
Note: moving or deleting the last savepoint of a running job is not recommended, because it may interfere with failure recovery. Savepoints have side effects on exactly-once sinks, so to preserve exactly-once semantics, if no checkpoint has completed since the last savepoint, that savepoint is used for recovery.
Assigning operator IDs
When writing job code, it is strongly recommended to assign operator IDs manually with uid(String). These IDs are later used to restore the state of each operator, which keeps your savepoints consistent with your code. Sample code follows.
DataStream<String> stream = env
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id"); // ID for the mapper

// Stateless printing sink; print() returns a DataStreamSink, so it is called separately
stream.print(); // Auto-generated ID
The reason is that if you do not specify an ID, one is generated automatically. As long as the generated IDs do not change, restoring from a savepoint works. But if you add or delete code after taking a savepoint and thereby change the structure of the program, the IDs change. A simplified example: suppose the original logic is map -> filter -> keyBy and the generated IDs are 1, 2, 3 in that order. After taking the savepoint you change the code to map -> keyBy -> sum. The generated IDs are again 1, 2, 3, but they now refer to different operators, so the state in the savepoint no longer lines up with the right operators.
Flink generates IDs from the structure of the program, so they are very sensitive to program changes. To be able to restore from savepoints reliably, it is strongly recommended to assign operator IDs manually.
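The simplified example above can be replayed in a few lines of plain Java. This is only a toy model: the class and method names are hypothetical, and real Flink derives auto-generated IDs from the structure of the job graph rather than from positions. It just shows which numbered IDs end up pointing at a different operator after the code change.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: real Flink does not number operators by position. */
final class OperatorIdSketch {

    /** Assigns IDs "1", "2", ... by position, mimicking the simplified example in the text. */
    static Map<String, String> positionalIds(List<String> operators) {
        Map<String, String> ids = new LinkedHashMap<>();
        for (int i = 0; i < operators.size(); i++) {
            ids.put(String.valueOf(i + 1), operators.get(i));
        }
        return ids;
    }

    /** Returns the IDs that refer to a different operator in the new job than in the old one. */
    static List<String> mismatchedIds(Map<String, String> oldJob, Map<String, String> newJob) {
        List<String> mismatched = new ArrayList<>();
        for (Map.Entry<String, String> entry : oldJob.entrySet()) {
            String now = newJob.get(entry.getKey());
            if (now != null && !now.equals(entry.getValue())) {
                mismatched.add(entry.getKey());
            }
        }
        return mismatched;
    }

    public static void main(String[] args) {
        Map<String, String> before = positionalIds(Arrays.asList("map", "filter", "keyBy"));
        Map<String, String> after = positionalIds(Arrays.asList("map", "keyBy", "sum"));
        // IDs 2 and 3 now point at different operators than when the savepoint was taken.
        System.out.println(mismatchedIds(before, after)); // [2, 3]
    }
}
```

With explicit uid values the key would be a name chosen by the developer, so an operator keeps its ID no matter where it sits in the pipeline.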
Savepoint operations
Configuring savepoints
You can configure a default savepoint directory via state.savepoints.dir. When a savepoint is triggered, it is stored in this directory. You can override the default by passing a custom target directory to the trigger command (see the :targetDirectory argument). If no default is configured and no custom target directory is given, triggering the savepoint fails.
Note: the target directory must be accessible to both the JobManager(s) and the TaskManager(s), for example a location on a distributed file system.
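The precedence rule described here (an explicit target directory first, then state.savepoints.dir, otherwise failure) can be written down as a small plain-Java sketch. It only illustrates the rule as stated in this article and is not Flink's implementation; the class name, method name, and example paths are all made up.

```java
/** Hypothetical illustration of the target-directory rule; not Flink code. */
final class SavepointTargetSketch {

    /**
     * @param customTarget  directory passed to the trigger command, or null if none was given
     * @param configuredDir value of state.savepoints.dir, or null if it is not configured
     * @return the directory the savepoint would be written to
     */
    static String resolveTargetDirectory(String customTarget, String configuredDir) {
        if (customTarget != null && !customTarget.isEmpty()) {
            return customTarget; // an explicit target overrides the default
        }
        if (configuredDir != null && !configuredDir.isEmpty()) {
            return configuredDir; // fall back to the configured default
        }
        throw new IllegalStateException(
            "No savepoint directory: set state.savepoints.dir or pass a target directory");
    }

    public static void main(String[] args) {
        // The paths below are made-up examples.
        System.out.println(resolveTargetDirectory(null, "hdfs:///flink/savepoints"));
        System.out.println(resolveTargetDirectory("s3://bucket/manual", "hdfs:///flink/savepoints"));
    }
}
```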
Triggering a savepoint
Triggering a savepoint creates a new savepoint directory in which the data and the metadata are stored.
# This triggers a savepoint for the job with ID :jobId and returns the path of the created savepoint. You need this path to restore from and to delete the savepoint.
$ bin/flink savepoint :jobId [:targetDirectory]
# This triggers a savepoint for the job with ID :jobId in the YARN application with ID :yarnAppId, and returns the path of the created savepoint.
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
Restoring from a savepoint
$ bin/flink run -s :savepointPath [:runArgs]
This submits the job and specifies the savepoint to restore from. You can pass the path to either the savepoint directory or the _metadata file.
By default, the restore operation tries to map all state in the savepoint back to the program you are restoring. If you have removed an operator, you can use the --allowNonRestoredState (short: -n) option to skip state that cannot be mapped to the new program:
$ bin/flink run -s :savepointPath -n [:runArgs]
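The effect of the option can be pictured with a toy model in plain Java. The sketch treats a savepoint as a map from operator ID to state and the new job as a set of operator IDs. All names are hypothetical and the real restore logic in Flink is considerably more involved; the point is only the difference between failing on unmapped state and skipping it.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model of savepoint restore; the names are hypothetical and this is not Flink code. */
final class RestoreSketch {

    /**
     * Maps savepoint state (operator ID -> state) onto the operator IDs of the new job.
     * State whose operator no longer exists either fails the restore or is skipped,
     * depending on allowNonRestoredState.
     */
    static Map<String, String> restore(Map<String, String> savepointState,
                                       Set<String> newJobOperatorIds,
                                       boolean allowNonRestoredState) {
        Map<String, String> restored = new TreeMap<>();
        for (Map.Entry<String, String> entry : savepointState.entrySet()) {
            if (newJobOperatorIds.contains(entry.getKey())) {
                restored.put(entry.getKey(), entry.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                    "Cannot map state of operator " + entry.getKey() + " to the new program");
            }
            // Otherwise the unmapped state is skipped, as with -n.
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = new TreeMap<>();
        savepoint.put("source-id", "offsets");
        savepoint.put("mapper-id", "counts");
        // The new job no longer contains the operator with ID "mapper-id".
        Set<String> newJob = new HashSet<>(Arrays.asList("source-id"));
        System.out.println(restore(savepoint, newJob, true)); // {source-id=offsets}
    }
}
```

Calling restore(savepoint, newJob, false) in the same setup throws, which corresponds to the default behaviour without -n.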
Deleting a savepoint
$ bin/flink savepoint -d :savepointPath
This deletes the savepoint stored in :savepointPath.
Note that you can also delete a savepoint manually with regular file system operations without affecting other savepoints or checkpoints (remember that every savepoint is self-contained). Before Flink 1.2 this was more tedious and had to be done with the savepoint command above.
FAQ
Should I assign IDs to all operators in my job?
As a rule of thumb, yes. Strictly speaking, it is sufficient to assign IDs via the uid method only to the stateful operators in your job. A savepoint only contains state for these operators; stateless operators are not part of the savepoint.
In practice, assigning IDs to all operators is recommended, because some of Flink's built-in operators (such as the window operator) are also stateful, and it is not obvious which built-in operators are stateful and which are not. If you are absolutely certain that an operator is stateless, you can skip the uid method.
What happens if I add a new stateful operator to my job?
When you add a new operator to your job, it is initialized without any state. A savepoint contains the state of each stateful operator; stateless operators are not part of it at all. The new operator therefore behaves much like a stateless operator when the job is restored.
What happens if I delete a stateful operator from my job?
By default, restoring from a savepoint tries to match all of its state back to the new job. If the savepoint contains state for an operator that has been deleted, the restore therefore fails.
You can allow the deletion of stateful operators by setting --allowNonRestoredState (short: -n) on the run command:
$ bin/flink run -s :savepointPath -n [:runArgs]
What happens if I reorder the stateful operators in my job?
If you assigned IDs to these operators, they are restored as usual.
If you did not assign IDs, the auto-generated IDs of the stateful operators will most likely change after the reordering, and you will not be able to restore from the previous savepoint.
What happens if I add, delete, or reorder stateless operators in my job?
If you assigned IDs to your stateful operators, the stateless operators do not affect restoring from the savepoint.
If you did not assign IDs, the auto-generated IDs of the stateful operators will most likely change after stateless operators are added, deleted, or reordered, and you will not be able to restore from the previous savepoint.
What happens when I change the parallelism of my program when restoring?
If the savepoint was triggered with Flink >= 1.2.0 and does not use deprecated state APIs such as Checkpointed, you can simply restore the program from the savepoint and specify a new parallelism.
If you are restoring from a savepoint triggered with Flink < 1.2.0, or one that uses now-deprecated APIs, you first have to migrate your job and savepoint to Flink >= 1.2.0 before you can change the parallelism. See the guide on upgrading jobs and Flink versions.
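As background that this article does not cover: keyed state in Flink >= 1.2 is organized into key groups, and each parallel subtask owns a contiguous range of them, which is what lets a restore redistribute state under a new parallelism. The sketch below is plain Java with hypothetical names. The index arithmetic follows what Flink's KeyGroupRangeAssignment does to the best of my knowledge, while the hashing step is a deliberate simplification (Flink applies a murmur hash to key.hashCode()), so treat it as an illustration rather than a reference.

```java
/** Sketch of key-group ownership under different parallelism values; not Flink code. */
final class RescaleSketch {

    /** Index of the subtask that owns a key group for a given parallelism. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Assumption: a plain non-negative hash stands in for Flink's murmur hash of key.hashCode(). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // fixed for the lifetime of the job's state
        int keyGroup = keyGroupFor("user-42", maxParallelism); // "user-42" is a made-up key
        // The key group of a key never changes; only the subtask that owns it does.
        System.out.println("key group " + keyGroup
            + " -> subtask " + operatorIndexForKeyGroup(maxParallelism, 2, keyGroup)
            + " at parallelism 2, subtask "
            + operatorIndexForKeyGroup(maxParallelism, 4, keyGroup)
            + " at parallelism 4");
    }
}
```

Because a key always maps to the same key group, restoring with a different parallelism only changes which subtask reads which key-group range from the savepoint.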
Can I move savepoint files on stable storage?
In short, yes. Starting with Flink 1.11.0, savepoints are self-contained, so you can relocate the files as needed and restore from them.