当前位置:网站首页>Flink accumulator Counter 累加器 和 计数器
Flink accumulator Counter 累加器 和 计数器
2022-08-05 05:14:00 【bigdata1024】
Accumulators(累加器)是非常简单的,通过一个add操作累加最终的结果,在job执行后可以获取最终结果
最简单的累加器是counter(计数器):你可以通过Accumulator.add(V value)这个方法进行递增。在任务的最后,flink会吧所有的结果进行合并,然后把最终结果发送到client端。累加器在调试或者你想更快了解你的数据的时候是非常有用的。
Flink现在有一下内置累加器。每个累加器都实现了Accumulator接口。
- IntCounter, LongCounter 和 DoubleCounter:下面是一个使用计数器的例子。
- Histogram(直方图):针对离线箱子数量的直方图实现。内部是一个map,从integer到integer。你可以使用它来计算分布式的值。例如:分布式的单词统计程序。
如何使用累加器
第一步你需要在你想要使用的地方创建一个自定义的transformation算子,在算子中创建一个累加器对象(在这是一个counter)。
private IntCounter numLines = new IntCounter();
第二步你需要注册这个累加器对象,通常在rich函数的open方法里面。在这里你也可以定义一个名字
getRuntimeContext().addAccumulator("num-lines", this.numLines);
现在你可以在任何操作中使用这个累加器,包含open方法和close方法
this.numLines.add(1);
最终的总结果将会存储在从execute()中返回的JobExecutionResult对象中。(这个操作需要等待任务执行完成)
myJobExecutionResult.getAccumulatorResult("num-lines")
所有的累加器在一个job任务中都是只共享一个单独的命名空间,所以你可以在你的job任务中的不同算子函数中使用相同的累加器,flink内部会合并所有名称相同的累加器。
在使用累加器和迭代器的时候注意:目前累加器的最终结果只能在任务执行结束之后才能获得。我们也计划在下一次迭代的时候返回上一次迭代的结果。你可以使用聚合器来计算每个迭代统计等数据
自定义累加器
实现自定义的累加器只需要实现Accumulator接口即可。随意创建一个pull请求,如果你认为你的自定义累加器应该附加在flink上面。
你可以选择实现Accumulator 或者 SimpleAccumulator
Accumulator<V,R>是最灵活的。它定了一个V类型的值可以进行累加,和一个R类型的值作为最终结果。例如:针对一个直方图,V是一个数字,R是一个直方图。
SimpleAccumulator的情况下,这两种类型都是一样的,例如:counters累加器。
获取更多大数据资料,视频以及技术交流请加群:
边栏推荐
- 重新审视分布式系统:永远不会有完美的一致性方案……
- 【过一下16】回顾一下七月
- coppercam入门手册[6]
- [Software Exam System Architect] Software Architecture Design ③ Domain-Specific Software Architecture (DSSA)
- 【过一下3】卷积&图像噪音&边缘&纹理
- Dashboard Display | DataEase Look at China: Data Presents China's Capital Market
- entry point injection
- OFDM Lecture 16 5 -Discrete Convolution, ISI and ICI on DMT/OFDM Systems
- CAP+BASE
- vscode+pytorch使用经验记录(个人记录+不定时更新)
猜你喜欢
第四讲 back propagation 反向传播
CAP+BASE
Calling Matlab configuration in pycharm: No module named 'matlab.engine'; 'matlab' is not a package
2022杭电多校第一场01
"Recursion" recursion concept and typical examples
第三讲 Gradient Tutorial梯度下降与随机梯度下降
coppercam入门手册[6]
Lecture 2 Linear Model Linear Model
SQL(一) —— 增删改查
Geek卸载工具
随机推荐
ESP32 485 Illuminance
【过一下9】卷积
小白一枚各位大牛轻虐虐
Error creating bean with name 'configDataContextRefresher' defined in class path resource
The underlying mechanism of the class
多线程查询结果,添加List集合
Difference between for..in and for..of
day7-列表作业(1)
ES6 生成器
OFDM 十六讲 5 -Discrete Convolution, ISI and ICI on DMT/OFDM Systems
uva1325
The software design experiment four bridge model experiment
Mesos学习
【过一下11】随机森林和特征工程
range函数作用
Convert the paper official seal in the form of a photo into an electronic official seal (no need to download ps)
2022 Hangzhou Electric Multi-School 1st Session 01
Structured Light 3D Reconstruction (2) Line Structured Light 3D Reconstruction
HQL语句执行过程
vscode+pytorch使用经验记录(个人记录+不定时更新)