当前位置:网站首页>Flink Distributed Cache 分布式缓存
Flink Distributed Cache 分布式缓存
2022-08-05 05:14:00 【bigdata1024】
Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件。此功能可用于共享文件,包含静态的外部数据,例如字典或者machine-learned回归模型。
此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。当程序执行,Flink自动将文件或者目录复制到所有worker节点的本地文件系统。用户函数可以查找文件或者目录通过这个指定的名称,然后从worker节点的本地文件系统访问它。
使用分布式缓存 如下示例:
java代码:
在ExecutionEnvironment中注册文件或者目录
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 从hdfs注册一个文件
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// 注册一个本地可执行的脚本文件
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// 定义程序代码 并且执行
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();
在用户函数中访问缓存文件或者目录(这里是一个map函数)。这个函数必须继承RichFunction,因为它需要使用RuntimeContext读取数据
// 继承RichFunction 为了获取RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {
@Override
public void open(Configuration config) {
// 通过RuntimeContext 和 DistributedCache访问缓存文件
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
// 读取文件(或者本地目录)
...
}
@Override
public Integer map(String value) throws Exception {
// 使用缓存文件的内容做一些处理
...
}
}
scala代码:
在ExecutionEnvironment中注册文件或者目录
val env = ExecutionEnvironment.getExecutionEnvironment
// 从hdfs注册一个文件
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// 注册一个本地可执行的脚本文件
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// 定义程序代码 并且执行
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()
在用户函数中访问缓存文件或者目录(这里是一个map函数)。这个函数必须继承RichFunction,因为它需要使用RuntimeContext读取数据
// 继承RichFunction 为了获取RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {
override def open(config: Configuration): Unit = {
// 通过RuntimeContext 和 DistributedCache访问缓存文件
val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
// 读取文件(或者本地目录)
...
}
override def map(value: String): Int = {
// 使用缓存文件的内容做一些处理
...
}
}
获取更多大数据资料,视频以及技术交流请加群:
边栏推荐
- Returned object not currently part of this pool
- 【Untitled】
- The underlying mechanism of the class
- Algorithms - ones and zeros (Kotlin)
- 序列基础练习题
- 位运算符与逻辑运算符的区别
- Flink accumulator Counter 累加器 和 计数器
- Community Sharing|Tencent Overseas Games builds game security operation capabilities based on JumpServer
- OFDM Lecture 16 5 -Discrete Convolution, ISI and ICI on DMT/OFDM Systems
- 物理层的接口有哪几个方面的特性?各包含些什么内容?
猜你喜欢
Algorithms - ones and zeros (Kotlin)
DOM and its applications
Pycharm中使用pip安装第三方库安装失败:“Non-zero exit code (2)“的解决方法
CAP+BASE
[Go through 8] Fully Connected Neural Network Video Notes
Flutter real machine running and simulator running
Lecture 4 Backpropagation Essays
[Go through 3] Convolution & Image Noise & Edge & Texture
第5讲 使用pytorch实现线性回归
Convert the paper official seal in the form of a photo into an electronic official seal (no need to download ps)
随机推荐
Using pip to install third-party libraries in Pycharm fails to install: "Non-zero exit code (2)" solution
软件设计 实验四 桥接模式实验
【Transfer】What is etcd
1.3 mysql batch insert data
Using QR codes to solve fixed asset management challenges
Matplotlib(二)—— 子图
【记一下1】2022年6月29日 哥和弟 双重痛苦
【过一下10】sklearn使用记录
对数据排序
Structured Light 3D Reconstruction (2) Line Structured Light 3D Reconstruction
BFC(Block Formatting Context)
学习总结week3_2函数进阶
The software design experiment four bridge model experiment
Opencv中,imag=cv2.cvtColor(imag,cv2.COLOR_BGR2GRAY) 报错:error:!_src.empty() in function ‘cv::cvtColor‘
Basic properties of binary tree + oj problem analysis
[Student Graduation Project] Design and Implementation of the Website Based on the Web Student Information Management System (13 pages)
鼠标放上去变成销售效果
【After a while 6】Machine vision video 【After a while 2 was squeezed out】
Geek卸载工具
day10-字符串作业