hadoop系列四:MapReduce和Yarn笔记
网站首页 文章专栏 hadoop系列四:MapReduce和Yarn笔记
hadoop系列四:MapReduce和Yarn笔记
编辑时间:2020-05-09 20:00 作者:毛毛小妖 浏览量:365 评论数:0

一、MapReduce概述

1.定义

MapReduce是一个分布式运算程序的编程框架

2.MapReduce核心思想

MapReduce运算程序需要分为2个阶段:Map阶段和Reduce阶段

总结就是:分片聚集

3.MapReduce进程

一个完整的MapReduce程序在分布式运行时有三种进程:

1)MrAppMaster:负责整个程序的过程调度及状态协调

2)MapTask:负责map阶段的数据处理流程

3)ReduceTask:负责reduce阶段的数据处理流程

5.常用数据序列化类型

Java类型

Hadoop Writable类型

Boolean

BooleanWritable

Byte

ByteWritable

Int

IntWritable

Float

FloatWritable

Long

LongWritable

Double

DoubleWritable

String

Text

Map

MapWritable

Array

ArrayWritable

6.MapReduce编程规范

用户编写的程序分为三部分:Mapper,Reducer和Driver

6.1.Mapper阶段

对输入的k-v进行处理,输出新的k-v

6.2.Reducer阶段

对map输入的数据进行聚合

6.3.Driver阶段

相当于Yarn集群的客户端,用于提交整个程序到Yarn集群,提交的是封装了MapReduce程序运行参数的job对象。

二、Hadoop序列化

1.概述

1.1.什么是序列化

序列化就是把内存对象转成二进制形式,便于持久化或者网络传输

反序列化就是将二进制序列转换成内存对象

1.2.为什么要序列化

对象只存在于内存中,将对象序列化之后就可以持久化到磁盘或进行网络传输。

1.3.为什么不用java的序列化

java序列化是一个重量级序列化框架,传输效率比较低。所以,Hadoop自己开发了一套序列化机制(Writable)。

2.序列化使用

实现bean对象序列化步骤如下:

1)必须实现Writable接口

2)反序列化时,需要反射调用空的构造函数,所以必须有空的构造函数

public FlowBean() {
	super();
}

3)重写序列化方法

@Override
public void write(DataOutput out) throws IOException {
	out.writeLong(upFlow);
	out.writeLong(downFlow);
	out.writeLong(sumFlow);
}

4)重写反序列化方法

@Override
public void readFields(DataInput in) throws IOException {
	upFlow = in.readLong();
	downFlow = in.readLong();
	sumFlow = in.readLong();
}

5)注意反序列化的顺序和序列化的顺序完全一致

6)要想把结果显示在文件中,需要重写toString()。

7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框架中的Shuffle过程要求对key必须排序。

@Override
public int compareTo(FlowBean o) {
	// 倒序排列,从大到小
	return Long.compare(this.sumFlow,o.getSumFlow());
}

三、MapReduce框架原理

MapReduce的数据流如下图所示:

1.InputFormat数据输入

1.1.切片与MapTask并行度决定机制

问题引出:

MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。

思考1G数据启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

MapTask并行度决定机制:

数据块:Block是HDFS物理上把数据分成一块一块

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片。

1.2.Job提交流程源码解析

waitForCompletion()

submit();

// 1建立连接
connect();	
// 1)创建提交Job的代理
new Cluster(getConfiguration());
// (1)判断是本地yarn还是远程
initialize(jobTrackAddr, conf); 

// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
// 1)创建给集群提交数据的Stag路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

// 2)获取jobid ,并创建Job路径
JobID jobId = submitClient.getNewJobID();

// 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);	
rUploader.uploadFiles(job, jobSubmitDir);

// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);

// 5)向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);

// 6)提交Job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

1.3.FileInputFormat切片源码解析

1)程序先找到数据存储的目录

2)开始遍历处理目录下的每个文件

3)遍历第一个文件ss.txt

        a)获取文件大小fs.sizeOf(ss.txt)

        b)计算切片大小computeSplitSize(Math.max(minSize,Math.min(maxSize,blockSize))) = blockSize = 128MB

        c)默认情况下,切片大小 = blockSzie

        d)开始切,形成第一个切片:ss.txt——0M:128M 第二个切片:ss.txt——128M:256M 第三个切片:ss.txt——256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一个切片)

        e)将切片信息写到一个切片规划文件中

        f)整个切片的核心过程在getSplit()方法中完成

        g)InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等

1.4.FileInputFormat切片机制

1)简单地按照文件内容的长度进行切片

2)切片大小,默认等于block大小

3)切片时不考虑数据集整体,而是逐个针对每个文件单独切片

1.5.CombineTextInputFormat切片机制

框架默认的TextInputFormat切片机制是任务按文件规划切片不管文件多小都会是一个单独的切都会交给一个MapTask这样如果有大量小文件产生大量的MapTask处理效率极其低下。

1)应用场景

CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中这样多个小文件可以交给一个MapTask处理

2)虚拟存储切片最大值设置

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

注意虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。

3)切片机制

生成切片过程包括虚拟存储过程和切片过程二部分

1.6.自定义InputFormat

自定义InputFormat步骤如下:

1)自定义一个类继承FileInputFormat

2)改写RecordReader,实现一次读取一个完整文件封装为K-V

2.MapReduce工作流程

流程详解:

1MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中

2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

3)多个溢出文件会被合并成大的溢出文件

4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序

5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据

6)ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)

7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程

注意:

Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。

缓冲区的大小可以通过参数调整,参数:io.sort.mb默认100M。

3.Shuffle机制

Map方法之后Reduce方法之前的数据处理过程称之为Shuffle,如图:

3.1.分区

问题引出:要求将统计结果按条件输出到不同文件中(分区)。比如:将统计结果按手机归属地不同省份输出到不同文件中

默认分区是根据key的hashCode对Reducetask个数取模得到的,用户没法控制哪个key存储到哪个分区。

3.2.自定义分区

1)集成Partitioner,重写getPartition()方法

2)在Job驱动中,设置自定义Partitioner

job.setPartitionerClass(MyPartitioner.class);

3)自定义Partitioner之后,要根据自定义Partitioner逻辑设置相应数量的ReduceTask

job.setNumReduceTasks(5);

3.3.WritableComparable排序

排序是mapReduce框架中最重要的操作之一。

MapTask和Reduce均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序。

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序

自定义排序:

1)原理分析

bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序

@Override
public int compareTo(FlowBean o) {

	int result;
		
	// 按照总流量大小,倒序排列
	if (sumFlow > bean.getSumFlow()) {
		result = -1;
	}else if (sumFlow < bean.getSumFlow()) {
		result = 1;
	}else {
		result = 0;
	}

	return result;
}

3.4.Combiner合并

(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。

(2)Combiner组件的父类就是Reducer。

(3)Combiner和Reducer的区别在于运行的位置

        Combiner是在每一个MapTask所在的节点运行;

        Reducer是接收全局所有Mapper的输出结果;

(4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。

(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。

(6)自定义Combiner实现步骤

        (a)自定义一个Combiner继承Reducer,重写Reduce方法

public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>{

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

        // 1 汇总操作
		int count = 0;
		for(IntWritable v :values){
			count += v.get();
		}

        // 2 写出
		context.write(key, new IntWritable(count));
	}
}

        (b)在Job驱动类中设置

job.setCombinerClass(WordcountCombiner.class);

3.5.GroupingComparator分组(辅助排序)

对Reduce阶段的数据根据某一个或几个字段进行分组。

分组排序步骤:

1)自定义类继承WritableComparator

2)重写compare()方法

@Override
public int compare(WritableComparable a, WritableComparable b) {
    // 比较的业务逻辑
	return result;
}

3)创建一个构造比较对象的类传父类

protected OrderGroupingComparator() {
	super(OrderBean.class, true);
}

4.MapTask工作机制

MapTask工作机制如图:

1)Read阶段:MapTask通过用户编写的RecordReader,从输入的InputSplit中解析出一个个的K-V对

2)Map阶段:该节点主要是将K-V对交给用户编写的map()函数,并产生一些列新的K-V对

3)Collect阶段:在用户编写的map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,他会调用Partitioner将生成的K-V分区,写入一个环形内存缓冲区中

4)Spill阶段:当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

5.Reduce工作机制

Reduce工作机制,如图所示:

1)Copy阶段:ReduceTask各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。

4)Reduce阶段:reduce()函数将计算结果写到HDFS上。

6.OutputFormat数据输出

6.1.OutputFormat接口实现类

TextOutputFormat:默认的输出格式,它把每条记录写为文本行。

SequenceFileOutputFormat:格式紧凑,很容易被压缩。

6.2.自定义OutputFormat

1)自定义一个类继承FileOutputFormat

2)改写RecordWriter,具体改写输出数据的方式write()

7.Join的应用

7.1.Reduce Join

Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。

7.2.Map Join

Map Join适用于一张表十分小、一张表很大的场景

四、Hadoop数据压缩

1.MapReduce支持的压缩编码

压缩格式

hadoop自带?

算法

文件扩展名

是否可切分

换成压缩格式后,原来的程序是否需要修改

DEFLATE

是,直接使用

DEFLATE

.deflate

和文本处理一样,不需要修改

Gzip

是,直接使用

DEFLATE

.gz

和文本处理一样,不需要修改

bzip2

是,直接使用

bzip2

.bz2

和文本处理一样,不需要修改

LZO

否,需要安装

LZO

.lzo

需要建索引,还需要指定输入格式

Snappy

否,需要安装

Snappy

.snappy

和文本处理一样,不需要修改

2.压缩方式选择 

一般采用snappy压缩方式

优点:高速压缩速度和合理的压缩率

缺点:不支持split;需要安装

3.压缩位置选择

压缩可以在MapReduce作用的任意阶段启用

4.压缩参数配置

要在Hadoop中启用压缩,可以配置如下参数:

参数

默认值

阶段

建议

io.compression.codecs   

(在core-site.xml中配置)

org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec

 

输入压缩

Hadoop使用文件扩展名判断是否支持某种编解码器

mapreduce.map.output.compress(在mapred-site.xml中配置)

false

mapper输出

这个参数设为true启用压缩

mapreduce.map.output.compress.codec(在mapred-site.xml中配置)

org.apache.hadoop.io.compress.DefaultCodec

mapper输出

企业使用LZO或Snappy编解码器在此阶段压缩数据

mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置)

false

reducer输出

这个参数设为true启用压缩

mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置)

org.apache.hadoop.io.compress. DefaultCodec

reducer输出

使用标准工具或者编解码器,如gzip和bzip2

mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置)

RECORD

reducer输出

SequenceFile输出使用的压缩类型:NONE和BLOCK

五、Yarn资源调度器

Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序

1.Yarn基本架构

YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等组件构成,如图:

2.Yarn工作机制

yarn工作机制如图所示:

1)MR程序提交到客户端所在的节点。

2)YarnRunner向ResourceManager申请一个Application。

3)RM该应用程序的资源路径返回给YarnRunner

4)该程序将运行所需资源提交到HDFS上

5)程序资源提交完毕后,申请运行mrAppMaster

6)RM将用户的请求初始化成一个Task

7)其中一个NodeManager领取到Task任务。

8)该NodeManager创建容器Container,并产生MRAppmaster

9)Container从HDFS上拷贝资源到本地

10)MRAppmasterRM 申请运行MapTask资源。

11)RM运行MapTask任务分配给另外两个NodeManager另两个NodeManager分别领取任务创建容器。

12)MR向两个接收到任务的NodeManager发送程序启动脚本这两个NodeManager分别启动MapTaskMapTask数据分区排序。

13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask

14)ReduceTask向MapTask获取相应分区的数据。

15)程序运行完毕后,MR会向RM申请注销自己。

3.资源调度器

目前,Hadoop作业调度器主要有三种:FIFO、Capacity Scheduler和Fair Scheduler。Hadoop2.7.2默认的资源调度器是Capacity Scheduler。

具体设置详见:yarn-default.xml文件

<property>
    <description>The class to use as the resource scheduler.</description>
    <name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>

3.1.FIFO调度器

3.2.容量调度器

3.3.公平调度器

4.任务的推测执行

4.1.推测执行机制

发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度。为拖后腿任务启动一个备份任务,同时运行。谁先运行完,则采用谁的结果。

4.2.推测执行任务的前提条件

1)每个Task只能有一个备份任务

2)当前Job已完成的Task必须不小于0.05(5%)

3)开启推测执行参数设置。mapred-site.xml文件中默认是打开的。

<property>
  	<name>mapreduce.map.speculative</name>
  	<value>true</value>
  	<description>If true, then multiple instances of some map tasks may be executed in parallel.</description>
</property>

<property>
  	<name>mapreduce.reduce.speculative</name>
  	<value>true</value>
  	<description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description>
</property>

 

来说两句吧
最新评论
    还没有人评论哦,快来坐沙发吧!