当前位置:网站首页>Flink Distributed Cache 分布式缓存
Flink Distributed Cache 分布式缓存
2022-08-05 05:14:00 【bigdata1024】
Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件。此功能可用于共享文件,包含静态的外部数据,例如字典或者machine-learned回归模型。
此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。当程序执行,Flink自动将文件或者目录复制到所有worker节点的本地文件系统。用户函数可以查找文件或者目录通过这个指定的名称,然后从worker节点的本地文件系统访问它。
使用分布式缓存 如下示例:
java代码:
在ExecutionEnvironment中注册文件或者目录
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 从hdfs注册一个文件
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// 注册一个本地可执行的脚本文件
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// 定义程序代码 并且执行
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();
在用户函数中访问缓存文件或者目录(这里是一个map函数)。这个函数必须继承RichFunction,因为它需要使用RuntimeContext读取数据
// 继承RichFunction 为了获取RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {
@Override
public void open(Configuration config) {
// 通过RuntimeContext 和 DistributedCache访问缓存文件
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
// 读取文件(或者本地目录)
...
}
@Override
public Integer map(String value) throws Exception {
// 使用缓存文件的内容做一些处理
...
}
}
scala代码:
在ExecutionEnvironment中注册文件或者目录
val env = ExecutionEnvironment.getExecutionEnvironment
// 从hdfs注册一个文件
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// 注册一个本地可执行的脚本文件
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// 定义程序代码 并且执行
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()在用户函数中访问缓存文件或者目录(这里是一个map函数)。这个函数必须继承RichFunction,因为它需要使用RuntimeContext读取数据
// 继承RichFunction 为了获取RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {
override def open(config: Configuration): Unit = {
// 通过RuntimeContext 和 DistributedCache访问缓存文件
val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
// 读取文件(或者本地目录)
...
}
override def map(value: String): Int = {
// 使用缓存文件的内容做一些处理
...
}
}
获取更多大数据资料,视频以及技术交流请加群:

边栏推荐
- Community Sharing|Tencent Overseas Games builds game security operation capabilities based on JumpServer
- ESP32 485 Illuminance
- 【过一下11】随机森林和特征工程
- 分布式和集群
- Error creating bean with name 'configDataContextRefresher' defined in class path resource
- Flink accumulator Counter 累加器 和 计数器
- 机器学习(二) —— 机器学习基础
- 【过一下6】机器视觉视频 【过一下2被挤掉了】
- SQL(一) —— 增删改查
- 【练一下1】糖尿病遗传风险检测挑战赛 【讯飞开放平台】
猜你喜欢

OFDM 十六讲 5 -Discrete Convolution, ISI and ICI on DMT/OFDM Systems

Matplotlib(二)—— 子图

Difference between for..in and for..of

【过一下12】整整一星期没记录

Using pip to install third-party libraries in Pycharm fails to install: "Non-zero exit code (2)" solution

RL reinforcement learning summary (1)

vscode+pytorch use experience record (personal record + irregular update)

【NFT网站】教你制作开发NFT预售网站官网Mint作品

CAP+BASE

多线程查询结果,添加List集合
随机推荐
The mall background management system based on Web design and implementation
day9-字符串作业
第5讲 使用pytorch实现线性回归
【过一下14】自习室的一天
What are the characteristics of the interface of the physical layer?What does each contain?
Multi-threaded query results, add List collection
server disk array
Flink HA安装配置实战
转正菜鸟前进中的经验(废话)之谈 持续更新中... ...
【过一下11】随机森林和特征工程
【技能】长期更新
【Over 16】Looking back at July
学习总结week2_3
[Go through 10] sklearn usage record
RL reinforcement learning summary (1)
Returned object not currently part of this pool
Opencv中,imag=cv2.cvtColor(imag,cv2.COLOR_BGR2GRAY) 报错:error:!_src.empty() in function ‘cv::cvtColor‘
位运算符与逻辑运算符的区别
pycharm中调用Matlab配置:No module named ‘matlab.engine‘; ‘matlab‘ is not a package
HQL语句执行过程