当前位置:网站首页>Flink学习笔记(九)状态编程
Flink学习笔记(九)状态编程
2022-07-03 09:00:00 【小胡今天有变强吗】
文章目录
9. 状态编程
9.1 Flink 中的状态
在流处理中,数据是连续不断到来和处理的。每个任务进行计算处理时,可以基于当前数 据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输 出结果的所有数据,就叫作这个任务的状态。
9.1.1 有状态算子
在 Flink 中,算子任务可以分为无状态和有状态两种情况。
无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果。而有状态的算子任务,则除当前数据之外,还需要一些其他数据来得到计算结果。这里的 “其他数据”,就是所谓的状态(state),最常见的就是之前到达的数据,或者由之前数据计算 出的某个结果。
有状态算子处理流程:
9.1.2 状态的管理
在传统的事务型处理架构中,这种额外的状态数据是保存在数据库中的。Flink 的将状态直接保存在内存中来保证性能,并通过分布式扩展来提高吞吐量。
Flink 有一套完整的状态管理机制,将底层一些核心功能全部封装起 来,包括状态的高效存储和访问、持久化保存和故障恢复,以及资源扩展时的调整。
9.1.3 状态的分类
- 托管状态(Managed State)和原始状态(Raw State)
Flink 的状态有两种:托管状态(Managed State)和原始状态(Raw State)。托管状态就是 由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现,我们 只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管 理,实现状态的序列化和故障恢复。
- 算子状态(Operator State)和按键分区状态(Keyed State)
很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区 之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。 在这种情况下,状态的访问方式又会有所不同。基于此,又可以将托管状态分为两类:算子状态和按键分区状态。
(1)算子状态(Operator State)
状态作用范围限定为当前的算子任务实例,也就是只对当前并行子任务实例有效。这就意 味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态, 状态对于同一任务而言是共享的。
(2)按键分区状态(Keyed State)
状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流 (KeyedStream)中,也就 keyBy 之后才可以使用。
9.2 按键分区状态(Keyed State)
9.2.1 基本概念和特点
按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。
在进行按键分区(keyBy)之后,具有相同键的所有数据,都会分配到同一个 并行子任务中。因为一个并行子任务可能会处理多个 key 的数据,所以 Flink 需要对 Keyed State 进行一些 特殊优化。在底层,Keyed State 类似于一个分布式的映射(map)数据结构,所有的状态会根 据 key 保存成键值对(key-value)的形式。这样当一条数据到来时,任务就会自动将状态的访 问范围限定为当前数据的 key,从 map 存储中读取出对应的状态值。所以具有相同 key 的所有 数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的。
使用 Keyed State 必须基于 KeyedStream。
9.2.2 支持的结构类型
实际应用中,需要保存为状态的数据会有各种各样的类型,有时还需要复杂的集合类型。
- 值状态(ValueState)
状态中只保存一个“值”(value)。ValueState本身是一个接口,源码中定 义如下:
public interface ValueState<T> extends State {
T value() throws IOException;//获取当前状态的值
void update(T value) throws IOException;//对状态进行更新,传入的参数 value 就是要覆写的状态值
}
- 列表状态(ListState)
将需要保存的数据,以列表(List)的形式组织起来。在 ListState接口中同样有一个 类型参数 T,表示列表中数据的类型。。ListState 也提供了一系列的方法来操作状态,使用方式 与一般的 List 非常相似。
- 映射状态(MapState)
把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组 key-value 映射的 列表。对应的 MapState接口中,就会有 UK、UV 两个泛型,分别表示保存的 key 和 value 的类型。MapState 提供了操作映射状态的方法,与 Map 的使用非常类似。
- 归约状态(ReducingState)
类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值 作为状态保存下来。ReducintState这个接口调用的方法类似于 ListState,只不过它保存的 只是一个聚合值,所以调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和 之前的状态进行归约,并用得到的结果更新状态。
- 聚合状态(AggregatingState)
9.2.3 整体介绍
在 Flink 中,状态始终是与特定算子相关联的;算子在使用状态前首先需要“注册”,其 实就是告诉 Flink 当前上下文中定义状态的信息,这样运行时的 Flink 才能知道算子有哪些状态。状态的注册,主要是通过“状态描述器”(StateDescriptor)来实现的。
9.2.4 状态生存时间(TTL)
状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失 效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和 修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时 候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。
配置状态的 TTL 时,需要创建一个 StateTtlConfig 配置对象,然后调用状态描述器 的.enableTimeToLive()方法启动 TTL 功能。
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my
state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
- .newBuilder()
状态 TTL 配置的构造器方法,必须调用,返回一个 Builder 之后再调用.build()方法就可以 得到 StateTtlConfig 了。方法需要传入一个 Time 作为参数,这就是设定的状态生存时间。
- .setUpdateType()
设置更新类型。
- .setStateVisibility()
设置状态的可见性。
需要注意,目前的 TTL 设置只支持处理时间。
9.3 算子状态(Operator State)
9.3.1 基本概念和特点
算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前 算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务, 就会访问到同一个 Operator State。
9.3.2 状态类型
算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState 和 BroadcastState。
- 列表状态(ListState)
与 Keyed State 中的 ListState 一样,将状态表示为一组数据的列表。
- 联合列表状态(UnionListState)
与 ListState 类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别 在于,算子并行度进行缩放调整时对于状态的分配方式不同。
- 广播状态(BroadcastState)
算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。 这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样。
9.4 状态持久化和状态后端
在 Flink 的状态管理机制中,很重要的一个功能就是对状态进行持久化(persistence)保 存,这样就可以在发生故障后进行重启恢复。Flink 对状态进行持久化的方式,就是将当前所 有分布式状态进行“快照”保存,写入一个“检查点”(checkpoint)或者保存点(savepoint) 保存到外部存储系统中。具体的存储介质,一般是分布式文件系统(distributed file system)。
9.4.1 检查点(Checkpoint)
有状态流应用中的检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快 照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉。在一个 流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态;如果 发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程, 就如同“读档”一样。
默认情况下,检查点是被禁用的,需要在代码中手动开启。直接调用执行环境 的.enableCheckpointing()方法就可以开启检查点。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();
env.enableCheckpointing(1000);
这里传入的参数是检查点的间隔时间,单位为毫秒。
9.4.2 状态后端(State Backends)
在应用进 行检查点保存时,首先会由 JobManager 向所有 TaskManager 发出触发检查点的命令; TaskManger 收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中; 完成之后向 JobManager 返回确认信息。
在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就 叫作状态后端(state backend)。状态后端主要负责两件事:一是本地的状态管理,二是将检查 点(checkpoint)写入远程的持久化存储。
状态后端的分类
(1)哈希表状态后端(HashMapStateBackend)
把状态存放在内存里。具体实现上,哈希表状态后端在内 部会直接把状态当作对象(objects),保存在 Taskmanager 的 JVM 堆(heap)上。对于检查点的保存,一般是放在持久化的分布式文件系统(file system)中,也可以通过 配置“检查点存储”(CheckpointStorage)来另外指定。
(2)内嵌 RocksDB 状态后端(EmbeddedRocksDBStateBackend)
RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。配置 EmbeddedRocksDBStateBackend 后,会将处理中的数据全部放入 RocksDB 数据库中,RocksDB 默认存储在 TaskManager 的本地数据目录里。
总结
详细介绍了 Flink 中的按键分区状态(Keyed State)和算 子状态(Operator State)的特点和用法,并对广播状态(Broadcast State)做了进一步的展开 说明。还介绍了状态的持久化和状态后端,引出了检查点(checkpoint)的概念。 检查点是一个非常重要的概念,是 Flink 容错机制的核心。
边栏推荐
- 【点云处理之论文狂读经典版13】—— Adaptive Graph Convolutional Neural Networks
- LeetCode 324. Swing sort II
- Hudi 快速体验使用(含操作详细步骤及截图)
- WARNING: You are using pip ; however. Later, upgrade PIP failed, modulenotfounderror: no module named 'pip‘
- LeetCode 438. Find all letter ectopic words in the string
- Introduction to the basic application and skills of QT
- 【点云处理之论文狂读前沿版12】—— Adaptive Graph Convolution for Point Cloud Analysis
- Common formulas of probability theory
- 【点云处理之论文狂读前沿版10】—— MVTN: Multi-View Transformation Network for 3D Shape Recognition
- [kotlin learning] control flow of higher-order functions -- lambda return statements and anonymous functions
猜你喜欢
LeetCode 324. Swing sort II
【点云处理之论文狂读前沿版12】—— Adaptive Graph Convolution for Point Cloud Analysis
【点云处理之论文狂读前沿版8】—— Pointview-GCN: 3D Shape Classification With Multi-View Point Clouds
传统企业数字化转型需要经过哪几个阶段?
excel一小时不如JNPF表单3分钟,这样做报表,领导都得点赞!
Crawler career from scratch (3): crawl the photos of my little sister ③ (the website has been disabled)
LeetCode 75. Color classification
Data mining 2021-4-27 class notes
Hudi 快速体验使用(含操作详细步骤及截图)
LeetCode 30. Concatenate substrings of all words
随机推荐
On a un nom en commun, maître XX.
[point cloud processing paper crazy reading frontier edition 13] - gapnet: graph attention based point neural network for exploring local feature
Numerical analysis notes (I): equation root
Problems in the implementation of lenet
We have a common name, XX Gong
Sword finger offer II 029 Sorted circular linked list
Construction of simple database learning environment
ERROR: certificate common name “www.mysql.com” doesn’t match requested host name “137.254.60.11”.
Linxu learning (4) -- Yum and apt commands
Temper cattle ranking problem
Vs2019 configuration opencv3 detailed graphic tutorial and implementation of test code
State compression DP acwing 91 Shortest Hamilton path
Bert install no package metadata was found for the 'sacraments' distribution
Save the drama shortage, programmers' favorite high-score American drama TOP10
Go language - Reflection
Powerdesign reverse wizard such as SQL and generates name and comment
Using Hudi in idea
How to check whether the disk is in guid format (GPT) or MBR format? Judge whether UEFI mode starts or legacy mode starts?
Common formulas of probability theory
Explanation of the answers to the three questions