当前位置:网站首页>spark中saveAsTextFile如何最终生成一个文件
spark中saveAsTextFile如何最终生成一个文件
2022-07-25 15:10:00 【南风知我意丿】
项目场景:
- 一般而言,
saveAsTextFile会按照执行task的多少生成多少个文件,比如part-00000一直到part-0000n,n自然就是task的个数,亦即是最后的stage的分区数。那么有没有办法最后只生成一个文件,而不是成百上千个文件了?答案自然是有办法。
误区
在RDD上调用
coalesce(1,true).saveAsTextFile(), 意味着做完计算之后将数据汇集到一个分区,然后再执行保存的动作,显然,一个分区,Spark自然只起一个task来执行保存的动作,也就只有一个文件产生了。又或者,可以调用repartition(1),它其实是coalesce的一个包装,默认第二个参数为true。事情最终就这么简单吗?
显然不是。你虽然可以这么做,但代价是巨大的。因为Spark面对的是大量的数据,并且是并行执行的,如果强行要求最后只有一个分区,必然导致大量的磁盘IO和网络IO产生,并且最终执行reduce操作的节点的内存也会承受很大考验。Spark程序会很慢,甚至死掉。这往往是初学Spark的一个思维陷阱,需要改变原先那种单线程单节点的思维,对程序的理解要转变多多个节点多个进程中去,需要熟悉多节点集群自然产生多个文件这种模式。
此外,saveAsTextFile要求保存的目录之前是没有的,否则会报错。所以,最好程序中保存前先判断一下目录是否存在。
当我运行完一个Spark程序想把结果保存为saveAsTextFile,
结果使用Hadoop fs -ls /output后发现里面有一系列的part,好几千个。原因:
运行Spark的时候把数据分成了很多份(partition),每个partition都把自己的数据保存成partxxx文件形式。
如果想保存为一份的话,就要:
先collect
或者data.coalesce(1,true).saveAsTextFile()
也或者
data.repartition(1).saveAsTextFile( ) //You can also use repartition(1), which is just a wrapper for coalesce() with the suffle argument set to true.
data.repartition(1).saveAsTextFile( “HDFS://OUTPUT”)
- 问题:
但是如果你的数据很大,难以在单机内存上装下,以上操作可能会造成单机内存不足(OOM)
原因在于以上操作都是讲分布在各个机器上的数据汇总到单机,然后再保存到磁盘(HDFS)上。
以上操作将各个机器上的RDD partition 合并到单一主机后再读入磁盘。
解决方案:
以下给出更安全的操作,即采用HDFS磁盘合并操作。
如果已经存了很多个part:
可以把大文件夹getmerge:
把HDFS 上的多个文件 合并成一个 本地文件:
hadoop fs -getmerge /hdfs/output /local/file.txt
也可以:
hadoop fs -cat /hdfs/output/part-r-* > /local/file.txt
边栏推荐
- LeetCode_ Factorization_ Simple_ 263. Ugly number
- 用setTimeout模拟setInterval定时器
- dpdk 收发包问题案例:使用不匹配的收发包函数触发的不收包问题定位
- How many ways can you assign initial values to a two-dimensional array?
- About RDBMS and non RDBMS [database system]
- Login of MySQL [database system]
- 任务、微任务、队列和调度(动画展示每一步调用)
- 图片裁剪cropper 示例
- Scala111-map、flatten、flatMap
- Deployment and simple use of PostgreSQL learning
猜你喜欢

Overview of cloud security technology development

32 chrome调试工具的使用

基于OpenCV和YOLOv3的目标检测实例应用

"Ask every day" reentrantlock locks and unlocks

L1 and L2 regularization

pl/sql 创建并执行oralce存储过程,并返回结果集

39 simple version of millet sidebar exercise

API health status self inspection

延迟加载源码剖析:

如何解决Visual Stuido2019 30天体验期过后的登陆问题
随机推荐
Universal smart JS form verification
37 元素模式(行内元素,块元素,行内块元素)
[C topic] the penultimate node in the Niuke linked list
Process control (Part 1)
密码强度验证示例
Raft of distributed consistency protocol
vscode 插件篇收集
Content type corresponding to office file
SPI传输出现数据与时钟不匹配延后问题分析与解决
Scala111-map、flatten、flatMap
C, c/s upgrade update
[Nacos] what does nacosclient do during service registration
When using jetty to run items, an error is reported: form too large or form too many keys
PHP implements non blocking (concurrent) request mode through native curl
【JS高级】js之正则相关函数以及正则对象_02
sql to linq 之存储过程偏
SSM Advanced Integration
如何解决Visual Stuido2019 30天体验期过后的登陆问题
我的创作纪念日
Copy files / folders through Robocopy