当前位置:网站首页>Spark的RDD(弹性分布式数据集)返回大结果集
Spark的RDD(弹性分布式数据集)返回大结果集
2022-07-06 09:29:00 【若苗瞬】
(一)过大的结果集导致Driver端OOM错误
很早之前《从零开始学习大数据平台》就提到了这个问题。
通过collect()
返回结果集,数据量太大就会报driver端内存错误。
比如我这里,返回千万数据,就导致Out of memory,或者GC overhead limit exceeded。
Spark的核心弹性分布式数据集,对于rdd.collect()
是这么描述的:
java.util.List collect() //Python版本差不多的
返回一个数组包含RDD中所有的元素
.
备注
.
这个方法只能在结果集较小的情况下使用,因为所有的数据都会加载到driver的内存中。
(二)各种解决方案
如果结果集大怎能办?
网上查了一圈,大家的说法有:
- 大的数据集用rdd写入HDFS文件的方法,比如
rdd.saveAsTextFile
,rdd.saveAsNewAPIHadoopFile
各个节点将结果写入到HDFS的目录中(一大堆文件)。 - 不传到driver上而是
rdd.foreach
,rdd.foreachPartition
打印到屏幕上。 - 通过可序列化的类型,发送到数据库等等。
(三)分批返回结果集
不过我比较懒,希望整个流程和以前无缝切换,
所以我采用了collectPartitions
,分批返回数据。
Java代码如下:
...
if (ColBatch < out_2.getNumPartitions()) {
//如果设置了批量值,并小于分区数,则分批collect
int[] Par = new int[ColBatch];
for (int i = 0; i < out_2.getNumPartitions(); i += Par.length) {
int ParLen = 0;
for (int j = 0; j < Par.length; j++) {
if (i + j < out_2.getNumPartitions()) {
Par[j] = i + j;
ParLen++;
}
}
TmpLine = String.format("当前: %d - %d\n", i, i + ParLen - 1);
System.out.print(TmpLine);
List<Tuple2<String, String>>[] output2 = out_2.collectPartitions(Par);
for (int j = 0; j < ParLen; j++) {
for (Tuple2<String, String> tuple : output2[j]) {
oF002.write(String.format("%s|%s\n", tuple._1(), tuple._2()));
g002++;
}
}
}
} else {
List<Tuple2<String, String>> output1 = out_2.collect();
for (Tuple2<String, String> tuple : output1) {
oF002.write(String.format("%s|%s\n", tuple._1(), tuple._2()));
g002++;
}
}
...
PS:但Python中没有找到collectPartitions
方法啊???肿么办!!! 。
边栏推荐
- Li Kou - 298th weekly match
- (POJ - 1458) common subsequence (longest common subsequence)
- QT implementation window gradually disappears qpropertyanimation+ progress bar
- (lightoj - 1323) billiard balls (thinking)
- Codeforces Round #802(Div. 2)A~D
- Installation and configuration of MariaDB
- Read and save zarr files
- Flag framework configures loguru logstore
- Market trend report, technological innovation and market forecast of double door and multi door refrigerators in China
- Kubernetes集群部署
猜你喜欢
随机推荐
Acwing - game 55 of the week
Log statistics (double pointer)
pytorch提取骨架(可微)
(POJ - 3685) matrix (two sets and two parts)
Calculate the time difference
Effet d'utilisation, déclenché lorsque les composants de la fonction sont montés et déchargés
Kubernetes集群部署
Problem - 922D、Robot Vacuum Cleaner - Codeforces
input 只能输入数字,限定输入
指定格式时间,月份天数前补零
Codeforces - 1526C1&&C2 - Potions
Pull branch failed, fatal: 'origin/xxx' is not a commit and a branch 'xxx' cannot be created from it
Research Report of desktop clinical chemical analyzer industry - market status analysis and development prospect prediction
顺丰科技智慧物流校园技术挑战赛(无t4)
Codeforces Round #800 (Div. 2)AC
js时间函数大全 详细的讲解 -----阿浩博客
Generate random password / verification code
useEffect,函数组件挂载和卸载时触发
window11 conda安装pytorch过程中遇到的一些问题
Advancedinstaller安装包自定义操作打开文件