当前位置:网站首页>Flink Broadcast 广播变量
Flink Broadcast 广播变量
2022-08-05 05:14:00 【bigdata1024】
Broadcast 广播变量:一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的任务在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个任务中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。
Broadcast variables允许你创建一个数据集在所有的并行操作节点都能获取到,除了常规的输入操作。针对一些小的依赖数据集,这种方式是非常有用的,这个data set数据集将会作为一个Collection集合被操作访问。
- Broadcast(广播数据) :通过withBroadcastSet(DataSet, String) 进行广播数据,并给这份数据起名字
- Access(获取数据):通过getRuntimeContext().getBroadcastVariable(String)获取广播出去的数据
java代码示例
// 1. 准备等待广播的DataSet数据
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
DataSet<String> data = env.fromElements("a", "b");
data.map(new RichMapFunction<String, String>() {
@Override
public void open(Configuration parameters) throws Exception {
// 3. 获取广播的DataSet数据 作为一个Collection
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
}
@Override
public String map(String value) throws Exception {
...
}
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. 广播DataSet
scala代码示例
// 1. 准备等待广播的DataSet数据
val toBroadcast = env.fromElements(1, 2, 3)
val data = env.fromElements("a", "b")
data.map(new RichMapFunction[String, String]() {
var broadcastSet: Traversable[String] = null
override def open(config: Configuration): Unit = {
// 3. 获取广播的DataSet数据 作为一个Collection
broadcastSet = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
}
def map(in: String): String = {
...
}
}).withBroadcastSet(toBroadcast, "broadcastSetName") // 2. 广播DataSet
确保在广播的时候,和获取数据的时候使用相同的broadcastSetName名称。
注意:
1:广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。因为广播出去的数据,会常驻内存,除非程序执行结束。
2:广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。
个人建议:如果数据集在几十兆或者百兆的时候,可以选择进行广播,如果数据集的大小上G的话,就不建议进行广播了。
获取更多大数据资料,视频以及技术交流请加群:
边栏推荐
猜你喜欢
【过一下3】卷积&图像噪音&边缘&纹理
DOM及其应用
Geek卸载工具
Lecture 3 Gradient Tutorial Gradient Descent and Stochastic Gradient Descent
Convert the paper official seal in the form of a photo into an electronic official seal (no need to download ps)
Develop a highly fault-tolerant distributed system
coppercam primer [6]
【Transfer】What is etcd
shell函数
Error creating bean with name 'configDataContextRefresher' defined in class path resource
随机推荐
【过一下12】整整一星期没记录
jvm three heap and stack
What field type of MySQL database table has the largest storage length?
将照片形式的纸质公章转化为电子公章(不需要下载ps)
coppercam primer [6]
redis事务
Wise Force Deleter强制删除工具
My 的第一篇博客!!!
数据库实验五 备份与恢复
[Go through 10] sklearn usage record
怎么更改el-table-column的边框线
第四讲 back propagation 反向传播
day6-列表作业
【过一下15】学习 lstm的一周
序列基础练习题
【过一下8】全连接神经网络 视频 笔记
day7-列表作业(1)
vscode+pytorch使用经验记录(个人记录+不定时更新)
【After a while 6】Machine vision video 【After a while 2 was squeezed out】
2022 The 4th C.Easy Counting Problem (EGF+NTT)