当前位置:网站首页>Flink accumulator Counter 累加器 和 计数器
Flink accumulator Counter 累加器 和 计数器
2022-08-05 05:14:00 【bigdata1024】
Accumulators(累加器)是非常简单的,通过一个add操作累加最终的结果,在job执行后可以获取最终结果
最简单的累加器是counter(计数器):你可以通过Accumulator.add(V value)这个方法进行递增。在任务的最后,flink会吧所有的结果进行合并,然后把最终结果发送到client端。累加器在调试或者你想更快了解你的数据的时候是非常有用的。
Flink现在有一下内置累加器。每个累加器都实现了Accumulator接口。
- IntCounter, LongCounter 和 DoubleCounter:下面是一个使用计数器的例子。
- Histogram(直方图):针对离线箱子数量的直方图实现。内部是一个map,从integer到integer。你可以使用它来计算分布式的值。例如:分布式的单词统计程序。
如何使用累加器
第一步你需要在你想要使用的地方创建一个自定义的transformation算子,在算子中创建一个累加器对象(在这是一个counter)。
private IntCounter numLines = new IntCounter();第二步你需要注册这个累加器对象,通常在rich函数的open方法里面。在这里你也可以定义一个名字
getRuntimeContext().addAccumulator("num-lines", this.numLines);
现在你可以在任何操作中使用这个累加器,包含open方法和close方法
this.numLines.add(1);最终的总结果将会存储在从execute()中返回的JobExecutionResult对象中。(这个操作需要等待任务执行完成)
myJobExecutionResult.getAccumulatorResult("num-lines")所有的累加器在一个job任务中都是只共享一个单独的命名空间,所以你可以在你的job任务中的不同算子函数中使用相同的累加器,flink内部会合并所有名称相同的累加器。
在使用累加器和迭代器的时候注意:目前累加器的最终结果只能在任务执行结束之后才能获得。我们也计划在下一次迭代的时候返回上一次迭代的结果。你可以使用聚合器来计算每个迭代统计等数据
自定义累加器
实现自定义的累加器只需要实现Accumulator接口即可。随意创建一个pull请求,如果你认为你的自定义累加器应该附加在flink上面。
你可以选择实现Accumulator 或者 SimpleAccumulator
Accumulator<V,R>是最灵活的。它定了一个V类型的值可以进行累加,和一个R类型的值作为最终结果。例如:针对一个直方图,V是一个数字,R是一个直方图。
SimpleAccumulator的情况下,这两种类型都是一样的,例如:counters累加器。
获取更多大数据资料,视频以及技术交流请加群:

边栏推荐
猜你喜欢

数据库实验五 备份与恢复
![[Study Notes Dish Dog Learning C] Classic Written Exam Questions of Dynamic Memory Management](/img/0b/f7d9205c616f7785519cf94853d37d.png)
[Study Notes Dish Dog Learning C] Classic Written Exam Questions of Dynamic Memory Management

Matplotlib(三)—— 实践

Detailed Explanation of Redis Sentinel Mode Configuration File

OFDM 十六讲 5 -Discrete Convolution, ISI and ICI on DMT/OFDM Systems

第二讲 Linear Model 线性模型

Algorithms - ones and zeros (Kotlin)

数据库 单表查询

Qt produces 18 frames of Cupid to express his love, is it your Cupid!!!

RL reinforcement learning summary (1)
随机推荐
2022杭电多校第一场01
【过一下8】全连接神经网络 视频 笔记
第四讲 反向传播随笔
02.01-----参数的引用的作用“ & ”
学习总结week2_4
A blog clears the Redis technology stack
Multi-threaded query results, add List collection
【过一下10】sklearn使用记录
[WeChat applet] WXML template syntax - conditional rendering
Map、WeakMap
day6-列表作业
有用番茄来监督自己的同道中人吗?加一下我的自习室,一起加油
【过一下9】卷积
UVA10827
Database experiment five backup and recovery
Xiaobai, you big bulls are lightly abused
【Over 16】Looking back at July
2022 Hangzhou Electric Multi-School 1st Session 01
redis 缓存清除策略
day11-函数作业