Flink parsing (VI): savepoints

2022-07-06 17:11:00 Stray_ Lambs

Contents

Savepoints

Assigning operator IDs

Savepoint operations

Configuring savepoints

Triggering a savepoint

Resuming from a savepoint

Deleting a savepoint

FAQ

Should I assign IDs to all operators in my job?

What happens if I add a new operator that requires state to my job?

What happens if I delete a stateful operator from my job?

What happens if I reorder stateful operators in my job?

What happens if I add, delete, or reorder stateless operators in my job?

What happens if I change the parallelism of my program when restoring?

Can I move the savepoint files on stable storage?

References


Savepoints

Flink's savepoints differ from its checkpoints, as I mentioned in my earlier post on Flink's recovery mechanism. That post did not go into much detail, so here I walk through savepoints more specifically, following the official documentation.

A savepoint is a consistent image of the execution state of a streaming job, created with Flink's checkpointing mechanism. Put bluntly, it is a snapshot just like a checkpoint, except that it is triggered manually. Checkpoints are mainly used for failure recovery, whereas savepoints are used to stop and resume, fork, or update a Flink job, for example for a version upgrade or when you deliberately stop a job.

A savepoint consists of two parts: a directory of (usually large) binary files on stable storage (for example HDFS or S3), and a (relatively small) metadata file. The files on stable storage hold the data image (snapshot) of the job's execution state. The metadata file mainly contains pointers, stored as relative paths, to all the files on stable storage that belong to the savepoint.

In fact, savepoints and checkpoints share basically the same code implementation and produce the same format. So why are there two names? Because there are conceptual differences between them. (There is currently one exception to the shared format: incremental checkpoints with the RocksDB state backend. There may be other differences in the future.)

Conceptually, savepoints differ from checkpoints in much the same way that backups differ from recovery logs in a traditional database.

The purpose of checkpoints is to provide a recovery mechanism for jobs that fail unexpectedly. Their lifecycle is managed by Flink (creation, ownership, deletion) without any user interaction, so users do not notice them. When a job ends, Flink normally deletes its checkpoints automatically unless they are configured to be retained.

Savepoints focus more on portability and on supporting changes to the job. They are usually created, managed, and deleted by the user. Their use cases are planned, manual backup and restore: upgrading the Flink version, changing the job logic, changing the parallelism, or doing a red/blue deployment. Unlike checkpoints, a savepoint is still there after the job ends.

Another difference: starting with Flink 1.11.0, you can move (copy) the savepoint directory anywhere and then restore from it. Checkpoint files cannot be moved arbitrarily, because a checkpoint contains absolute paths to some files.

However, there are two cases in which moving the savepoint directory is not supported:

  • Entropy injection is enabled. In this case the savepoint directory does not contain all the data files, because the injected path components scatter the files across different paths. Since there is no common root directory, the savepoint contains absolute paths, which makes it impossible to relocate the savepoint directory.
  • The job contains task-owned state, such as the GenericWriteAheadLog sink.
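The effect of relative versus absolute pointers can be illustrated with a small, self-contained sketch. This is not Flink code: the class `SavepointRelocationSketch`, the method `resolvePointers`, and all file names are hypothetical, and the sketch only assumes that pointers are resolved against whatever directory the savepoint currently lives in (POSIX-style paths assumed).

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; Flink's real metadata format is binary.
final class SavepointRelocationSketch {

    /**
     * Resolves the file pointers recorded in a (hypothetical) metadata file
     * against the directory the savepoint currently lives in.
     */
    static List<Path> resolvePointers(Path savepointDir, List<String> pointers) {
        List<Path> resolved = new ArrayList<>();
        for (String pointer : pointers) {
            // Path.resolve appends a relative pointer to savepointDir,
            // but returns an absolute pointer unchanged.
            resolved.add(savepointDir.resolve(pointer).normalize());
        }
        return resolved;
    }

    public static void main(String[] args) {
        // One relative pointer and one absolute pointer (as with entropy injection).
        List<String> pointers =
                List.of("state-file-1", "/old/root/entropy-ab12/state-file-2");

        // The savepoint directory has been copied to a new location.
        Path movedDir = Paths.get("/new/location/savepoint-1");

        for (Path p : resolvePointers(movedDir, pointers)) {
            System.out.println(p);
        }
    }
}
```

The relative pointer follows the directory to its new location, while the absolute pointer still refers to the old root. This is the reason a savepoint written with entropy injection cannot simply be copied elsewhere.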

If you use MemoryStateBackend, the directory may contain no data files at all, because in this mode the metadata and the savepoint state are both stored in the _metadata file.

Note: it is not recommended to move or delete the last savepoint of a running job, because this may interfere with failure recovery. Savepoints have side effects on exactly-once sinks, so to ensure exactly-once semantics, if there is no checkpoint after the last savepoint, the savepoint is used for recovery.

Assigning operator IDs

When writing job code, it is strongly recommended to specify operator IDs manually with uid(String). These IDs are later used to restore the state of each operator, which keeps your savepoint consistent with your code. Here is some sample code.

env
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

The reason is that if you do not specify IDs, they are generated automatically. As long as the generated IDs stay the same, restoring from a savepoint works. But if you take a savepoint and then add or remove code so that the structure of the job changes, the IDs change as well. A simplified example: suppose the original logic is map -> filter -> keyBy and the generated IDs are 1, 2, 3 in that order. After taking the savepoint, you change the code to map -> keyBy -> sum. The generated IDs are again 1, 2, 3, but they now refer to different operators, so the state in the savepoint no longer matches when you restore.

Flink generates IDs from the structure of the program, so they are very sensitive to program changes. To keep savepoint restores consistent, it is strongly recommended to assign operator IDs manually.
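The map -> filter -> keyBy example can be replayed in a few lines of plain Java. This is a deliberately simplified model, not Flink's actual ID generation: the class `OperatorIdSketch` and its methods are hypothetical, and position-based IDs 1, 2, 3 stand in for the automatically generated ones, as in the example above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: maps operator ID -> operator name.
final class OperatorIdSketch {

    /** Assigns IDs "1", "2", "3", ... by position (stand-in for auto-generated IDs). */
    static Map<String, String> positionalIds(List<String> operators) {
        Map<String, String> ids = new LinkedHashMap<>();
        for (int i = 0; i < operators.size(); i++) {
            ids.put(String.valueOf(i + 1), operators.get(i));
        }
        return ids;
    }

    /** Returns the IDs that exist in both jobs but point at different operators. */
    static List<String> mismatchedIds(Map<String, String> savepoint,
                                      Map<String, String> newJob) {
        List<String> mismatched = new ArrayList<>();
        for (Map.Entry<String, String> e : savepoint.entrySet()) {
            String now = newJob.get(e.getKey());
            if (now != null && !now.equals(e.getValue())) {
                mismatched.add(e.getKey());
            }
        }
        return mismatched;
    }

    public static void main(String[] args) {
        // Auto-generated (positional) IDs: the same IDs end up on different operators.
        Map<String, String> before = positionalIds(List.of("map", "filter", "keyBy"));
        Map<String, String> after = positionalIds(List.of("map", "keyBy", "sum"));
        System.out.println(mismatchedIds(before, after)); // prints [2, 3]

        // Explicit IDs: every ID that survives still names the same operator.
        Map<String, String> beforeUid = new LinkedHashMap<>();
        beforeUid.put("map-id", "map");
        beforeUid.put("filter-id", "filter");
        beforeUid.put("key-id", "keyBy");
        Map<String, String> afterUid = new LinkedHashMap<>();
        afterUid.put("map-id", "map");
        afterUid.put("key-id", "keyBy");
        afterUid.put("sum-id", "sum");
        System.out.println(mismatchedIds(beforeUid, afterUid)); // prints []
    }
}
```

With explicit IDs, the state saved under "key-id" is still handed to keyBy after the change, which is the behavior uid(String) is meant to guarantee.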

Savepoint operations

Configuring savepoints

You can configure the default savepoint directory with state.savepoints.dir. When a savepoint is triggered, it is stored in this directory. You can override the default by specifying a custom target directory in the trigger command (see the :targetDirectory argument). If no default is configured and no custom target directory is specified, triggering the savepoint fails.

Note: the target directory must be a location that the JobManager(s) and TaskManager(s) can access, for example a location on a distributed file system.
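As a minimal sketch, the default directory could be set in flink-conf.yaml as follows. The HDFS path is only an illustrative assumption; use any location that all JobManagers and TaskManagers can reach.

```yaml
# flink-conf.yaml
# Default target directory for savepoints (example path, adjust to your cluster).
state.savepoints.dir: hdfs:///flink/savepoints
```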

Triggering a savepoint

When a savepoint is triggered, a new savepoint directory is created, and the data and metadata are stored in it.

# Triggers a savepoint for the job with ID :jobId and returns the path of the created savepoint. You need this path to restore and delete the savepoint.
$ bin/flink savepoint :jobId [:targetDirectory]

# Triggers a savepoint for the job with ID :jobId and YARN application ID :yarnAppId, and returns the path of the created savepoint.
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

Resuming from a savepoint

$ bin/flink run -s :savepointPath [:runArgs]

This submits a job and specifies the savepoint to resume from. You can give the path to either the savepoint directory or the _metadata file.

By default, the resume operation tries to map all state in the savepoint back to the program you are restoring. If you removed an operator, you can use the --allowNonRestoredState (short: -n) option to skip state that cannot be mapped to the new program:

$ bin/flink run -s :savepointPath -n [:runArgs]

Deleting a savepoint

$ bin/flink savepoint -d :savepointPath

This deletes the savepoint stored in :savepointPath.

Note that you can also delete a savepoint manually with regular file system operations, without affecting other savepoints or checkpoints (recall that each savepoint is self-contained). Before Flink 1.2, this was a more tedious task that had to be performed with the savepoint command above.

FAQ

Should I assign IDs to all operators in my job?

As a rule of thumb, yes. Strictly speaking, it is enough to assign IDs with the uid method only to the stateful operators in your job. A savepoint only contains the state of these stateful operators; stateless operators are not part of the savepoint.

In practice, it is recommended to assign IDs to all operators, because some of Flink's built-in operators (such as the Window operator) are also stateful, and it is not obvious which built-in operators have state. If you are absolutely certain that an operator is stateless, you can skip the uid method.

What happens if I add a new operator that requires state to my job?

When you add a new operator to your job, it is initialized without any state. A savepoint contains the state of each stateful operator; stateless operators are simply not part of it. The new operator therefore behaves much like a stateless operator.

What happens if I delete a stateful operator from my job?

By default, restoring from a savepoint tries to map all state back to the new job. If the savepoint contains state for an operator that has been deleted, the restore fails.

You can allow the deletion of stateful operators by passing --allowNonRestoredState (short: -n) to the run command:

$ bin/flink run -s :savepointPath -n [:runArgs]
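The rules for added and deleted operators can be summarized in a short, self-contained model. It is only a sketch under stated assumptions, not Flink's restore logic: the class `RestoreSketch`, the method `restore`, and the string-valued "state" are hypothetical, and the boolean parameter merely mirrors the meaning of --allowNonRestoredState.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model: savepoint state is a map from operator ID to a state value.
final class RestoreSketch {

    /**
     * Maps savepoint state onto the operators of the new job.
     * Fails if state is left over for a deleted operator,
     * unless allowNonRestoredState is true.
     */
    static Map<String, String> restore(Map<String, String> savepointState,
                                       Set<String> jobOperatorIds,
                                       boolean allowNonRestoredState) {
        // State whose operator ID no longer exists in the new job.
        Set<String> unmapped = new TreeSet<>(savepointState.keySet());
        unmapped.removeAll(jobOperatorIds);
        if (!unmapped.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException("No operator found for state of: " + unmapped);
        }

        Map<String, String> restored = new HashMap<>();
        for (String id : jobOperatorIds) {
            // Operators unknown to the savepoint start without state (null here).
            restored.put(id, savepointState.get(id));
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = new HashMap<>();
        savepoint.put("source-id", "offset=42");
        savepoint.put("mapper-id", "count=7");

        // The new job dropped "mapper-id" and added "new-op".
        Set<String> newJob = Set.of("source-id", "new-op");

        // Like running with -n: leftover state is skipped, "new-op" starts empty.
        System.out.println(restore(savepoint, newJob, true).get("source-id"));

        // Like running without -n: the restore is rejected.
        try {
            restore(savepoint, newJob, false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the new operator simply receives no state, while the leftover state of the deleted operator is what decides whether the restore succeeds.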

What happens if I reorder stateful operators in my job?

If you assigned IDs to these operators, they are restored as usual.

If you did not assign IDs, the automatically generated IDs of the stateful operators will most likely change after reordering, and you will not be able to restore from the previous savepoint.

What happens if I add, delete, or reorder stateless operators in my job?

If you assigned IDs to your stateful operators, the stateless operators do not affect the savepoint restore.

If you did not assign IDs, the automatically generated IDs of the stateful operators will most likely change after such a change, and you will not be able to restore from the previous savepoint.

What happens if I change the parallelism of my program when restoring?

If the savepoint was triggered with Flink >= 1.2.0 and the job does not use deprecated state APIs such as Checkpointed, you can simply restore the program from the savepoint and specify a new parallelism.

If you are restoring from a savepoint triggered with Flink < 1.2.0, or you use the now-deprecated APIs, you must first migrate your job and savepoint to Flink >= 1.2.0 before you can change the parallelism. See the guide on upgrading jobs and Flink versions.

Can I move the savepoint files on stable storage?

The short answer is yes. Starting with Flink 1.11.0, savepoints are self-contained, so you can relocate the files as needed and still restore from them.

References

Savepoints | Apache Flink
