Flink parsing (VI): savepoints
2022-07-06 17:11:00 【Stray_ Lambs】
Contents
Should I assign IDs to all operators in my job?
What happens if I add a new operator that requires state to my job?
What happens if I delete a stateful operator from my job?
What happens if I reorder stateful operators in my job?
What happens if I add, delete, or reorder stateless operators in my job?
What happens when I change the parallelism of my program when restoring?
Can I move the savepoint files on stable storage?
Savepoints
Flink's savepoints and checkpoints differ in several ways. I touched on this in my earlier post on Flink's recovery mechanism, but not in much detail, so this post goes through savepoints more closely, following the official documentation.
A savepoint is a consistent image of the execution state of a streaming job, created through Flink's checkpointing mechanism. Put simply, it is a snapshot just like a checkpoint, except that it is triggered manually. Checkpoints are mainly used for failure recovery, whereas savepoints are used to stop and resume, fork, or update a Flink job, for example when upgrading a version or deliberately stopping a job.
A savepoint consists of two parts: a directory of binary files on stable storage (for example HDFS or S3), which is usually very large, and a metadata file, which is relatively small. The files on stable storage hold the actual data of the job's execution state image (the snapshot). The metadata file mainly contains pointers, in the form of relative paths, to all the files on stable storage that belong to the savepoint.
The code paths for savepoints and checkpoints are basically the same, and they produce the same format. So why are there two names? Because they differ conceptually. (There is also one format exception at present, incremental checkpoints with the RocksDB state backend, and more differences may appear in the future.)
Conceptually, a savepoint is to a checkpoint what a backup is to a recovery log in a traditional database.
The purpose of a checkpoint is to provide a recovery mechanism for jobs that fail unexpectedly. Its lifecycle is managed by Flink (creation, ownership, deletion) without user interaction, so users normally never notice it. When a job ends, Flink deletes its checkpoints automatically unless they are configured to be retained.
Savepoints focus more on portability and on supporting changes to the job. They are usually created, owned, and deleted by the user. Their use cases are planned, manual backup and restore: upgrading the Flink version, changing user logic, changing the parallelism, or red/blue deployments. Savepoints still exist after the job has ended.
Another difference: since version 1.11.0, you can move (copy) a savepoint directory anywhere and then restore from it. Checkpoints do not support arbitrary relocation, because a checkpoint may contain absolute paths to some of its files.
However, there are two situations in which moving the savepoint directory is not supported:
- Entropy injection is enabled. In this case the savepoint directory does not contain all the data files, because the injected path entropy spreads them across different paths. Since there is no common root directory, the savepoint contains absolute paths, which makes it impossible to relocate the savepoint directory.
- The job contains task-owned state, for example a GenericWriteAheadSink (a write-ahead-log sink).
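Why relative pointers make a savepoint relocatable can be sketched with plain JDK path handling. This is only an illustration, not Flink code: the directory names, the file name `data-file-a`, and the helper `locate` are all hypothetical, and Unix-style paths are assumed.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Toy illustration (not Flink code): relative vs. absolute pointers in a metadata file. */
class SavepointPathDemo {

    /** Resolve a pointer from the metadata against the directory the savepoint lives in now. */
    static Path locate(Path savepointDir, String pointerInMetadata) {
        // Path.resolve keeps an absolute argument unchanged and appends a relative one.
        return savepointDir.resolve(pointerInMetadata);
    }

    public static void main(String[] args) {
        // Hypothetical location after the savepoint directory was copied elsewhere.
        Path moved = Paths.get("/backup/savepoint-1");

        // A relative pointer follows the directory to its new location.
        System.out.println(locate(moved, "data-file-a"));

        // An absolute pointer still refers to the old location, which may no longer exist.
        System.out.println(locate(moved, "/flink/savepoints/savepoint-1/data-file-a"));
    }
}
```

The second call is the situation the two exceptions above end up in: once a pointer is absolute, copying the directory no longer moves what the pointer refers to.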
If you use the MemoryStateBackend, there may be no data files in the directory at all. That is because in this mode both the metadata and the savepoint state are stored in the _metadata file.
Note: moving or deleting the last savepoint of a running job is discouraged, because it may interfere with failure recovery. Savepoints have side effects on exactly-once sinks, so to guarantee exactly-once semantics, if no checkpoint has been taken since the last savepoint, that savepoint is used for recovery.
Assigning operator IDs
When writing job code, it is strongly recommended to specify operator IDs manually with uid(String). These IDs are later used to restore the state of each operator, which keeps your savepoints consistent with your code. Here is a sample:
env
    // Stateful source (e.g. Kafka) with ID
    .addSource(new StatefulSource())
    .uid("source-id") // ID for the source operator
    .shuffle()
    // Stateful mapper with ID
    .map(new StatefulMapper())
    .uid("mapper-id") // ID for the mapper
    // Stateless printing sink
    .print(); // Auto-generated ID
The reason is that if you do not specify an ID, one is generated automatically. As long as the generated IDs stay the same, restoring from a savepoint works. But if you take a savepoint and then add or remove code so that the program structure changes, the generated IDs change too. A simple example: suppose the original logic is map -> filter -> keyBy and, for illustration, the generated IDs are 1, 2, 3 in that order. After taking the savepoint we change the code to map -> keyBy -> sum. The generated IDs are again 1, 2, 3, but they now refer to different operators, so restoring from the savepoint goes wrong.
Flink generates IDs from the structure of the program, so they are very sensitive to program changes. To restore reliably from savepoints, it is therefore strongly recommended to assign operator IDs manually.
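The map -> filter -> keyBy example can be played through with a toy model in plain Java. This is a sketch under simplifying assumptions: the positional IDs "1", "2", "3" stand in for Flink's real structure-based IDs, and the class and method names (`OperatorIdDemo`, `autoIds`, `mismatchedIds`) as well as the explicit IDs are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Flink code): how operator IDs decide which saved state an operator gets back. */
class OperatorIdDemo {

    /** Assign IDs "1", "2", ... by position, a stand-in for structure-based generated IDs. */
    static Map<String, String> autoIds(List<String> pipeline) {
        Map<String, String> idToOperator = new LinkedHashMap<>();
        for (int i = 0; i < pipeline.size(); i++) {
            idToOperator.put(String.valueOf(i + 1), pipeline.get(i));
        }
        return idToOperator;
    }

    /** IDs present in both jobs whose operator differs, i.e. state that would go to the wrong operator. */
    static List<String> mismatchedIds(Map<String, String> saved, Map<String, String> current) {
        List<String> mismatches = new ArrayList<>();
        for (Map.Entry<String, String> entry : saved.entrySet()) {
            String operatorNow = current.get(entry.getKey());
            if (operatorNow != null && !operatorNow.equals(entry.getValue())) {
                mismatches.add(entry.getKey());
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        // Generated IDs: the same numbers end up on different operators after the change.
        Map<String, String> before = autoIds(Arrays.asList("map", "filter", "keyBy"));
        Map<String, String> after = autoIds(Arrays.asList("map", "keyBy", "sum"));
        System.out.println(mismatchedIds(before, after)); // prints [2, 3]

        // Explicit IDs: each ID stays with its operator, so nothing is mismatched.
        Map<String, String> explicitBefore = new LinkedHashMap<>();
        explicitBefore.put("map-id", "map");
        explicitBefore.put("filter-id", "filter");
        explicitBefore.put("keyby-id", "keyBy");
        Map<String, String> explicitAfter = new LinkedHashMap<>();
        explicitAfter.put("map-id", "map");
        explicitAfter.put("keyby-id", "keyBy");
        explicitAfter.put("sum-id", "sum");
        System.out.println(mismatchedIds(explicitBefore, explicitAfter)); // prints []
    }
}
```

In the explicit case the state saved under "filter-id" simply has no operator left in the new job, which is a separate situation from state being handed to the wrong operator.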
Savepoint operations
Configuring savepoints
You can configure a default savepoint directory with state.savepoints.dir. When a savepoint is triggered, this directory is used to store it. You can override the default by passing a custom target directory to the trigger command (see the :targetDirectory argument). If no default is configured and no custom target directory is given, triggering the savepoint fails.
Note: the target directory must be a location that both the JobManager(s) and the TaskManager(s) can access, for example a location on a distributed file system.
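The precedence rule above (explicit target directory first, then the configured default, otherwise failure) can be written down as a small sketch. This is not Flink's implementation: the class `SavepointTargetDemo`, the method `resolveTarget`, and the example paths are hypothetical and only restate the rule.

```java
import java.util.Optional;

/** Toy restatement (not Flink code) of how the savepoint target directory is chosen. */
class SavepointTargetDemo {

    /**
     * @param targetDirectory directory passed to the trigger command, if any
     * @param defaultDir      value of state.savepoints.dir, if configured
     */
    static String resolveTarget(Optional<String> targetDirectory, Optional<String> defaultDir) {
        if (targetDirectory.isPresent()) {
            // A custom target directory overrides the configured default.
            return targetDirectory.get();
        }
        // Without either setting there is nowhere to write, so triggering fails.
        return defaultDir.orElseThrow(
                () -> new IllegalStateException("No savepoint directory configured or specified"));
    }

    public static void main(String[] args) {
        Optional<String> configured = Optional.of("hdfs:///flink/savepoints");
        System.out.println(resolveTarget(Optional.of("hdfs:///tmp/manual"), configured)); // custom wins
        System.out.println(resolveTarget(Optional.empty(), configured));                  // falls back to default
    }
}
```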
Triggering a savepoint
When a savepoint is triggered, a new savepoint directory is created, in which the data and the metadata are stored.
# Triggers a savepoint for the job with ID :jobId and returns the path of the created savepoint. You need this path to restore from and to delete the savepoint.
$ bin/flink savepoint :jobId [:targetDirectory]
# Triggers a savepoint for the job with ID :jobId in the YARN application with ID :yarnAppId, and returns the path of the created savepoint.
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
Restoring from a savepoint
$ bin/flink run -s :savepointPath [:runArgs]
This submits a job and specifies the savepoint to restore from. You can give either the path to the savepoint directory or the path to the _metadata file.
By default, the restore operation tries to map all state in the savepoint back to the program you are restoring. If you have deleted an operator, you can use the --allowNonRestoredState (short: -n) option to skip state that cannot be mapped to the new program:
$ bin/flink run -s :savepointPath -n [:runArgs]
Deleting a savepoint
$ bin/flink savepoint -d :savepointPath
This deletes the savepoint stored in :savepointPath.
Note that a savepoint can also be deleted manually with regular file system operations, without affecting other savepoints or checkpoints (remember that every savepoint is self-contained). Before Flink 1.2, deleting a savepoint was a more tedious task that had to be done with the savepoint command above.
FAQ
Should I assign IDs to all operators in my job?
As a rule of thumb, yes. Strictly speaking, it is enough to assign IDs with the uid method to the stateful operators in your job. A savepoint only contains the state of these stateful operators; stateless operators are not part of it.
In practice, it is recommended to assign IDs to all operators, because some of Flink's built-in operators (such as the Window operator) are also stateful, and it is not obvious which built-in operators have state. If you are absolutely sure that an operator is stateless, you can skip the uid method.
What happens if I add a new operator that requires state to my job?
When you add a new operator to your job, it is initialized without any state. A savepoint contains the state of each stateful operator, and stateless operators are simply not part of it. With respect to the savepoint, the new operator therefore behaves like a stateless operator.
What happens if I delete a stateful operator from my job?
By default, restoring from a savepoint tries to match all of its state back to the new job. If a stateful operator has been deleted, you will not be able to restore from the savepoint.
You can allow the deletion of stateful operators by setting --allowNonRestoredState (short: -n) on the run command:
$ bin/flink run -s :savepointPath -n [:runArgs]
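The effect of the flag can be sketched with a toy restore routine in plain Java. It is a simplified model and not Flink's restore logic: the class `RestoreDemo`, the method `restore`, and the operator IDs are hypothetical, and state is reduced to a set of operator IDs.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Toy model (not Flink code) of restoring with and without allowNonRestoredState. */
class RestoreDemo {

    /**
     * Returns the operator IDs whose state is restored. State for an operator that the new job
     * no longer has makes the restore fail, unless allowNonRestoredState is set, in which case
     * that state is skipped.
     */
    static Set<String> restore(Set<String> savepointIds, Set<String> jobIds, boolean allowNonRestoredState) {
        Set<String> restored = new TreeSet<>();
        for (String id : savepointIds) {
            if (jobIds.contains(id)) {
                restored.add(id);
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException("No operator in the new job for saved state: " + id);
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        Set<String> savepoint = new HashSet<>(Arrays.asList("source-id", "mapper-id"));
        Set<String> job = new HashSet<>(Arrays.asList("source-id")); // the mapper was deleted

        // With the flag, the mapper's state is skipped and the rest is restored.
        System.out.println(restore(savepoint, job, true)); // prints [source-id]

        // Without the flag, the unmatched state makes the restore fail.
        try {
            restore(savepoint, job, false);
        } catch (IllegalStateException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```

An operator that exists only in the new job never appears in the loop at all, which matches the earlier answer: a newly added operator just starts without state.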
What happens if I reorder stateful operators in my job?
If you assigned IDs to these operators, they are restored as usual.
If you did not assign IDs, the auto-generated IDs of the stateful operators will most likely change after the reordering. As a result, you will not be able to restore from a previous savepoint.
What happens if I add, delete, or reorder stateless operators in my job?
If you assigned IDs to your stateful operators, the stateless operators do not affect restoring from a savepoint.
If you did not assign IDs, the auto-generated IDs of the stateful operators will most likely change after the change. As a result, you will not be able to restore from a previous savepoint.
What happens when I change the parallelism of my program when restoring?
If the savepoint was triggered with Flink >= 1.2.0 and the job does not use deprecated state APIs such as Checkpointed, you can simply restore the program from the savepoint and specify a new parallelism.
If you are restoring from a savepoint triggered with Flink < 1.2.0, or are using now-deprecated APIs, you first have to migrate your job and savepoint to Flink >= 1.2.0 before you can change the parallelism. See the guide on upgrading jobs and Flink versions.
Can I move the savepoint files on stable storage?
The quick answer is "yes". Since Flink 1.11.0, savepoints are self-contained, so you can relocate the savepoint files as needed and restore from them.