当前位置:网站首页>Flink 解析(六):Savepoints

Flink 解析(六):Savepoints

2022-07-06 09:32:00 Stray_Lambs

目录

Savepoints

分配算子ID

savepoint操作

配置savepoint

触发savepoint

恢复savepoint

删除savepoint

F.A.Q

我应该为我作业中的所有算子分配 ID 吗?

如果我在作业中添加一个需要状态的新算子,会发生什么?

如果从作业中删除有状态的算子会发生什么?

如果我在作业中重新排序有状态算子,会发生什么?

如果我添加、删除或重新排序作业中没有状态的算子,会发生什么?

当我在恢复时改变程序的并行度时会发生什么?

我可以将 savepoint 文件移动到稳定存储上吗?

参考


Savepoints

因为Flink的savepoint与checkpoint也有不同之处,跟我之前写的Flink恢复机制有提到过。但是不太详细,所以根据官方文档,详细的说下savepoint。

Savepoint是根据Flink checkpoint机制所创建的流作业执行状态的一致镜像,说白了其实跟checkpoint一样,也是一个快照文件,只不过这个快照是我们手动去触发的。checkpoint主要是为了故障恢复时使用,而我们可以使用savepoint进行Flink作业停止与重启、fork或者更新,例如版本更新或者主动停止Job的时候使用。

Savepoint由两部分组成:稳定存储(例如HDFS,S3等)上存储的二进制文件的目录(通常很大),以及元数据文件(相对较小)。其中,稳定存储上的文件表示作业执行的状态的数据镜像(即快照),元数据文件以相对路径的形式包含主要只想作为Savepoint一部分的稳定存储上的文件的指针。

其实savepoint和checkpoint的代码实现基本上是相同的,并且生成的格式也是一致的,然而为什么会有两种名称呢? 那是因为它们之间存在着概念上的一些差异(还有RocksDB增量checkpoint快照的区别,未来可能还有其他的)。

从概念上来说,savepoint有点像传统数据库当中的备份,而checkpoint有点像是恢复日志的区别

Checkpoint目的在于为意外失败的作业提供恢复机制,由Flink进行管理(创建、管理、删除),不需要与人交互,对于使用者是无感的,一般Job结束就会被Flink自动删除checkpoint除非有配置保留。

Savepoint更多的关注于可移植性和对作业的更改支持。savepoint一般是由用户创建、管理、删除。这些用例是计划的,手动备份和恢复的,一般在升级Flink版本,调整用户逻辑,更改并行度或者进行红蓝部署的时候使用。而且Job结束后,savepoint还会存在。

还有一个不同在于,从1.11.0版本开始,Flink可以通过移动拷贝将savepoint目录移动到任意地方,然后再进行恢复,而checkpoint不支持任意移动文件,因为checkpoint中包含一些文件的绝对路径

不过有两种情况不支持移动savepoint目录的移动

  • 如果启用了 entropy injection :这种情况下,savepoint目录不包含所有的数据文件,因为注入的路径会分散在各个路径中。因为缺少一个公共的根目录,所以savepoint将包含绝对路径,从而导致无法支持savepoint目录的迁移。
  • 作业包含了task-owned state, 比如GenericWriteAhreadLog sink。

如果我们使用的是MemoryStateBackend ,可能会在目录下没有数据文件,那是因为该模式下,metadata和savepoint的数据都会保存在_metadata文件中

注意:不建议移动或删除正在运行作业的最后一个 Savepoint ,因为这可能会干扰故障恢复。因此,Savepoint 对精确一次的接收器有副作用,为了确保精确一次的语义,如果在最后一个 Savepoint 之后没有 Checkpoint ,那么将使用 Savepoint 进行恢复。

分配算子ID

在编写工程项目代码的时候,强烈建议通过uid(string)的方式手动指定算子的ID,后续根据这些算子ID进行恢复每个算子的状态,以确保你的savepoint与你的代码一致。这边给个样例代码。

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

原因是因为,如果不手动指定ID,则会自定生成ID,如果ID不变则没啥问题,可以进行savepoint恢复。但是如果在手动savepoint之后,我们增加或者删除代码,更改了代码的结构,那么ID将会更改。举个简单的例子,如果原本的代码逻辑是map->filter->keyby,假设这些算子生成的ID依次是1,2,3,我们savepoint之后,更改代码,改为map->keyby->sum,那么自动生成的ID还将会是1,2,3,但是如果我们savepoint进行恢复的时候,就不对了。‘

Flink生成ID取决于程序的结构,并且对程序的更改很敏感,所以强烈建议,为了savepoint恢复一致性,还是手动分配算子ID。

savepoint操作

配置savepoint

你可以通过 state.savepoints.dir 配置 savepoint 的默认目录。 触发 savepoint 时,将使用此目录来存储 savepoint。 你可以通过使用触发器命令指定自定义目标目录来覆盖缺省值(请参阅:targetDirectory参数)。如果既未配置缺省值也未指定自定义目标目录,则触发 Savepoint 将失败。

注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 可访问的位置,例如,分布式文件系统上的位置。

触发savepoint

当触发savepoint时,将创建一个新的savepoint目录,其中存储数据和元数据。

#这将触发 ID 为 :jobId 的作业的 Savepoint,并返回创建的 Savepoint 路径。 你需要此路径来还原和删除 Savepoint 。
$ bin/flink savepoint :jobId [:targetDirectory]

#这将触发 ID 为 :jobId 和 YARN 应用程序 ID :yarnAppId 的作业的 Savepoint,并返回创建的 Savepoint 的路径。
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

恢复savepoint

$ bin/flink run -s :savepointPath [:runArgs]

这将提交作业并指定要从中恢复的 Savepoint 。 你可以给出 Savepoint 目录或 _metadata 文件的路径。

默认情况下,resume 操作将尝试将 Savepoint 的所有状态映射回你要还原的程序。 如果删除了运算符,则可以通过 --allowNonRestoredState(short:-n)选项跳过无法映射到新程序的状态

$ bin/flink run -s :savepointPath -n [:runArgs]

删除savepoint

$ bin/flink savepoint -d :savepointPath

这将删除存储在 :savepointPath 中的 Savepoint。

请注意,还可以通过常规文件系统操作手动删除 Savepoint ,而不会影响其他 Savepoint 或 Checkpoint(请记住,每个 Savepoint 都是自包含的)。 在 Flink 1.2 之前,使用上面的 Savepoint 命令执行是一个更乏味的任务。

F.A.Q

我应该为我作业中的所有算子分配 ID 吗?

根据经验,是的。 严格来说,仅通过 uid 方法给有状态算子分配 ID 就足够了。Savepoint 仅包含这些有状态算子的状态,无状态算子不是 Savepoint 的一部分

在实践中,建议给所有算子分配 ID,因为 Flink 的一些内置算子(如 Window 算子)也是有状态的,而内置算子是否有状态并不很明显。 如果你完全确定算子是无状态的,则可以跳过 uid 方法。

如果我在作业中添加一个需要状态的新算子,会发生什么?

当你向作业添加新算子时,它将在没有任何状态的情况下进行初始化。 Savepoint 包含每个有状态算子的状态。 无状态算子根本不是 Savepoint 的一部分。 新算子的行为类似于无状态算子

如果从作业中删除有状态的算子会发生什么?

默认情况下,从 Savepoint 恢复时将尝试将所有状态分配给新作业。如果有状态算子被删除,则无法从 Savepoint 恢复

你可以通过使用 run 命令设置 --allowNonRestoredState (简称:-n )来允许删除有状态算子:

$ bin/flink run -s :savepointPath -n [:runArgs]

如果我在作业中重新排序有状态算子,会发生什么?

如果给这些算子分配了 ID,它们将像往常一样恢复。

如果没有分配 ID ,则有状态操作符自动生成的 ID 很可能在重新排序后发生更改。这将导致你无法从以前的 Savepoint 恢复。

如果我添加、删除或重新排序作业中没有状态的算子,会发生什么?

如果将 ID 分配给有状态操作符,则无状态操作符不会影响 Savepoint 恢复

如果没有分配 ID ,则有状态操作符自动生成的 ID 很可能在重新排序后发生更改。这将导致你无法从以前的Savepoint 恢复。

当我在恢复时改变程序的并行度时会发生什么?

如果 Savepoint 是用 Flink >= 1.2.0 触发的,并且没有使用像 Checkpointed 这样的不推荐的状态API,那么你可以简单地从 Savepoint 恢复程序并指定新的并行度。

如果你正在从 Flink < 1.2.0 触发的 Savepoint 恢复,或者使用现在已经废弃的 api,那么你首先必须将作业和 Savepoint 迁移到 Flink >= 1.2.0,然后才能更改并行度。参见升级作业和Flink版本指南

我可以将 savepoint 文件移动到稳定存储上吗?

这个问题的快速答案目前是“是”,从 Flink 1.11.0 版本开始,savepoint 是自包含的,你可以按需迁移 savepoint 文件后进行恢复。

参考

Savepoints | Apache Flink

原网站

版权声明
本文为[Stray_Lambs]所创,转载请带上原文链接,感谢
https://blog.csdn.net/Stray_Lambs/article/details/122242314