当前位置:网站首页>SparkStreaming实时数仓 问题&回答
SparkStreaming实时数仓 问题&回答
2022-06-10 12:32:00 【WK-WK】
总览
1、什么是实时计算
实时计算一般都是针对海量数据进行的,并且要求为秒级
实时计算(项目)的特点:
1、一旦启动,只要不关闭程序,程序会一直占用资源
2、数据源是实时的不间断的
3、数据量大且无法或没必要预算,但要求对用户的响应时间是实时的
补充:实时计算主要强调的是低延迟、高吞吐(同一时间并没有大量的数据进行计算)
2、衡量标准
批处理:5秒-1分钟,批处理取决于窗口的大小,以及获取数据量的大小
实时数据:微批次数据处理延迟为秒级,流数据处理延迟为毫秒级
3、举例几个用到实时计算的例子
1、分地区 访问明细
2、分地区 交易
3、分地区 订单
4、分地区 金额
(补充:实时计算主要展现的是数据是时间上的变化趋势)
4、实时计算有哪几种实现技术
Flink:流处理
clickhouse
SparkStreaming:批处理
ElasticSearch
5、单台计算的压力在哪儿
内存资源有限、线程数有限,mysql关系型数据库超过一定量的数据,需要计算很长时间
补充:
计算资源:内存、核
存储资源:磁盘
容错性:单体资源不足容易宕机
6、实时计算的架构
项目采用的是三个阶段,分别是采集阶段、实时计算阶段、即席查询阶段
实时计算阶段又分为:数据分流(维度实时)、数据聚合(业务数据补充连接)
源文件下载:https://gitee.com/wang0101gitee/spark.git
7、为什么要这样设计
要保证数据的时效性,就要尽可能的减少中间过程,但又不能过于简化,过来的数据直接进行处理,数据量太大,计算时间严重超时,为了平衡两者的关系,一般会分成这三个阶段,合理的利用资源进行运算
三层架构:
ODS:存储原始数据
DWD:各类主题数据(DIM)
DWS:汇总数据
(补充:SparkStreaming可以设置批次时间,数据量大可以调解)
第一章:采集阶段
日志数据
1、如何采集?
两种方案
1、日志数据(增量),不保留历史数据,通过flume采集送往下一层
2、追求时效性,可以直接将数据送往下一层
a.web服务
b.nginx
2、两种方案的优劣?
1、采用flume可以实现断点续传,如果下一层的节点崩了,仍然可以将数据保留下来,等待下一层恢复
速度不如第二种,如果采用memory channel 如果flume崩溃仍然会丢失数据
2、速度快,但是安全性不高
a.web 建立连接的方式直接方式直接发送
b.nginx 可以搭建集群,单独拿出一个节点做负载均衡
3、数据送往哪儿?为什么?为什么不能直接计算?
数据会直接送往kafka存储,存储不是目的,拉取才是目的,
数据如果直接进行计算,sparkstreaming发送速率和处理数据不一致会导致数据积压
解决办法是数据可以送到一个消息队列,主要特点限流削峰,异步处理,因此可以选用的组件只有kafka,以此作为原始数据的存储层
(补充:限流削峰是队列特点,Kafka专为大数据设计,吞吐量大、可用性高【分布式架构】)
如果sparkstreaming直接进行计算,只能拉取数据,日志数据如果落盘的话,会影响效率,读写快,顺序写磁盘
4、日志在kafka中如何存储的?为什么要这么存储?
1、日志服务器直接发送到kafka,属于后端的任务,不需要进行任何处理
2、日志按原始结构发送到kafka成为一个主题(同时进行分区,以便分布式计算)
5、主题为什么要分区?
增加计算的并行度
如何避免数据倾斜?
用mid做分区键 为什么不用userid,启动未登录的用户可能为空
业务数据
1、业务采集方式?
1、增量采集
2、全量采集
2、业务数据增量同步怎么实现的?详述?
– 技术
开启mysql的binlog,binlog会记录用户对mysql的增、删、改的每一步操作;
可以利用这种功能实现增量同步,开启binlog只是在本地生成的文件,将数据同步到kafka,需要一些特定的工具,如maxwell和canal
具体的实现原理
都是伪装成从机,mysql开启mysql主从复制,会将数据传给对应组件
传输的数据结构前后会有变化吗?
发生了改变,具体改变由组件定义
3、这部分会不会丢数据
两个组件都具备断点续传的功能,都拥有自己维护的偏移量,崩了之后,不会丢失数据
binlog的不同模式?
1、row:
每次记录变化前后的内容
优点:数据绝对一致,
缺点:浪费空间
2、statement
每一次执行写操作的语句
优点:节省空间
缺点:无法保证数据一致 如 两个地方各自执行语句,如now()
3、mixedlevel
两个混合在一起,默认时statement,以下几种情况会使用row
1、包含UUID()
2、包含 AUTO_INCREMENT 字段的表被更新时;
3、Insert delayed 语句
4、UDF时
优点:节省空间 同时兼顾一致性
缺点:某些情况下依旧会造成数据不一致
4、为什么会有一个全量导入的过程?
全量导入主要是为了维度表的数据,以后的计算可能会用到维度表的数据,增量导入不能导入历史数据
5、全量导入的实现方式?
1、datax
传过来的数据结构,会适应目标容器的数据格式
2、maxwell的bootstrap
同步过来的数据格式会发生变化,只读表,数据操作类型出开始和结束之外,其类型全为insert
6、全量同步的频率
实时计算,一般只用同步一次
当维度数据发生变化时
7、全量同步时,程序挂掉了,会不会有重复数据
会有重复数据,但是因为数据量小,不会影响计算
(补充:同步数据也可以使用幂等性操作避免数据重复)
8、避免数据倾斜
增量同步可以定义分区主键
全量同步没多少数据,可以不考虑
kafka存储
1、主要作用?为什么选用kafka?
上面已经回答了
作用:解耦、数据削峰、高吞吐量、适用性高(分布式框架)、分区(分布式计算,提高并行度)
2、kafka数据的存放?
1个主题多少个分区多少个副本
多少分区取决于有后面有多少消费者 峰值/消费者数
副本数量取决于是否要考虑安全
3、kafka接受的数据会不会产生重复?
幂等性
ack
4、kafka某个节点的进程挂掉了,数据会丢失吗?
多副本不会,如果分区数量刚好等于kafka的节点数量,处理数据会有所下降
5、吞吐量
可以使用测试脚本测以下本地每分钟的读写速度
6、kafka监控
自己开发的监控器,开源的监控器eagle
7、kafka保存数据多长时间
跟离线项目一样保存3天
(补充:kafka默认保存时间是7天)
第二章:实时计算
关于实时
1、主流的计算方式?
spark批处理方式,这里采用的是spark批处理的方式(微批次模拟流数据)
flink流批一体
2、为什么不直接计算出结果?
如果数据量很少可以很快计算出结果
但事实计算的特点就是,低延迟,高吞吐量的分析数据
(重复)
3、还需要采用分层的结构?
要保证数据的时效性,就要尽可能的减少中间过程,但有不能过于简化,过来的数据直接进行处理,数据量太大,计算时间严重超时,为了平衡两者的关系,一般会分成这三个阶段,合理的利用资源进行运算
三层架构:
ODS:存储原始数据
DWD:各类主题数据
DWS:汇总数据
ODS -> DWD
1、ODS层的主要目的?载体是谁?
目的:保存历史数据,做为缓冲(将业务系统与数据计算解耦)
载体:kafka
选择的原因:消息队列,写的快,读的快,其他数据库承受不了怎么大的数据量、(分布式架构)、分区并行处理
2、从ODS层到DWD层,数据流向是怎么样的?
通过SparkStreaming程序进行分流,对于不同的数据源分别编写不同的程序来处理
日志数据:根据不同的页面信息分配到kafka不同的主题
业务数据:对事实表和维度表分流(维度表数据流向Redis)
3、如何做到精确一次消费?
再程序不崩溃的情况下,采用后置提交偏移量的方法,可以保证不丢失数据
解决数据重复可以使用容器本身的 幂等性来解决,或者可以用事务操作,插入和更新偏移量都成功才算成功
项目解决数据重复采用的是幂等性
(补充:如果目标容器不进行统计操作,可允许数据重复,减少计算压力)
4、在哪儿维护偏移量?为什么?
项目中采用的是redis,因为要保证精准一次消费,偏移量的维护需要靠容器本身的幂等性来解决问题
容器的选择有多种可以选择 如
1、mysql replace是幂等的
2、hbase put rowkey 是幂等的
3、redis set类型 hset类型 是幂等的
4、hive insert overwrite 幂等操作
选用redis,读写快,缺点:崩了会丢失数据
偏移量存储的类型 hash 为什么用hash?
5、精确一次生产数据?
在程序不崩的前提下:精准一次 = ack(-1) +生产者开启幂等性
6、不同场景下,存放偏移量容器的选择?
7、redis崩了,会有什么后果?
redis崩了,程序获取redis连接失败,程序运行失败,再次运行会有重复数据吗?
8、对业务数据的操作?
程序对维度表和事实表进行分流,同时也每张表分别存储
对于维度数据:除用户表之后变化不大,大部分是宽表,可以选用数据库进行长期存储
对于事实数据:数据量多,且不断变化,因此存在在kafka中
(补充:维度数据涉及业务需求计算的各种条件要求,读取比较频繁)
9、维度表和事实表的存储
维度表可以选择的容器有:
1、redis 读取最快
2、hbase 存储的量比较大
3、mysql 介于两者之间,成本最便宜
项目中选用redis,原因,读取快,不容易发生故障
实时表kafka
10、有没有可以优化的点?举例说明
redis集群提高容错
11、DWD层的主要目的?载体是谁?
用来存储不断变化的事实表,载体是kafka
12、如何区分业务表是维度表还是事实表
程序无法自动区分,需要人工干预,可以在程序中指定哪些是维度表,哪些是事实表
这样做的后果,以后出现新的表,需要改动原来的代码,因此我们选用将事实表和维度表的信息存放到别的容器中,如mysql、hbase、redis
项目中用的是redis 定义维度表名key和事实表名key
(补充:关键点在于,因为可能随时修改这些信息,所以在执行每次计算时,都需要读取一遍信息,以此达到及时更新的要求)
13、写入redis的数据能否保证有序
需要全程分析
1、从数据源到kafka,数据是有序的(在kafka中,分区内有序,分区间无序)
2、程序读取数据的时候,因为存在多分区多消费者的情况,不能保证从分区中消费顺序
解决:可以保证局部有序,在一个分区内的数据是有序的
在canal传入到kafka中的时候,可以定义数据按分区键传输
(补充:将需要保证有序的数据发送到一个分区内,比如用户访问页面顺序,就可通过mid_id作为分区键)
14、广播变量(对数据资源读取的优化)
每个RDD都需要从redis里面读取表名,每个RDD的分区都会读取一次,浪费资源
优化手段,在driver中读取表名,封装成广播变量分发给每个Executor
15、在逻辑代码中,对不同资源获取与释放的设计,如何实现?
外部资源:Kafka、Redis
具体操作:计算数据、偏移量、维度数据、静态资源
设计原理:根据对资源使用时所处的环境(driver、executor)不同,灵活使用不同的算子将环境加以区分。
DWD->DWS
为什么要写入到DWS中?程序不能直接得到结果吗?
数据量还是多,sparkStreaming做一些聚合操作,涉及到shuffle的计算还是慢
为了加快查询效率放到OLAP中
DWS的载体?为什么?
载体主要是OLAP分析工具,本次项目中采用ES
优点:
1、OLAP 计算能力更强 能够管理的数据量大 快
2、聚合
3、范围数据过滤
4、OLAP 开发一个业务指标比较快
5、列式存储,便于聚合
缺点
1、关联Join
2、复杂的语义表达
3、窗口函数或某种算法
4、机器学习
5、外部连接
这当中SparkStreaming起到了什么作用?
解决ES不擅长的部分
如何根据业务编写程序?举两个例子
1、首先明确数据在展示中的度量
2、确定有关度量的维度信息
3、获取数据,进行关联数据补充等操作
4、对完整数据进行聚合查询
例如:
1 生成日活的明细表(去重、维度关联)
2 生成下单业务的明细表(维度关联 、双流join)
对数据去重?
思路:存放状态?来一条记一条
实现:1、spark中有自己维护的状态,但是存放再hdfs上,读取比较麻烦
2、采用其他容器,redis、hbase、mysql
可能出现的问题:spark程序可以多线程并行执行,如果同时修改共享资源,可能出现乱序的情况
解决方案:加锁 -》太麻烦
原子操作或者事务操作
项目中去重为了保证数据唯一,可采取幂等性解决,选用redis的set操作判存和写入一致操作
维度关联?
维度数据存放再其他的容器中,拼接时需要从其他容器中获取,形成新的对象,然后返回
维度拼接可能产生的问题?
字段类型不匹配,维度表中没有事实数据所对应的值,可能会报空指针异常
对象.属性或者对象.方法
双流Join的实现思路?
思路:
所谓的双流join就是拼接两个流之间的数据
如果一个流的key都在一个分区当中,join两个流的时候不会发生shuffle,项目中是根据key来分区的,所以相同key的数据都在一个分区当中
可能出现的问题:
数据迟到,即本在该在第一批次的数据,发生了延迟数据发送到了第二批次中,因此为了让数据能够正确匹配
解决思路:
1、滑动窗口、数据有重复
2、中间数据做缓存,如果没有数据就放到一个容器中
容器的选择:因为流不能一直等待,数据可能晚来几分种,不需要一直在缓存种存在
因此容器最好是设置过期时间的,项目种选用redis
具体实现:
join两个流的前提是k-v类型的流,所以join前需要用map改变一下结构
双流join是否存在数据丢失的情况?如何避免?
如果数据迟迟不到,缓存中相对应的数据已经达到过期时间,数据在来,会丢失数据
双流join是否存在数据重复的情况?如何避免?
如果主流的key,对应副流的多个key
副流的key,对应主流的多个key
需要根据业务判断
程序有没有shuffle,什么时候会有shuffle
为了避免数据倾斜,key不在同一个分区里面,使用join的时候会发生数据倾斜
从DWD->DWS过程中存不存在数据丢失的情况?
存在,去重的时候写入缓存,但是在写入ES的程序崩溃,数据再次进来的时候,会先查redis的值,发现redis里面有值,说明数据来过,数据不会继续往下面走,这时这条数据会丢失?
解决方案:
采用状态还原的方法解决,不确定redis里面的值是否准确,可以在程序开始的时候查询ES已经存在的值,根据存在的值重写写入redis。这时不会发生数据丢失
从DWD->DWS过程中存不存在数据重复的情况?
双流Join
SparkStreaming程序崩了,重启之后会不会对业务产生影响
不会,正常执行,得益于先检查ES在写入redis的机制,数据不会丢失,
redis 在这部分承担的作用
1、偏移量
2、存放去重数据
3、存放缓存数据
4、读取维度属性
写入ES时,如何确保数据精确一次?
采用put写入,数据去重
写入ES时,redis发生了故障,后果是什么?
1、去重数据没有了
2、维度数据没有了 需要重新导入
3、缓存数据没有了
去重的数据可能重复写入ES,缓存的数据可能会丢失
ES对数据的规划
1、先设计索引模板
2、每天一张索引
3、不同的业务不同的索引
写入ES之后,算结束了吗?
我们的目的是提供给用户直观的数据,得到的数据用户可以直接使用
第三章:即席查询
1、什么是即席查询
即席查询是用户根据自己的需求,灵活的选择查询条件,系统根据用户的选择生成相应的统计报表。
2、OLAP的选择,除ES还有其他选择吗?
clickhouse kylin
3、简述以下ES的结构?特点(倒排索引)列式存储?
正排索引:当用户发起请求时,搜索引擎会扫描索引库中的文档,找出所有包含关键词的文档,然后再去文档中查找是否含有关键词的方法叫做正向索引 如sql中的like
倒排索引:数据插入时,会对数据分词,生成分词所对应的文档ID列表,查询时,会获取词所对应的ID号
4、倒排索引的结构?
term index(词索引) + term dictionary(字典) + posting list(文档列表)
term index:内存中
二叉树结构 快速定位分词
term dictionary:内存中
存储已经存在的所有单词
posting list:磁盘文件中
存储一个单词所对应出现这个单词的ID列表
5、ES分片的存放规则?
1、一张索引可以设置多个分片
2、在创建的时候设置分片之后不可改变
3、分片分配逻辑 通过 hash(routing) % number_of_primary_shards 其中 routing是一个可变值,默认是文档的id
4、一个ES节点的堆内存来说,不要超过32G,即最好不要超过1000个分片
6、ES写过程
1、用户请求ES所在节点(协调节点)
2、协调节点会向索引主分片发起写入请求
3、主分片写入成功后,请求副分片写入
4、如果都写入成功,发送信息给协调节点,协调节点返回给用户
7、ES读过程
1、客户端发送请求到node1
2、要读取的数据在分片0上,因为每个节点都有分片0的副本
3、将请求转给node2,node2将结果返回给node1,然后返回给客户端
在读取副本的时候,存在主分片的数据写入成功,但是副分片还没有写入,这种情况下查询,副本分片会报告文档不存在,但是主分片可能返回文档
8、ES的搜索流程
两个阶段
1、query查询阶段
a. 客户端访问node3(协调节点),并创建一个优先队列,大小为from+size
b. 请求这个索引中每个分片拷贝(主分片和副本分片)
c. 每个分片会将本地查询的结果添加到排序好的优先级队列中
d. 每个分片返回文档id和所有参与排序字段的值给协调节点
e. 协调节点合并这些值到自己的优先队列中进行一个全局的排序
2、Fetch 捕获阶段
a. 协调节点辨别出哪些文档需要被取回并向相关的分片提交GET请求
b. 每个分片丰富这个文档(行内容)
c. 所有文档取回后,协调节点将结果返回给客户端
9、ES中数据的物理提交流程?
1、输入写入请求,如果数据现在不可写,会将数据放入到buffer中,同时写入translog中
2、translog会根据默认设置同步磁盘文件,此时会返回给请求端处理成功
3、1-2秒后,执行refresh,数据将buffer中的数据整理成段提交到进入到可读缓存,这时不会真正的写入,会在可读缓存中存放一定时间
4、默认是30分钟,或者translog达到上限512M,flush到磁盘中
5、磁盘当现在有分片和段文件,后台周期或者手动将这些文件合并成一个更大的分片文件
3.11 es的段合并过程?
10、ES中段合并的优化
1、后台提交:影响性能、效果不是很好
2、手动提交:每天提交一次
11、es默认分词和中文分词的区别?
ES中默认对中文分词是按一个字一个字分,但是这不符合中文的语义,于是有了第三方帮助我们进行分词
12、BI工具和可视化工具的区别?
BI它是一套完整的解决方案,用来将企业中现有的数据进行有效的整合
可视化工具将数据用图形的方式展示出来,用户可以自定义怎么展示
第四章:-- 进阶–
1、spark中的数据倾斜怎么处理?
现象:发现某个task执行很慢
原因:某个task执行的任务量很大,往往是因为join
解决:1、预聚合原始数据
2、避免shuffle
如果一定要用shuffle,可以设置shuffle并行度
3、hive进行ETL清洗
提前过滤掉可能导致数据倾斜的数据
如果数据为空,给数据一个随机key,尽量分布均匀
4、局部聚合+全局聚合
5、提高reduce并行度(提高了reduce端task数量,分到每个task的数据就减少)
6、将reduce join改为map join(广播小RDD全量数据+map)
2、Spark的shuffle种类,什么情况下决定用哪些shuffle
1、BypassMergeSortShuffleWriter
1、没有预聚合功能
2、聚合的分区数量小于等于200(默认,可以配置)
2、UnsafeShuffleWriter
1、序列化对象支持重定位操作
2、没有预聚合功能
3、聚合的分区数量小于等于16777216
3、SortShuffleWriter
不满足以上的
在Spark中有三种shuffle写,分别是BypassMergeSortShuffleWriter、UnsafeShuffleWriter、SortShuffleWriter。分别对应三种不同的shuffleHandle。
3、Spark有哪些聚合类的算子,我们应该尽量避免什么类型的算子?
groupbykey,reducebykey,countbykey,sortbykey,要避免造成shuffle的算子
4、spark on yarn 作业执行流程
自己画两遍
5、yarn-client 和 yarn cluster 有什么区别
yarn-cluster适用于生产环境;而yarn-client适用于交互和调试,也就是希望快速地看到application的输出。
driver在哪儿的区别 客户端driver在客户端中
集群环境下,driver在AM中
6、Spark为什么比Mapreduce快?
1、消除了冗余的HDFS读写
2、消除了冗余的MapReduce阶段
3、JVM的优化
7、Spark那部分容易挂?为什么?
shuffle阶段最容易挂
那么为什么会挂掉呢,这其实和Spark的shuffle机制的设计有关,假设现在有一个shuffle过程,一共有N个task,也就是说会有N个节点shuffle write(也就是mapper),N个节点shuffle read(reducer),Spark是all-to-all的设计,对于每一个reducer而言,它都需要去所有的N个mapper中拉取它所需要的数据,不难看出整体的复杂度是N*N级别的,当你的任务有2000个task时,单这一个任务,Spark集群中就会产生数百万次的网络数据传输。
8、spark是流处理吗?
不是,微批次的处理,具体处理的数据量,取决于开窗的大小
9、RDD, DAG, Stage怎么理解?
RDD 分布式数据集 只是一个结构,一个RDD有多个分区,真正的数据是在分区上执行的
- 可分区:提高消费能力,更适合并发计算,类似kafka的消费者消费数据,“一个分区对应一个task,在executor中,一个core对应一个task,这样就体现了并发计算”。
a. 弹性:变化,可变。
a、存储弹性:可以在磁盘和内存之间自动切换;“shuffle阶段,就会将数据存入磁盘中,避免数据量过大,导致任务失败。一个任务分很多个阶段,每个阶段内的运行,则是基于内存的。”
b、容错弹性:数据丢失可以自动恢复;
c、计算弹性:计算出错后重试机制;
d、分区弹性: 根据计算结果动态改变分区的数量。“每次计算以后,可能数据会减少,这样一来,就会造成数据倾斜的状况,通过动态修改分区的数量,这样就可以数据使尽量均匀分布在不同的分区内。”
b. 不可变:类似不可变集合
RDD只存储计算的逻辑,不存储数据,计算的逻辑是不可变的,一旦改变,则会创建新的RDD;
c. RDD :一个抽象类,需要子类具体实现,说明有很多种数据处理方式
10、RDD 如何通过记录更新的方式容错
两种方式
– 缓存和检查点区别
- Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
- Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
- 建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD
11、宽依赖、窄依赖怎么理解?
血缘关系:指的是不同算子的执行顺序
区别下游算子和上游算子的对应关系
宽依赖:上游的一个分区对应n个下游分区,存在shuffle
窄依赖:上游的一个分区只对应一个下游分区,一个下游分区可以对应多个上游分区
12、Job 和 Task 怎么理解
–几个概念
- Application:应用程序,初始化一个sparkContext就会产生一个application
- job:一个行动算子就会产生一个job
- stage:宽依赖的个数 + 1
- task:一个stage阶段中,最后一个RDD的分区个数就是task的个数。
– 特列:在转换算子中如果调用了行动算子,那么在转换算子的内部也会有job的提交。
application > job > stage > task ,每一层都是1 对N 的关系。
13、Spark 血统的概念
指的是血缘关系,即算子之间执行关系
主要用来实现分布式式环境下的数据容错
RDD Lineage被称为RDD运算图或RDD依赖关系图,是RDD所有父RDD的图。它是在RDD上执行transformations函数并创建逻辑执行计划(logical execution plan)的结果
当程序崩了,可以依靠血缘关系恢复运算
14、Transformation和action是什么?区别?举几个常用方法
spark中的算子分为两种
1、转换算子
map -》转换数据结构
flatmap-》扁平化处理
filter -》过滤
groupby -》按照指定规则分组
glom -》讲分区数据放到一个集合中
2、行动算子
reduce -》聚合
collect -》将所有分区的数据拉到driver中
count -》返回RDD中的数据个数
first -》 返回RDD的第一个元素
take -》返回RDD的前几个数据
15、简单描述缓存cache、persist和checkpoint的区别
缓存(cache/persist)
cache和persist其实是RDD的两个API,并且cache底层调用的就是persist,区别之一就在于cache不能显示指定缓存方式,只能缓存在内存中,但是persist可以通过指定缓存方式,比如显示指定缓存在内存中、内存和磁盘并且序列化等。通过RDD的缓存,后续可以对此RDD或者是基于此RDD衍生出的其他的RDD处理中重用这些缓存的数据集
容错(checkpoint)
本质上是将RDD写入磁盘做检查点(通常是checkpoint到HDFS上,同时利用了hdfs的高可用、高可靠等特征)。上面提到了Spark lineage,但在实际的生产环境中,一个业务需求可能非常非常复杂,那么就可能会调用很多算子,产生了很多RDD,那么RDD之间的linage链条就会很长,一旦某个环节出现问题,容错的成本会非常高。此时,checkpoint的作用就体现出来了。使用者可以将重要的RDD checkpoint下来,出错后,只需从最近的checkpoint开始重新运算即可使用方式也很简单,指定checkpoint的地址[SparkContext.setCheckpointDir(“checkpoint的地址”)],然后调用RDD的checkpoint的方法即可。
checkpoint与cache/persist对比
1、都是lazy操作,只有action算子触发后才会真正进行缓存或checkpoint操作
(懒加载操作是Spark任务很重要的一个特性,不仅适用于Spark RDD还适用于Spark sql等组件)
2、cache只是缓存数据,但不改变lineage。通常存于内存,丢失数据可能性更大
3、改变原有lineage,生成新的CheckpointRDD。通常存于hdfs,高可用且更可
16、描述repartition和coalesce的关系与区别
关系:repartition底层调用的coalesce
区别:repartition 一般用于增加分区 会shuffle
coalesce一般用户减少分区 默认不会shuffle
17、Spark中的广播变量与累加器
广播变量:分布式共享只读变量
为什么要引入这个:
一个Executor有多个core,所以可以同时执行多个task,当Driver需要传递一个数据量很大的对象时,由于每一个task中都含有这么一个变量,这样一来,数据在executor中就存在多份
可能导致的问题:
executor中的数据冗余,内存可能溢出,如果在shuffle阶段,数据传输效率特别低
作用:dirver只相executor中发送一个对象,所有task共享这个变量
累加器:分布式共享只写变量
为什么要引入这个: 想通过没有shuffle过程的算子来实现数据的累加
作用:driver向executor发送累加器,一个executor只存在一个累加器,executor执行完毕之后,返回给driver
driver保留所有executor的累加器,driver中进行两两聚合。executor之间不能互相访问对方的累加器
18、spark中的内存优化
基于java的内存,spark做了优化
内存的特点:内存可以动态占用,发生在执行内存和存储内存之间
机制:
1、执行内存和执行内存都满了,数据会写道磁盘上
2、当存储内存满了,可以占用执行内存,当资源不够时,占用的资源会丢弃或者写道磁盘上
3、当执行内存满了,可以占用存储内存,当资源不够时,执行内存只能等待被释放,它占用的不能被淘汰
第五章:-- 场景分析 –
1、极其特殊的情况下,维度数据比实时数据来的慢了怎么办
1、如果实时数据必须要维度数据才能操作,采用阻塞实时数据的操作,自旋
2、放到一个缓冲区里面,并设置过期一定的过期时间
2、分时购物车
从DWD-》DWS处理
1、需要什么数据
一小时内的购物车数量吗?
2、从哪里取数据
3、怎么处理
3、分时交易
4、分时订单数
5、分时支付数
6、分时评论数
7、分地区访问
8、分地区交易
9、分地区订单
10、分地区数据
11、分地区金额
12、排行榜
13、top20
14、sku
15、spu
16、品牌
17、品类
18、购物卷
19、搜索关键
20、风控退单
21、风控恶评
22、风控投诉
23、风控黑名单
24、恶意领购物卷
边栏推荐
- STM32学习笔记(2)-USART(基础应用1)
- Learning of fm4057s single lithium battery linear charging chip
- (10) Notes on null pointer accessing member function and this pointer
- 【移动机器人】轮式里程计原理
- Can chip learning of max3051
- (十)空指针访问成员函数与this指针注意事项
- (11) Const decorated member function
- Slm4054 independent linear lithium battery charger chip learning
- (十 一)const修饰成员函数
- (八)初始化列表
猜你喜欢

别把“IT信息化”不当“超级工程”

(11) Const decorated member function

(五)类和对象及类的分文件操作(2)

What if the xshell evaluation period has expired? Follow the steps below to solve the problem!

使用SoapUI工具生成发送短信接口代码

ShaderGraph——302游动的龙

Web design and development, efficient web development

(5) Class, object and class file splitting operation (2)

web设计与开发,高效web开发

UML类图
随机推荐
2022 Zhejiang provincial competition
ASP.NET 利用ImageMap控件设计导航栏
(8) Initialization list
Practical cases, in-depth analysis
Web design and development, efficient web development
使用SoapUI工具生成发送短信接口代码
Learning of fm4057s single lithium battery linear charging chip
Unity3D开发MR实现模型遮挡与透明地面接收阴影
STM32F407学习笔记(1)-EXTI中断事件与NVIC寄存器
STM32F407时钟树与系统时钟学习笔记
移动端整页滑屏示例(上下滑动整屏)(整理)
request获取请求服务器ip地址
(十 一)const修饰成员函数
AD-PCB原理图学习(1)
ASP. Net using imagemap control to design navigation bar
[solved] vagrant up solution to slow download box
JS array de duplication: de duplication of two-dimensional arrays, removal of the same value, and removal of the same array
eseses
What if the xshell evaluation period has expired? Follow the steps below to solve the problem!
(四)类和对象(1)