当前位置:网站首页>Flink函数(1):rich function
Flink函数(1):rich function
2022-07-24 05:19:00 【sf_www】
rich function,即“富”函数,即相比一般的function来说,他提供了更多的功能。它定义了函数生命周期的一些方法,并可以获取函数运行时的上下文。方法有open, close, getRuntimeContext, setRuntimeContext。特别有用的地方就是我们可以获取keyed state,来做更多的控制,比如keyed State的TTL的设置。(State是使用RuntimeContext访问的,因此它只能在富函数中使用)
所有需要用户定义函数的转换都可以将rich函数作为参数(All transformations that require a user-defined function can instead take as argument a rich function. )因为RichFunction extends Function。
下面是官方文档对其一些有用的地方的说明:
Rich functions provide, in addition to the user-defined function (map, reduce, etc), four methods: open, close, getRuntimeContext, and setRuntimeContext. These are useful for parameterizing the function (see Passing Parameters to Functions), creating and finalizing local state, accessing broadcast variables (see Broadcast Variables), and for accessing runtime information such as accumulators and counters (see Accumulators and Counters), and information on iterations (see Iterations).
下面对关键函数进行说明:
void open(Configuration parameters) throws Exception;在函数(function,比如map join等)调用前,open()方法(method)先被调用,用于初始化操作,因此适合一次性设置工作。 作为函数迭代的一部分,此方法将在每个上层迭代开始前调用(For functions that are part of an iteration, this method will be invoked at the beginning of each iteration uperstep.)。
void close() throws Exception;是生命周期中的最后一个调用的方法,做一些清理工作。
RuntimeContext getRuntimeContext();获取运行时上下文
void setRuntimeContext(RuntimeContext t);设置运行时上下文。每个并行的算子子任务都有一个运行时上下文,上下文记录了这个算子运行过程中的一些信息,包括算子当前的并行度、算子子任务序号、广播数据、累加器、监控数据。最重要的是,我们可以从上下文里获取状态数据;
举例实现Count的TTL设置
package com.xx.bigdata.flink;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
public class TTLCountMapFunction extends RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
private transient ValueState<Integer> state;
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> in) throws Exception {
Integer currentCount = state.value();
if(null == currentCount) {
currentCount = 0;
}
Integer nowCount = currentCount + in.f1;
state.update(nowCount);
return new Tuple2<>(in.f0, nowCount);
}
public void open(Configuration parameters) throws Exception {
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(300))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("count-state", Integer.class);
stateDescriptor.enableTimeToLive(ttlConfig);
state = getRuntimeContext().getState(stateDescriptor);
}
}边栏推荐
- 波卡创始人 Gavin Wood:波卡治理 v2 会有哪些变化?
- 自定义MVC 2.0
- WASM VS EVM,波卡的选择预示了公链未来
- MySQL之函数运用
- php的多选、单选结果怎么在前台显示?
- 在本地怎么使用phpstudy搭建WordPress网站
- 响应式页面
- Station B video comment crawling - take the blade of ghost destruction as an example (and store it in CSV)
- 4. Draw a red triangle and a yellow square on the screen. Triangle in the back, small; Square in front, big. Using the fusion technology, the triangle can be seen through the square, and the source an
- Cess test online line! The first decentralized storage network to provide multiple application scenarios
猜你喜欢
随机推荐
The project is deployed to the public network for free (intranet penetration)
Canvas Bezier Bezier curve
canvas - 填充
PoS机制随机性解读,波卡的随机性原理如何运作?
自定义MVC 2.0
Hex to RGB
mapboxgl + geoserver 配置本地地图教程
vscode配置autoprefixer
【虚拟化】如何将虚拟机从workstation转换到esxi
Function_ This keyword
The profound meaning of unlimited ecological development in Poka -- Multidimensional Interpretation of parallel chain
B站视频评论爬取——以鬼灭之刃为例(并将其存储到csv中)
LP双币流动性质押挖矿系统逻辑开发分析
Vulnhub solidstate: 1 target penetration test
通用分页2.0
How can the multiple-choice and single choice results of PHP be displayed in the foreground?
6. Draw a Bezier curve and a Bezier surface on the screen
MySQL误操作后如何快速恢复数据
vulnhub-SolidState: 1靶机渗透测试
Define attribute of UMI









