当前位置:网站首页>Flink - checkpoint Failure reason: Not all required tasks are currently running
Flink - checkpoint Failure reason: Not all required tasks are currently running
2022-07-28 09:36:00 【BIT_666】
一.引言
Flink 程序增加 readFile 生成文件流后,最初运行期间 CheckPoint 存储没有问题,待文件流 Finished 后 CheckPoint 存储报错: checkpoint Failure reason: Not all required tasks are currently running,下面分析并解决下。

二.错误分析与解决
1.问题排查
Flink 场景下实现多流 uinon 并存储 ValueState,除了实时流的内容外,还需要将一个固定文件的内容进行 union 处理,所以出现如下 Overview:

最左边的 Source: Custom File Source 就是单独增加的文件流,由于是固定文件且逻辑简单,所以执行开始时 Busy 打的比较高,在此期间查看 RockerDB 指定 path 下存储的 chk 是没有问题的。

但是由于固定文件内数据量有限,处理完毕后,该 File Source 由 Running 切换至 Finished 状态:

此时由于有 Finshed 的节点,所以会空闲一些 Task,这时候在看 checkpoint 存储的报错原因:
Not all required tasks are currently running
问题应该就出在这里了,由于 File Source 执行完毕后状态转为 Finished,从而导致有 task 状态转换,而存储 checkpoint 需要所有 task 处于 Running 状态,从而导致存储 checkpoint 报错,不过这里存储出问题,不会影响任务的执行。
出错后 checkpoint 大小变为 0。
2.问题分析
问题:SourceFile 执行完毕由 Running 切换至 Finished 导致 checkpoint 执行失败
解决:只需让 SourceFile 保持 Running 状态即可

readFile 支持额外参数 watchType 与 interval,博主之前在在Flink - DataStream 获取数据总结一文中对下述参数进行了分析,这里直接搬运:

WatchType 分为 PROCESS_CONTINUOUSLY 和 PROCESS_ONCE。
PROCESS_CONTINUOUSLY : 根据 interval 间隔扫描文件检测其状态,当文件被修改后会重新处理文件内容,这将打破 exactly-once 语义。
PROCESS_ONCE:示例中默认使用该模式,该模式下只读取一遍随后任务结束。关闭 source 会导致在那之后不会再有检查点。
3.问题解决
通过前两步的分析,解决任务的办法很简单了,只需在 readFile 时修改 WatchType 为 PROCESS_CONTINUES 即可保证 Source File 处于 Running 状态。
.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema), path)
修改后 ↓↓↓
.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema),
path,
FileProcessingMode.PROCESS_CONTINUOUSLY,
86400L)
修改后再次提交任务:

PROCESS_ONCE 执行时 Source File 5min 左右会进入 Finished 状态,修改后 30min 仍然处于 Running 状态。
三.新的问题
1.问题排查

上述修改提交后,程序正常运行一段时间后,整个执行视图丢失 WaterMark,程序不再 sendRecord 并卡住。可以看到 Co-Process-Broadcast-keyed 流接到 7770550 条数据但一条数据也没有发送,查看上面 window 执行窗口也没有 Watermark 显示,所以数据不发送原因应该是 WaterMark 不更新从而导致窗口不触发所以囤积数据。

再查看上述界面发现 Source File 的 RecordsReceive 和 RecordsSent 均为0,程序最开始处理文件是有内容的且这两个参数也有值,这里没值应该是重新扫描文件导致没有数据从而产生空流,进而导致 WaterMark 消失。没有 WaterMark 导致任务停滞可以参考:Flink - 新增 BroadcastStream 无 watermark 导致数据流异常。
2.问题分析
.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema),
path,
FileProcessingMode.PROCESS_CONTINUOUSLY,
86400L)
由于我的文件为固定文件即当天不会更新,所以我设置 Watch 检测文件的间隔为 86400,正好是一天的秒数,这里我怀疑文件重新扫描的 interval 参数我配置有问题,遂查看源码:

这里 sleep 采用 Thread.sleep 执行,其单位为 mills,所以这里如果设置一天应该改为 86400000 才对。
3.问题解决 ()
这里只需把 readFile 的 interval 参数调大即可:
.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema),
path,
FileProcessingMode.PROCESS_CONTINUOUSLY,
86400000L)
当然,如果你的文件不是按天更新或者很长一段时间才更新,这个 interval 参数可以设置的很大,从而将对应 Thread 挂起不影响整体任务。调大该参数后,任务不再中途丢失 WaterMark 卡住,正常执行。

四.问题总结
此次问题的发生还是对基本的 API 掌握不熟悉导致,从而会出现漏掉 WatchType 参数,写错 interval 参数单位的情况,还是要多多学习 API,查看源码与问题分析与记录。下面是源码基于 readFile 参数的解释,有需要的同学也可以到官方 API 系统学习一下。

虽然最终解决方案只是增加两个参数,但是分析与排查的过程也很重要。
边栏推荐
猜你喜欢

Seektiger eco pass STI new progress, log in to ZB on April 14

居家健康诊断时代下,Senzo打造增强侧向流测试产品

OSS直连上传-Rails服务实践

3分钟告诉你如何成为一名黑客|零基础到黑客入门指南,你只需要掌握这五点能力

NET 3行代码实现文字转语音功能

3 minutes to tell you how to become a hacker | zero foundation to hacker getting started guide, you only need to master these five abilities

How to get more marks in the game under the new economic model of Plato farm

The secret behind three salary increases a year

一文读懂Plato Farm的ePLATO,以及其高溢价缘由

pycharm使用conda调用远程服务器
随机推荐
New features of ES6
【MySQL】MySQL错误“ERROR 2006 (HY000):MySQL server has gone away”
MySQL master-slave architecture. After the master database is suspended and restarted, how can the slave database automatically connect to the master database
Standing on the shoulders of big men, you can see further
PlatoFarm几大创新经济模型,给予当下元宇宙市场的启发
PHP连接mysql原生代码
C form application uses object binding DataGridView data binding
设计一个支持百万用户的系统
Plato Farm-以柏拉图为目标的农场元宇宙游戏
Retrofit source code analysis
TimeBasedRollingPolicy简介说明
Basic knowledge of redis
C# 读写文件从用户态切到内核态,到底是个什么流程?
Opencv installation configuration test
Extreme deflation and perpetual motion machine model will promote the outbreak of platofarm
高温天气筑牢安全生产防线,广州海珠区开展加油站应急演练
PHP 基础
Symbolic operation of MATLAB
老板:公司系统太多,能不能实现账号互通?
How to get more marks in the game under the new economic model of Plato farm