shuffle原理 及优化策略

1、shuffle操作原理:

在Spark中,数据通常不会跨分区分布,以满足特定操作的需要。在计算期间,单
个任务将对单个分区进行操作——因此,要组织单个reduceByKey 的计算任务要执行
的所有数据,Spark需要执行一个all-to-all操作。它必须从所有分区中读取所有
键的所有值,然后将所有分区的值放在一起计算每个键的最终结果——这称为shuffle。
 
Spark中的某些操作会触发称为shuffle的事件。shuffle是Spark用于重新分发数据
的机制,以便跨分区对数据进行不同的分组。这通常涉及跨执行程序和机器复制数据,
使shuffle成为一项复杂而昂贵的操作。在Spark Core中,Shuffle是划分宽窄依赖
依据Stage的依据

宽依赖:一对多 (有shuffle操作)
窄依赖:一对一 或者多对一

2、 Shuffle操作问题解决

2.1 数据倾斜原理

在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task
来进行处理,此时如果某个key对应的数据量特别大的话,就会发生数据倾斜
(在实际生产中去null值是必须的)

2.2 数据倾斜问题发现与解决

通过Spark Web UI来查看当前运行的stage各个task分配的数据量,从而进一步确定
是不是task分配的数据不均匀导致了数据倾斜。
知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出
来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle
类算子。
通过countByKey查看各个key的分布。

2.3 数据倾斜解决方案

2.3.1 过滤少数导致倾斜的key
2.3.2 提高shuffle操作的并行度
2.3.3 局部聚合和全局聚合

案例<一>:采样倾斜key并分拆join操作(join的两表都很大,但仅一个RDD的几个key的数据量过大)
方案实现思路:
对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。
然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。
再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。
而另外两个普通的RDD就照常join即可。
最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。

在这里插入图片描述
案例<二>使用随机前缀和扩容RDD进行join(RDD中有大量的key导致数据倾斜)
方案实现思路:
   将含有较多倾斜key的RDD扩大多倍,与相对分布均匀的RDD配一个随机数。 在这里插入图片描述
4 spark shuffle参数调优

spark.shuffle.file.buffer

默认值:32k
参数说明:该参数用于设置shuffle write task的
BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入
buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。

调优建议:

如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),
从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,
进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.reducer.maxSizeInFlight

默认值:48m
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer
缓冲决定了每次能够拉取多少数据。

调优建议:

如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),
从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中
发现,合理调节该参数,性能会有1%~5%的提升。

spark.shuffle.io.maxRetries

默认值:3
参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数
据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以
重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行
失败。

调优建议:

对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次)
,以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中
发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度
提升稳定性。

spark.shuffle.io.retryWait

默认值:5s
参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。

调优建议:

建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction

默认值:0.2

参数说明:

该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,
默认是20%。

调优建议:

在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议
调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合
过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

默认值:sort

参数说明:

该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、
sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是
Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与
sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。

调优建议:

由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该
排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不
需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的
HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意
的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

默认值:200

参数说明:

当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于
这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按
照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的
所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

调优建议:

当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数
调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,
map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产
生大量的磁盘文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

默认值:false

参数说明:

如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启
consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read
task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。

调优建议:

如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可
以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,
同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的
SortShuffleManager要高出10%~30%。
zsj.python之路
关注 关注
  • 1
    点赞
  • 2
    收藏
    觉得还不错? 一键收藏
  • 0
    评论
大数据Hive】hive 优化策略之job任务优化
congge_study的博客
11-22 8681
hive job优化策略
Shuffle机制及优化
jiedaodezhuti的博客
02-10 615
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。 具体Shuffle过程详解: (1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中 (2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件 (3)多个溢出文件会被合并成大的溢出文件 (4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序 (5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据 (6)ReduceTask会取到同一个分区的
超实用Hadoop的Shuffle过程优化:map端,reduce端,网络IO以及常见参数配置
不爱吃鱼的馋猫
09-06 483
Shuffle功能强大,但是Shuffle过程中多次落盘操作以及网络IO,是导致MapReduce慢的主要原因,可以尝试从以下几个方面进行优化: 1、Map阶段 (1)增大环形缓冲区大小。由100m扩大到200m (2)增大环形缓冲区溢写的比例。由80%扩大到90% (3)减少对溢写文件的merge次数。(10个文件,一次20个merge) (4)不影响实际业务的前提下,采用Combiner提前合并,减少 I/O。 2、Reduce阶段 (1)合理设置Map和Reduce数:两个都不能设置太少
(转载)Shuffle过程详解及优化
moose_killer的博客
03-11 1597
1.MapReduce Shuffle Map是映射,负责数据的过滤分 发;Reduce是规约,负责数据的计算归并。Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过 Shuffle来获取数据。 从Map输出到Reduce输入的整个过程可以广义地称为ShuffleShuffle横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和sort过程,如图所示: Map的shuffle过程 Spill过程包括输出、排序、溢写、.
shuffle工作原理
weixin_30415113的博客
03-05 708
Shuffle的正常意思是洗牌或弄乱,可能大家更熟悉的是Java API里的Collections.shuffle(List)方法,它会随机地打乱参数list里的元素顺序。如果你不知道MapReduce里Shuffle是什么,那么请看这张图:这张是官方对Shuffle过程的描述。但我可以肯定的是,单从这张图你基本不可能明白Shuffle的过程,因为它与事实相差挺多,细节也是错乱的...
Spark内核机制解析与性能调优:Shuffle原理及性能优化策略
# 1. Spark内核机制概述 ## 1.1 Spark内核概述 Apache Spark是一个快速通用的集群计算系统,具有高扩展性...ShuffleSpark中用于数据重分区和数据交换的核心机制,在一些操作(如groupByKey、join等)需要将数据重新
spark shuffle原理
最新发布
04-21
理解Spark Shuffle原理对于优化大数据应用的性能至关重要,开发者可以通过调整Shuffle相关的配置参数,如buffer size、shuffle partition数量等,来平衡内存使用、磁盘I/O和网络传输,以达到最佳的处理效果。
Spark数据分区与Shuffle优化策略
# 1. 理解Spark数据分区 ## 1.1 什么是Spark数据分区? Spark数据分区是将数据集划分成更小的数据块的过程。每个数据分区都包含数据集的一个子集。数据分区是Spark处理大规模数据的基本...而数据分区的优化可以提高Sp
YOLOv5小目标检测实战指南:深入解析算法原理优化策略,快速提升模型性能
[YOLOv5小目标检测实战指南:深入解析算法原理优化策略,快速提升模型性能](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/726e794f294c43278145d11facb9a1ab~tplv-k3u1fbpfcp-zoom-in-crop-mark:1512:0:0:0...
shuffle的机制
03-30
这里详细的分析了hadoop的shuffle机制,具体步骤等等。
MR的Shuffle过程以及优化
weixin_59295776的博客
11-20 1656
数据从map方法出来以后,首先进入getPartition方法,然后会对数据进行分区,之后进入环形缓冲区(默认大小为100Mb,数据量到达80%时会进行磁盘溢写),在溢写数据时会进行一次快排,这里的快排是对key的索引进行字典顺序排序,溢写之后会产生大量的小文件,(由于在hdfs中,每个文件不论大小,都会固定占用150字节的系统空间,因为文件或者文件夹是以对象形式存储在hdfs上的),所以我们要在快排之后在进行一次归并排序,来减少小文件个数,之后把数据按照指定的分区放入对应的分区下,来等待reduce的拉取
Shuffle优化
hyunbar的博客
08-11 589
1、Map阶段 增大缓冲区的大小:默认100M,可以改为200 增大缓冲区的溢写百分比:默认0.8,可以改为0.9 减少溢写文件的merge次数 采用combiner提前预聚合,减少IO。(不影响业务逻辑的前提下,只能加减,不能做乘除等复杂聚合) 2、Reduce阶段 合理设置map和reduce数:两个都不能设置太少,也不能设置太多。 太少,会导致task等待,延长处理时间 太...
Hadoop shuffle过程
Simple 专栏
05-13 807
Map端的shuffle Map端会处理输入数据并产生中间结果,这个中间结果会写到本地磁盘,而不是HDFS。每个Map的输出会先写到内存缓冲区中,当写入的数据达到设定的阈值时,系统将会启动一个线程将缓冲区的数据写到磁盘,这个过程叫做spill。 在spill写入之前,会先进行二次排序,首先根据数据所属的partition进行排序,然后每个partition中的数据再按key来排序。partit...
ShuffleManager 原理
weixin_34190136的博客
06-11 141
Spark 的源码中,负责 shuffle 过程的执行、计算、处理的组件主要是 ShuffleManager。 在 Spark 1.2 以前,默认的 shuffle 计算引擎是 HashShuffleManager。该 ShuffleMananger 有一个非常严重的弊端,就是会产生大量的磁盘文件,进而有大量的磁盘 IO 操作,比较影响性能。 因此在 Spark 1.2 之后,默认的 Shuf...
shuffle机制和原理分析
LYlinye的博客
06-05 1907
Shuffle简介 Shuffle描述着数据从map task输出到reduce task输入的这段过程。shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的...
性能调优-shuffle调优
u013939918的博客
03-08 1682
shuffle调优 什么情况下会发生shuffle,然后shuffle原理是什么? 在spark中,主要是以下几个算子:groupByKey、reduceByKey、countByKey、join,等等。 什么是shuffle? groupByKey,要把分布在集群各个节点上的数据中的同一个key,对应的values,都给集中到一块儿, 集中到集群中同一
Shuffle原理
weixin_43825047的博客
01-10 337
·  MapReduce是怎样为数据进行分组,整合等操作的呢?这就涉及到Map和Reduce在中间十分关键的衔接部分Shuffle。 上面介绍了Map 和 Reduce的各自的任务和总的流程,MapReduce是怎样为数据进行分组,整合等操作的呢?这就涉及到Map和Reduce在中间十分关键的衔接部分Shuffle。   什么是shuffleshuffle原意是洗牌,混乱。而在MapReduce...
Shuffle过程详解及优化
热门推荐
菜如张学清的博客
08-23 2万+
1.MapReduce Shuffle Map是映射,负责数据的过滤分 发;Reduce是规约,负责数据的计算归并。Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过 Shuffle来获取数据。   从Map输出到Reduce输入的整个过程可以广义地称为ShuffleShuffle横跨Map端和Reduce端,在Map端包括Spill过程,在Reduc...
写文章

分类专栏

  • sql 5篇
  • 数据 3篇
  • 考证心得 1篇
  • oracle 9篇
  • 大数据 15篇
  • python 41篇
  • java
  • c
  • c++
  • MySQL 20篇
  • pythonnet 14篇
  • MongoDB 4篇
  • 正则表达式 2篇
  • web 1篇
  • javascript 2篇
  • Flask 3篇
  • AJAX 1篇
  • Django 3篇
  • 爬虫 2篇
  • 面试题 1篇
  • 算法 2篇
  • SAS 3篇
  • 数据分析 4篇
  • 运维 3篇
  • shell 6篇
  • Redis 1篇
  • linux 7篇

最新评论

  • 用python计算100以内的素数

    爱coding的小羊: 逻辑是不是有错误呀

  • 用python写的一个简易聊天室,用到多线程,udp套接字

    肖壹刀: 这个里面的os.fork会报错,请问是什么原因呢

  • 用python计算100以内的素数

    mobeicanyue: 其实也就是把x换成x的平方根

  • python之运算符重载

    洒去犹能化碧涛: n3=n1+n2等价于n3=n1.__add__(n2)这是怎么等价的啊,你没有调用这个add方法啊

  • python之迭代器(iterator)详解

    清風明月照我心: 请教一下,next函数将迭代器的值取一个少一个,我试了一下,如果最后再加上list(iter)仍能将全部值都取出来?

大家在看

  • Web+MySql ——Mybatis第二弹 1
  • 9.21学习 552
  • 单链表源码实现 241
  • 【JDK8新特性】Stream API 结合Lambda语法在项目中的实战应用 960
  • 地平线秋招2025 966

最新文章

  • hadoop构建数据仓库(一)
  • 银行从业资格考试心得
  • 数据核对思维
2021年1篇
2020年43篇
2019年27篇
2018年86篇

目录

目录

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43元 前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值

玻璃钢生产厂家雕塑 玻璃钢特点玻璃钢人物铜雕塑大宁玻璃钢花盆花器平顶山铜玻璃钢人物雕塑涿州玻璃钢花盆花器北京蛋形玻璃钢花盆玻璃钢雕塑代加工厂加盟桐乡玻璃钢圆门头雕塑景观玻璃钢雕塑销售厂家广东玻璃钢卡通雕塑阿狸定做山东玻璃钢雕塑浮雕厂组合式玻璃钢雕塑摆件价格云浮玻璃钢动物雕塑产品介绍东营人物玻璃钢雕塑定制江苏动物玻璃钢雕塑优势驻马店玻璃钢人物室外镂空雕塑赣州玻璃钢商场美陈焦作专业玻璃钢人物雕塑制作湘潭大象玻璃钢雕塑大型玻璃钢雕塑哪家专业公仔玻璃钢雕塑制作重庆玻璃钢雕塑厂郑州大型玻璃钢人物雕塑阳信玻璃钢花盆花器宜宾玻璃钢仿真水果雕塑桐乡玻璃钢雕塑订做公园玻璃钢雕塑厂家现货特色玻璃钢雕塑摆件销售西安佛像玻璃钢雕塑供应商济南个性化玻璃钢雕塑订做价格香港通过《维护国家安全条例》两大学生合买彩票中奖一人不认账让美丽中国“从细节出发”19岁小伙救下5人后溺亡 多方发声单亲妈妈陷入热恋 14岁儿子报警汪小菲曝离婚始末遭遇山火的松茸之乡雅江山火三名扑火人员牺牲系谣言何赛飞追着代拍打萧美琴窜访捷克 外交部回应卫健委通报少年有偿捐血浆16次猝死手机成瘾是影响睡眠质量重要因素高校汽车撞人致3死16伤 司机系学生315晚会后胖东来又人满为患了小米汽车超级工厂正式揭幕中国拥有亿元资产的家庭达13.3万户周杰伦一审败诉网易男孩8年未见母亲被告知被遗忘许家印被限制高消费饲养员用铁锨驱打大熊猫被辞退男子被猫抓伤后确诊“猫抓病”特朗普无法缴纳4.54亿美元罚金倪萍分享减重40斤方法联合利华开始重组张家界的山上“长”满了韩国人?张立群任西安交通大学校长杨倩无缘巴黎奥运“重生之我在北大当嫡校长”黑马情侣提车了专访95后高颜值猪保姆考生莫言也上北大硕士复试名单了网友洛杉矶偶遇贾玲专家建议不必谈骨泥色变沉迷短剧的人就像掉进了杀猪盘奥巴马现身唐宁街 黑色着装引猜测七年后宇文玥被薅头发捞上岸事业单位女子向同事水杯投不明物质凯特王妃现身!外出购物视频曝光河南驻马店通报西平中学跳楼事件王树国卸任西安交大校长 师生送别恒大被罚41.75亿到底怎么缴男子被流浪猫绊倒 投喂者赔24万房客欠租失踪 房东直发愁西双版纳热带植物园回应蜉蝣大爆发钱人豪晒法院裁定实锤抄袭外国人感慨凌晨的中国很安全胖东来员工每周单休无小长假白宫:哈马斯三号人物被杀测试车高速逃费 小米:已补缴老人退休金被冒领16年 金额超20万

玻璃钢生产厂家 XML地图 TXT地图 虚拟主机 SEO 网站制作 网站优化