当前位置:网站首页>Flink accumulator Counter 累加器 和 计数器
Flink accumulator Counter 累加器 和 计数器
2022-08-05 05:14:00 【bigdata1024】
Accumulators(累加器)是非常简单的,通过一个add操作累加最终的结果,在job执行后可以获取最终结果
最简单的累加器是counter(计数器):你可以通过Accumulator.add(V value)这个方法进行递增。在任务的最后,flink会吧所有的结果进行合并,然后把最终结果发送到client端。累加器在调试或者你想更快了解你的数据的时候是非常有用的。
Flink现在有一下内置累加器。每个累加器都实现了Accumulator接口。
- IntCounter, LongCounter 和 DoubleCounter:下面是一个使用计数器的例子。
- Histogram(直方图):针对离线箱子数量的直方图实现。内部是一个map,从integer到integer。你可以使用它来计算分布式的值。例如:分布式的单词统计程序。
如何使用累加器
第一步你需要在你想要使用的地方创建一个自定义的transformation算子,在算子中创建一个累加器对象(在这是一个counter)。
private IntCounter numLines = new IntCounter();
第二步你需要注册这个累加器对象,通常在rich函数的open方法里面。在这里你也可以定义一个名字
getRuntimeContext().addAccumulator("num-lines", this.numLines);
现在你可以在任何操作中使用这个累加器,包含open方法和close方法
this.numLines.add(1);
最终的总结果将会存储在从execute()中返回的JobExecutionResult对象中。(这个操作需要等待任务执行完成)
myJobExecutionResult.getAccumulatorResult("num-lines")
所有的累加器在一个job任务中都是只共享一个单独的命名空间,所以你可以在你的job任务中的不同算子函数中使用相同的累加器,flink内部会合并所有名称相同的累加器。
在使用累加器和迭代器的时候注意:目前累加器的最终结果只能在任务执行结束之后才能获得。我们也计划在下一次迭代的时候返回上一次迭代的结果。你可以使用聚合器来计算每个迭代统计等数据
自定义累加器
实现自定义的累加器只需要实现Accumulator接口即可。随意创建一个pull请求,如果你认为你的自定义累加器应该附加在flink上面。
你可以选择实现Accumulator 或者 SimpleAccumulator
Accumulator<V,R>是最灵活的。它定了一个V类型的值可以进行累加,和一个R类型的值作为最终结果。例如:针对一个直方图,V是一个数字,R是一个直方图。
SimpleAccumulator的情况下,这两种类型都是一样的,例如:counters累加器。
获取更多大数据资料,视频以及技术交流请加群:
边栏推荐
猜你喜欢
Pycharm中使用pip安装第三方库安装失败:“Non-zero exit code (2)“的解决方法
OFDM Lecture 16 5 -Discrete Convolution, ISI and ICI on DMT/OFDM Systems
Structured light 3D reconstruction (1) Striped structured light 3D reconstruction
Wise Force Deleter强制删除工具
[Study Notes Dish Dog Learning C] Classic Written Exam Questions of Dynamic Memory Management
Database experiment five backup and recovery
SQL(一) —— 增删改查
【过一下4】09-10_经典网络解析
Matplotlib(三)—— 实践
vscode+pytorch use experience record (personal record + irregular update)
随机推荐
Redux
1.3 mysql批量插入数据
The fourth back propagation back propagation
Returned object not currently part of this pool
Lecture 4 Backpropagation Essays
RL强化学习总结(一)
day11-函数作业
第三讲 Gradient Tutorial梯度下降与随机梯度下降
Geek卸载工具
NodeJs接收上传文件并自定义保存路径
human weakness
学习总结week2_2
Qt produces 18 frames of Cupid to express his love, is it your Cupid!!!
Multi-threaded query results, add List collection
【过一下9】卷积
Map、WeakMap
day7-列表作业(1)
解决端口占用问题
学习总结week3_4类与对象
RDD和DataFrame和Dataset