Shuffle过程详解及优化

15 篇文章 5 订阅
订阅专栏
11 篇文章 2 订阅
订阅专栏

1.MapReduce Shuffle

Map是映射,负责数据的过滤分 发;Reduce是规约,负责数据的计算归并。Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过 Shuffle来获取数据。

 

从Map输出到Reduce输入的整个过程可以广义地称为Shuffle。Shuffle横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和sort过程,如图所示:

Map的shuffle过程

Spill过程包括输出、排序、溢写、合并等步骤,如图所示:

Collect:

每个Map任务不断地以对的形式把数据输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。这个数据结构其实就是个字节数组,叫Kvbuffer

Sort:

先把Kvbuffer中的数据按照partition值和key两个关键字升序排序,移动的只是索引数据,排序结果是Kvmeta中数据按照partition为单位聚集在一起,同一partition内的按照key有序。

Spill:

Spill线程为这次Spill过程创建一个磁盘文件:从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于 “spill12.out”的文件。Spill线程根据排过序的Kvmeta挨个partition的把数据吐到这个文件中,一个partition对应的数据吐完之后顺序地吐下个partition,直到把所有的partition遍历 完。一个partition在文件中对应的数据也叫段(segment)。

内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。

Merge

         Map任务如果输出数据量很大,可能会进行好几次Spill,out文件和Index文件会产生很多,分布在不同的磁盘上。最后把这些文件进行合并的merge过程闪亮登场。如前面的例子,“aaa”从某个map task读取过来时值是5,从另外一个map 读取时值是8,因为它们有相同的key,所以得merge成group。什么是group。对于“aaa”就是像这样的:{“aaa”, [5, 8, 2, …]}

 

Reduce的shuffle过程

Copy

Reduce 任务通过HTTP向各个Map任务拖取它所需要的数据。每个节点都会启动一个常驻的HTTP server,其中一项服务就是响应Reduce拖取Map数据。当有MapOutput的HTTP请求过来的时候,HTTP server就读取相应的Map输出文件中对应这个Reduce部分的数据通过网络流输出给Reduce。

Merge SORT

这里使用的Merge和Map端使用的Merge过程一样。Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的 sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。

Reducer的输入文件已定,整个Shuffle才最终结束

 

 

2.Spark的Shuffle过程介绍

Shuffle Writer

Spark丰富了任务类型,有些任务之间数据流转不需要通过Shuffle,但是有些任务之间还是需要通过Shuffle来传递数据,比如wide dependency的group by key。

Spark中需要Shuffle 输出的Map任务会为每个Reduce创建对应的bucket,Map产生的结果会根据设置的partitioner得到对应的bucketId,然后填 充到相应的bucket中去。每个Map的输出结果可能包含所有的Reduce所需要的数据,所以每个Map会创建R个bucket(R是reduce的 个数),M个Map总共会创建M*R个bucket。

Map创建的bucket其实对应磁盘上的一个文 件,Map的结果写到每个bucket中其实就是写到那个磁盘文件中,这个文件也被称为blockFile。每个Map要在节点上创建R个磁盘文件用于结果输出,Map的结果是直接输 出到磁盘文件上的,100KB的内存缓冲是用来创建Fast Buffered OutputStream输出流。这种方式一个问题就是Shuffle文件过多。

针对上述Shuffle 过程产生的文件过多问题,Spark有另外一种改进的Shuffle过程:consolidation Shuffle,以期显著减少Shuffle文件的数量。在consolidation Shuffle中每个bucket并非对应一个文件,而是对应文件中的一个segment部分。Job的map在某个节点上第一次执行,为每个 reduce创建bucket对应的输出文件,把这些文件组织成ShuffleFileGroup,当这次map执行完之后,这个 ShuffleFileGroup可以释放为下次循环利用;当又有map在这个节点上执行时,不需要创建新的bucket文件,而是在上次的 ShuffleFileGroup中取得已经创建的文件继续追加写一个segment;当前次map还没执行完,ShuffleFileGroup还没有 释放,这时如果有新的map在这个节点上执行,无法循环利用这个ShuffleFileGroup,而是只能创建新的bucket文件组成新的 ShuffleFileGroup来写输出。

Shuffle Fetcher

Reduce去拖Map的输出数据,Spark提供了两套不同的拉取数据框架:通过socket连接去取数据;使用netty框架去取数据。Spark Map输出的数据没有经过排序,Spark Shuffle过来的数据也不会进行排序,Spark认为Shuffle过程中的排序不是必须的,并不是所有类型的Reduce需要的数据都需要排序,强 制地进行排序只会增加Shuffle的负担。educe拖过来的数据会放在一个HashMap中,HashMap中存储的也是对,key是Map输出的key,Map输出对应这个key的所有value组成HashMap的value。Spark将 Shuffle取过来的每一个对插入或者更新到HashMap中,来一个处理一个。HashMap全部放在内存中。

 

3.MapReduce Shuffle后续优化方向

压缩:对数据进行压缩,减少写读数据量;

 

减少不必要的排序:并不是所有类型的Reduce需要的数据都是需要排序的,排序这个nb的过程如果不需要最好还是不要的好;

 

内存化:Shuffle的数据不放在磁盘而是尽量放在内存中,除非逼不得已往磁盘上放;当然了如果有性能和内存相当的第三方存储系统,那放在第三方存储系统上也是很好的;这个是个大招;

 

网络框架:netty的性能据说要占优了;

 

本节点上的数据不走网络框架:对于本节点上的Map输出,Reduce直接去读吧,不需要绕道网络框架。

 

4.Spark Shuffle后续优化方向

 

Spark作为MapReduce的进阶架构,对于Shuffle过程已经是优化了的,特别是对于那些具有争议的步骤已经做了优化,但是Spark的Shuffle对于我们来说在一些方面还是需要优化的。

 

压缩:对数据进行压缩,减少写读数据量;

 

内存化:Spark历史版本中是有这样设计的:Map写数据先把数据全部写到内存中,写完之后再把数据刷到磁盘上;考虑内存是紧缺资源,后来修改成把数据直接写到磁盘了;对于具有较大内存的集群来讲,还是尽量地往内存上写吧,内存放不下了再放磁盘。

Shuffle机制及优化
jiedaodezhuti的博客
02-10 615
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。 具体Shuffle过程详解: (1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中 (2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件 (3)多个溢出文件会被合并成大的溢出文件 (4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序 (5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据 (6)ReduceTask会取到同一个分区的
MapReduce优化----Shuffle过程剖析及性能优化
Big Data
02-26 237
shuffle过程图   1.    Map端 当Map 开始产生输出时,它并不是简单的把数据写到磁盘,因为频繁的磁盘操作会导致性能严重下降。它的处理过程更复杂,数据首先是写到内存中的一个缓冲区,并做了一些预排序,以提升效率。 每个Map 任务都有一个用来写入输出数据的循环内存缓冲区。这个缓冲区默认大小是100MB,可以通过io.sort.mb 属性来设置具体大小。当缓冲区中的数据量...
shuffle原理 及优化策略
python -学习笔记
03-29 1628
1、shuffle操作原理: 在Spark中,数据通常不会跨分区分布,以满足特定操作的需要。在计算期间,单 个任务将对单个分区进行操作——因此,要组织单个reduceByKey 的计算任务要执行 的所有数据,Spark需要执行一个all-to-all操作。它必须从所有分区中读取所有 键的所有值,然后将所有分区的值放在一起计算每个键的最终结果——这称为shuffle。 Spark中的某些操作会触...
Hive Shuffle 的具体过程
最新发布
My_wife_QBL的博客
08-01 761
Shuffle 是指在 MapReduce 作业中,将数据从 Mapper 中传输到 Reducer 的过程。这个过程通常涉及到数据的重新分配和排序,以确保相同的键被发送到同一个 Reducer 节点。Shuffle 是大数据处理中的关键环节,因为它直接影响到作业的性能和结果的正确性。Join 操作:将两个或多个表的数据进行合并。Group By 操作:将数据分组成多个组并进行聚合计算。Aggregation 操作:对数据进行汇总,例如求和、计数等。
shuff过程
A_A_Forever的博客
06-04 483
一、maptask 1)一个maptask对应一个逻辑切片split, 2)运行代码: 1 先执行 setup方法(函数) 在maptask开始时调用一次 ①mapjoin: 把小文件放在运行节点的缓存区,然后流读取,并把其存在一个缓存容器中, 1) getCacheFiles()[0] | getLocalCacheFiles()[0] 2) 创建一个输入流读取缓...
shuffle阶段
m0_49889089的博客
05-15 311
shuffle阶段
Shuffle过程详解
热门推荐
至道
06-15 1万+
Shuffle过程详解      Shuffle过程是MapReduce的核心,最近看了很多资料,网上说法大体相同,但有些地方有一点点出入,就是各个阶段的执行顺序 总个shuffle过程可以看做是从map输出到reduce输入的这个中间过程,在这个中间过程中,经过了一系列的步骤     下面看看官方给出的图     Map端 下面是我画的一张图    1.In
Hive 优化 (important)
06-13 136
Hive ive优化 要点: 优化时,把hive sql当做map reduce程序来读,会有意想不到的惊喜。 理解hadoop的核心能力,是hive优化的根本。 长期观察hadoop处理数据的过程,有几个显著的特征: 1.不怕数据多,就怕数据倾斜。 2.对jobs数比较多的作业运行效率相对比较低,比如即使有几百行的表,如果多次关联多次汇总,产生十...
Spark 的Shuffle过程详解
weixin_45333766的博客
08-05 3629
一、Shuffle的作用是什么? Shuffle的中文解释为“洗牌操作”,可以理解成将集群中所有节点上的数据进行重新整合分类的过程。其思想来源于hadoop的mapReduce,Shuffle是连接map阶段和reduce阶段的桥梁。由于分布式计算中,每个阶段的各个计算节点只处理任务的一部分数据,若下一个阶段需要依赖前面阶段的所有计算结果时,则需要对前面阶段的所有计算结果进行重新整合和分类,这就需要经历shuffle过程。 在spark中,RDD之间的关系包含窄依赖和宽依赖,其中宽依赖涉及shuffle操作
详解shuffle过程
12-30
Hadoop Shuffle 过程详解 Hadoop 的 Shuffle 过程是 MapReduce 的核心,也被称为奇迹发生的地方。要想理解 MapReduce,Shuffle 是必须要了解的。Shuffle 的正常意思是洗牌或弄乱,可能大家更熟悉的是 Java API里的 ...
Spark shuffle 过程详解
weixin_40319752的博客
10-24 456
Shuffle 过程分为map 端的write 和 reducer 端的read 两阶段 Shuffle write 端发展史从 hashShuffleManager(默认spark1.2之前) 和 到 sortShuffleManger HashShuffleManager 分为普通shuffle 和 consilodate机制shuffle 1.普通shuffle 普通shuffle...
【hadoop】shuffle过程优化
sofeld的博客
07-05 605
shuffle 过程优化(MapReduce 的优化) combiner 合并优化 在 map 阶段提前进行了一次合并,一般来讲等同于提前执行了 reduce 操作 好处:可以降低 reduce 的压力 为什么说在 map 阶段提前运行 reduce 方法可以降低 reduce 的压力? 在 map阶段的进行合并是并行的(分布式的)。 combiner 合并可以解决数据倾斜问题: 什么...
HadoopMapReduce 的 Shuffle 阶段
杯莫廷的博客
12-17 522
HadoopMapReduce 的 Shuffle 阶段 Hadoop MapReduce 的 Shuffle 阶段是指从 Map 的输出开始,包括系统执行排序,以及传送 Map 输出到 Reduce 作为输入的过程。 排序阶段是指对 Map 端输出的 Key 进行排序的过程。不同的 Map 可能输出相同的 Key,相同的 Key 必须发送到同一个 Reduce 端处理。Shuffle 阶段可以分...
mr的shuffle过程
huangxiaoxun235的博客
06-15 3893
Map Task Map Task产生输出的时候,并不是直接将数据写到本地磁盘,这个过程涉及到两个部分:写缓冲区、预排序。 (1)写缓冲区 每一个Map Task都拥有一个“环形缓冲区”作为Mapper输出的写缓冲区。写缓冲区大小默认为100MB(通过属性io.sort.mb调整),当写缓冲区的数据量达到一定的容量限额时(默认为80%,通过属性io.sort.spill.percent调整)
Shuffle阶段详细解读
tianqinglei的博客
09-02 7938
Shuffer阶段说明 shuffle阶段主要包括map阶段的combine、group、sort、partition以及reducer阶段的合并排序。Map阶段通过shuffle后会将输出数据按照reduce的分区分文件的保存,文件内容是按照定义的sort进行排序好的。Map阶段完成后会通知ApplicationMaster,然后AM会通知Reduce进行数据的拉取,在拉取过程中进行r
MapReduce详解shuffle阶段
follweme888的专栏
06-24 1068
MapReduce详解shuffle阶段(看图理解): Mapreduce的过程整体上分为四个阶段:InputFormat MapTask ReduceTask OutPutFormat 当然中间还有shuffle阶段 InputFormat: 我们通过在runner类中用 job.setInputPaths 或者是addInputPath添加输入文件或者是目录(这两者是...
Shuffle阶段的自我理解
Hadoop学习博客
06-29 2305
Shuffle阶段的概述Shuffle是连接map,reduce两个管道的衔接套。Map的输出经过partition写到内存的buffer里面,当内存满了的时候,会排序写到零时文件,这样当mapTask整个运行完之后,会产生一大堆零时文件,shuffle要把它们merge在一起。(会调用combine函数) Reduce端通过TaskTracker监听到自己的需要数据的哪个map运行完了,就回去p
shuffle过程详解(spark与mr)
我还是个菜鸟
10-12 1848
spark-shuffle与MapReduce shuffleMR shuffle1、map端2、reduce端配置调优map端的调优属性:reduce端的调优属性spark shuffle一.定义二.演变三.Hash Shuffle V1有多少个reduce任务就会产生多少个中间文件(一个task--->所产生的文件数量=== reduce task数量)Hash Shuffle V1 过...
Spark Core - Shuffle过程详解
959
12-01 2276
Shuffle 洗牌 Map阶段处理的数据如何传递给reduce阶段,shuffle在其中起到了很关键的作用,shuffle分布在MapReduce的map阶段和reduce阶段。 Map的shuffle过程:主要包括输出、排序、溢写、合并等步骤,如下图所示: 1、collect:每个Maptask都将数据输出到该Maptask对应的环形缓冲区Kvbuffer中,使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。 2、Sort:在对数据进行合并的同时,会进行排序操作,由于 MapTa
写文章

热门文章

  • Hive分析窗口函数: LAG、LEAD、FIRST_VALUE、LAST_VALUE 28336
  • Shuffle过程详解及优化 21960
  • Spark如何处理数据倾斜 20220
  • Hadoop之HDFS文件读写过程 9654
  • 数据仓库之维度建模 6970

分类专栏

  • OLAP 1篇
  • hadoop环境搭建 5篇
  • linux基础 3篇
  • shell脚本 1篇
  • 编程练习 4篇
  • hive配置 1篇
  • hive 11篇
  • leetcode-java 5篇
  • hdfs 5篇
  • trash 1篇
  • spark 15篇
  • YARN 1篇
  • 数据仓库 1篇
  • hadoop 11篇

最新评论

  • Spark如何处理数据倾斜

    纯正的熊猫说这不是bug: 大佬,现在shuffle write是sort based的了,不是hash partitioner了

  • Hadoop集群nodes unhealthy解决方法

    盗梦骇客: 有搞头表情包

  • Hive分析窗口函数: LAG、LEAD、FIRST_VALUE、LAST_VALUE

    NightFall丶: 范围默认是到当前行的,但是看着失效了,变成分组内全局排序了

  • Hive分析窗口函数: LAG、LEAD、FIRST_VALUE、LAST_VALUE

    用子弹写代码: first_value 函数对应的last2 这一列 是不是有点问题?

  • Spark如何处理数据倾斜

    赣江: 文章写的不错,比较有参考意义

最新文章

  • Doris 数据模型
  • hive 经纬度之间的距离
  • Spark Shuffle过程
2021年1篇
2020年4篇
2019年2篇
2018年21篇
2017年23篇

目录

目录

评论 1
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43元 前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值

玻璃钢生产厂家重庆城市标志玻璃钢雕塑玻璃钢花盆能否放在家里邳州主题商场美陈杭州玻璃钢雕塑哪家专业四川玻璃钢雕塑订制价格七台河玻璃钢天使雕塑定做厂广东美陈玻璃钢雕塑玻璃钢雕塑厂址仿玉玻璃钢花盆哪家好台州玻璃钢仿真水果雕塑定制屯昌县玻璃钢雕塑制作厂家二七玻璃钢雕塑加工厂家锻铜玻璃钢彩绘雕塑定做溧水商场大厅美陈鹤壁校园玻璃钢人物雕塑制作上海拉丝玻璃钢雕塑品牌企业宁夏广场玻璃钢雕塑设计阳山玻璃钢雕塑厂家玻璃钢艳后雕塑厂家玻璃钢雕塑品种大全玻璃钢雕塑上漆红古铜常用玻璃钢雕塑摆件供应黑龙江园林玻璃钢雕塑定制南宁定制玻璃钢雕塑济南玻璃钢雕塑生成厂家冰雪节商场美陈白银仿真玻璃钢雕塑价格远安玻璃钢花盆花器韶关玻璃钢卡通雕塑销售广东发光景观玻璃钢雕塑供应商香港通过《维护国家安全条例》两大学生合买彩票中奖一人不认账让美丽中国“从细节出发”19岁小伙救下5人后溺亡 多方发声单亲妈妈陷入热恋 14岁儿子报警汪小菲曝离婚始末遭遇山火的松茸之乡雅江山火三名扑火人员牺牲系谣言何赛飞追着代拍打萧美琴窜访捷克 外交部回应卫健委通报少年有偿捐血浆16次猝死手机成瘾是影响睡眠质量重要因素高校汽车撞人致3死16伤 司机系学生315晚会后胖东来又人满为患了小米汽车超级工厂正式揭幕中国拥有亿元资产的家庭达13.3万户周杰伦一审败诉网易男孩8年未见母亲被告知被遗忘许家印被限制高消费饲养员用铁锨驱打大熊猫被辞退男子被猫抓伤后确诊“猫抓病”特朗普无法缴纳4.54亿美元罚金倪萍分享减重40斤方法联合利华开始重组张家界的山上“长”满了韩国人?张立群任西安交通大学校长杨倩无缘巴黎奥运“重生之我在北大当嫡校长”黑马情侣提车了专访95后高颜值猪保姆考生莫言也上北大硕士复试名单了网友洛杉矶偶遇贾玲专家建议不必谈骨泥色变沉迷短剧的人就像掉进了杀猪盘奥巴马现身唐宁街 黑色着装引猜测七年后宇文玥被薅头发捞上岸事业单位女子向同事水杯投不明物质凯特王妃现身!外出购物视频曝光河南驻马店通报西平中学跳楼事件王树国卸任西安交大校长 师生送别恒大被罚41.75亿到底怎么缴男子被流浪猫绊倒 投喂者赔24万房客欠租失踪 房东直发愁西双版纳热带植物园回应蜉蝣大爆发钱人豪晒法院裁定实锤抄袭外国人感慨凌晨的中国很安全胖东来员工每周单休无小长假白宫:哈马斯三号人物被杀测试车高速逃费 小米:已补缴老人退休金被冒领16年 金额超20万

玻璃钢生产厂家 XML地图 TXT地图 虚拟主机 SEO 网站制作 网站优化