Shuffle过程,也称Copy阶段。reduce task从各个map task上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定的阀值,则写到磁盘上,否则直接放到内存中。
MAP端
map函数开始产生输出时,并不是简单地将它写到磁盘上。这个过程更复杂,它利用缓冲的方式写到内存并出于效率的目的进行预排序。
每个map任务都有一个环形缓冲区用于存储任务输出。在默认情况下,缓冲区的大小为100MB,这个值可以通过mapreduce.task.io.sort.mb
属性来调整。一旦缓冲内容达到阈值(mapreduce.map.sort.spill.percent
,默认为80%),一个后台线程便开始把内容溢写(spill)到磁盘,在溢写到磁盘的过程中,map输出继续写道缓冲区,但如果在此期间缓冲区被写满,map会被阻塞直到磁盘过程完成。溢写过程按轮询方式将缓冲区的内容写到mapreduce.cluster.local.dir
属性在作业特定子目录下的指定的目录中。在写磁盘之前,线程首先根据数据最终要传的reducer把数据划分成相应的分区(partition,用户也可自定义分区函数,但默认的partitioner通过哈希函数来分区,也很高效)。在每个分区中,后台线程按键进行内存中排序,如果有一个combiner函数,它就在排序后的输出上运行。运行combiner函数使得map输出结果更紧凑,因此减少写到磁盘的数据和传递给reducer的数据。
每次内存缓冲区达到溢出阈值时,就会新建一个溢出文件(spill file),因此,在map任务写完其最后一个输出记录后,会有几个溢写文件。在任务完成之前,溢写文件被合并成一个已分区且已排序的输出文件。配置属性是mapreduce.task.io.sort.factor
控制着一次最多能合并多少流,默认值是10.
如果至少存在3个溢写文件(通过mapreduce.map.combine.minspills
属性设置)时,则combiner就会在输出文件写到磁盘之前再次运行。combiner可以在输入上反复运行,但并不影响最终结果。如果只有1个或者2个溢写文件,那么由于map输出规模减少,因此不值得调用combiner带来的开销,因此不会为该map输出再次运行combiner。
在将压缩map输出写到磁盘的过程中对他进行压缩往往是一个很好的主意,因为这样写磁盘的速度更快,节约磁盘空间,并且减少传给reducer的数据量。在默认情况下,输出时不压缩的,但只要将mapreduce.map.output.compress
设置为true,就可以轻松使用此功能。使用的压缩库由mapreduce.map.output.compress.codec
指定。
reducer通过HTTP得到输出文件的分区。用于文件分区的工作线程的数量由任务的mapreduce.shuffle.max.threads
属性控制,此设置针对的是每一个节点管理器,而不是针对每个map任务。默认值0将最大线程数设置为机器中处理器数量的两倍。
REDUCE端
现在转到处理过程的reduce部分。map输出文件位于运行map任务的tasktracker的本地磁盘(注意,尽管map输出经常写到map tasktracker 的本地磁盘,但reduce输出并不这样),现在,tasktracker需要为分区文件运行reduce任务。并且,reduce任务需要集群上若干个map任务的map输出作为其特殊的分区文件。每个map任务的完成时间可能不同,因此在每个任务完成时,reduce任务就开始复制其输出。这就是reduce任务的复制阶段。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,但这个默认值可以修改设置mapreduce.reduce.shuffle.parallelcopies
属性即可。
如果map输出相当小,会被复制到reduce任务JVM的内存(缓冲区大小由mapreduce.reduce.shuffle.input.buffer.percent
属性控制,指定用于此用途的堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapreduce.reduce.shuffle.merge.percent
决定)或者达到map输出阈值(由mapreduce.reduce.merge.inmen.threshold
控制),则合并后溢出写到磁盘中。如果指定combiner,则在合并期间运行它以降低写入硬盘的数据量。
随着磁盘上副本增多,后台线程会将它们合并为更大的、排好序的文件。这会为后面的合并节省一些时间。注意,为了合并,压缩的map输出(通过map任务)都必须在内存中被解压缩。
复制完所有map输出后,reduce任务进入排序阶段(更恰当的说法是合并阶段,因为排序是在map端进行的),这个阶段将合并map输出,维持其顺序排序。这是循环进行的。比如,如果有50个map输出,而合并因子是10(10为默认设置,由mapreduce.task.io.sort.factor
属性设置,与map的合并类似),合并将进行5趟,每趟将10个文件合并成一个文件,因此最后有5个中间文件。
在最后阶段,即reduce阶段,直接把数据输入reduce函数,从而省略了一次磁盘往返行程,并没有将这5个文件合并成一个已排序的文件作为最后一趟。最后的合并可以来自内存和磁盘片段。
❝
每趟合并的文件数实际上比事例中展示有所不同。目标是合并最少数量的文件以便满足于最后一趟的合并系数。因此如果有40个文件,我们并不会在四趟中每趟合并10个文件从而得到4个文件。相反,第一趟只合并4个文件,随后的三趟合并完整的10个文件。在最后一趟中,4个已合并的文件和余下的6个(未合并的)文件合计10个。
在reduce阶段,对已排序输出中的每个键都调用reduce函数。此阶段的输出直接写到输出文件系统,一般为HDFS(可自定义)。如果采用HDFS,由于节点管理器也运行数据节点,所以第一个块的副本将被写入到本地磁盘。
以上就是良许教程网为各位朋友分享的Linu系统相关内容。想要了解更多Linux相关知识记得关注公众号“良许Linux”,或扫描下方二维码进行关注,更多干货等着你 !