mapreduce编程:Hadoop MapReduce 并行编程模型和工作流

 2021-07-16 1:14    77  

MapReduce 并行编程模型MapReduce 是一种并行编程模型,用于大规模数据集的并行运算,它将复杂的、运行于大规模集群上的并行计算过程高度抽象到两个函数:Map和Reducemapreduce编程。

mapreduce编程:Hadoop MapReduce 并行编程模型和工作流程

Hadoop MapReduce运行在分布式文件系统HDFS上mapreduce编程。

mapreduce编程:Hadoop MapReduce 并行编程模型和工作流程

在MapReduce中,一个存储在分布式文件系统中的大规模中的大规模数据会被切分成许多独立的小数据块,这些小数据块可以被多个Map任务并行处理mapreduce编程。MapReduce框架会为每个Map任务输入一个数据子集,Map任务生成的结果会继续作为Reduce任务的输入,最终由Reduce任务输出最后结果,并写入分布式文件系统。

适合MapReduce 处理的数据集需要满足一个前提条件:

待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。

MapReduce设计理念:计算向数据靠拢

MapReduce框架将Map程序就近地在HDFS数据所在的节点运行,即将计算节点和存储节点放在一起运行,从而减少节点间的数据移动开销。

MapReduce 工作流程MapReduce的核心思想:分而治之

把一个大的数据集拆分成多个小数据块在多台机器上并行处理,也就是说,一个大的MapReduce作业,首先会被拆分成许多个Map任务在多台机器上并行执行,每个Map任务通常运行在数据存储的节点上,这样,计算和数据就可以放在一起运行,不需要额外的数据传输开销。

当Map 任务结束后,会生成以<key, value>形式表示的许多中间结果。

为了让Reduce 可以并行处理Map的结果,需要对Map的输出进行一定的分区(Partition)、排序(Sort)、合并(Combine)、归并(Merge)等操作,得到<key, value-list> 形式的中间结果,再交给Reduce 进行处理,这个过程称为 Shuffle(洗牌)。

从无序的<key, value> 到有序的<key, value-list>,这个过程用Shuffle(洗牌)来称呼非常形象。

Reduce以一系列<key, value-list>中间结果作为输入,执行用户定义的逻辑,输出结果输出到分布式文件系统中。

小结一下MapReduce 执行全过程从分布式文件系统读入数据;执行Map任务输出中间结果;通过Shuffle 阶段把中间结果分区排序整理后发送给Reduce 任务;执行Reduce 任务得到最终结果,并写入分布式文件系统

Hadoop框架:MapReduce基本原理和入门案例

一、MapReduce概述1、基本概念Hadoop核心组件之一:分布式计算的方案MapReduce,是一种编程模型,用于大规模数据集的并行运算,其中Map(映射)和Reduce(归约)。

mapreduce编程:Hadoop MapReduce 并行编程模型和工作流程

MapReduce既是一个编程模型,也是一个计算组件,处理的过程分为两个阶段,Map阶段:负责把任务分解为多个小任务,Reduce负责把多个小任务的处理结果进行汇总。其中Map阶段主要输入是一对Key-Value,经过map计算后输出一对Key-Value值;然后将相同Key合并,形成Key-Value集合;再将这个Key-Value集合转入Reduce阶段,经过计算输出最终Key-Value结果集。

mapreduce编程:Hadoop MapReduce 并行编程模型和工作流程

2、特点描述MapReduce可以实现基于上千台服务器并发工作,提供很强大的数据处理能力,如果其中单台服务挂掉,计算任务会自动转义到另外节点执行,保证高容错性;但是MapReduce不适应于实时计算与流式计算,计算的数据是静态的。

mapreduce编程:Hadoop MapReduce 并行编程模型和工作流程

二、操作案例1、流程描述

mapreduce编程:Hadoop MapReduce 并行编程模型和工作流程

数据文件一般以CSV格式居多,数据行通常以空格分隔,这里需要考虑数据内容特点;

文件经过切片分配在不同的MapTask任务中并发执行;

MapTask任务执行完毕之后,执行ReduceTask任务,依赖Map阶段的数据;

ReduceTask任务执行完毕后,输出文件结果。

2、基础配置hadoop: # 读取的文件源 inputPath: hdfs://hop01:9000/hopdir/javaNew.txt # 该路径必须是程序运行前不存在的 outputPath: /wordOut3、Mapper程序public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> { Text mapKey = new Text(); IntWritable mapValue = new IntWritable(1); @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1、读取行 String line = value.toString(); // 2、行内容切割,根据文件中分隔符 String[] words = line.split(" "); // 3、存储 for (String word : words) { mapKey.set(word); context.write(mapKey, mapValue); } }}4、Reducer程序public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> { int sum ; IntWritable value = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { // 1、累加求和统计 sum = 0; for (IntWritable count : values) { sum += count.get(); } // 2、输出结果 value.set(sum); context.write(key,value); }}5、执行程序@RestControllerpublic class WordWeb { @Resource private MapReduceConfig mapReduceConfig ; @GetMapping("/getWord") public String getWord () throws IOException, ClassNotFoundException, InterruptedException { // 声明配置 Configuration hadoopConfig = new Configuration(); hadoopConfig.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() ); hadoopConfig.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName() ); Job job = Job.getInstance(hadoopConfig); // Job执行作业 输入路径 FileInputFormat.addInputPath(job, new Path(mapReduceConfig.getInputPath())); // Job执行作业 输出路径 FileOutputFormat.setOutputPath(job, new Path(mapReduceConfig.getOutputPath())); // 自定义 Mapper和Reducer 两个阶段的任务处理类 job.setMapperClass(WordMapper.class); job.setReducerClass(WordReducer.class); // 设置输出结果的Key和Value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //执行Job直到完成 job.waitForCompletion(true); return "success" ; }}6、执行结果查看将应用程序打包放到hop01服务上执行;

java -jar map-reduce-case01.jar

三、案例分析1、数据类型Java数据类型与对应的Hadoop数据序列化类型;

2、核心模块Mapper模块:处理输入的数据,业务逻辑在map()方法中完成,输出的数据也是KV格式;

Reducer模块:处理Map程序输出的KV数据,业务逻辑在reduce()方法中;

Driver模块:将程序提交到yarn进行调度,提交封装了运行参数的job对象;

四、序列化操作1、序列化简介序列化:将内存中对象转换为二进制的字节序列,可以通过输出流持久化存储或者网络传输;

反序列化:接收输入字节流或者读取磁盘持久化的数据,加载到内存的对象过程;

Hadoop序列化相关接口:Writable实现的序列化机制、Comparable管理Key的排序问题;

2、案例实现案例描述:读取文件,并对文件相同的行做数据累加计算,输出计算结果;该案例演示在本地执行,不把Jar包上传的hadoop服务器,驱动配置一致。

实体对象属性

public class AddEntity implements Writable { private long addNum01; private long addNum02; private long resNum; // 构造方法 public AddEntity() { super(); } public AddEntity(long addNum01, long addNum02) { super(); this.addNum01 = addNum01; this.addNum02 = addNum02; this.resNum = addNum01 + addNum02; } // 序列化 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(addNum01); dataOutput.writeLong(addNum02); dataOutput.writeLong(resNum); } // 反序列化 @Override public void readFields(DataInput dataInput) throws IOException { // 注意:反序列化顺序和写序列化顺序一致 this.addNum01 = dataInput.readLong(); this.addNum02 = dataInput.readLong(); this.resNum = dataInput.readLong(); } // 省略Get和Set方法}Mapper机制

public class AddMapper extends Mapper<LongWritable, Text, Text, AddEntity> { Text myKey = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 读取行 String line = value.toString(); // 行内容切割 String[] lineArr = line.split(","); // 内容格式处理 String lineNum = lineArr[0]; long addNum01 = Long.parseLong(lineArr[1]); long addNum02 = Long.parseLong(lineArr[2]); myKey.set(lineNum); AddEntity myValue = new AddEntity(addNum01,addNum02); // 输出 context.write(myKey, myValue); }}Reducer机制

public class AddReducer extends Reducer<Text, AddEntity, Text, AddEntity> { @Override protected void reduce(Text key, Iterable<AddEntity> values, Context context) throws IOException, InterruptedException { long addNum01Sum = 0; long addNum02Sum = 0; // 处理Key相同 for (AddEntity addEntity : values) { addNum01Sum += addEntity.getAddNum01(); addNum02Sum += addEntity.getAddNum02(); } // 最终输出 AddEntity addRes = new AddEntity(addNum01Sum, addNum02Sum); context.write(key, addRes); }}案例最终结果:

推荐阅读:GitHub源码和分类管理,持续更新

Hadoop框架:集群模式下分布式环境搭建

Hadoop框架:单服务下伪分布式集群搭建

Hadoop框架:NameNode工作机制详解

Hadoop框架:HDFS高可用环境配置

Hadoop框架:DataNode工作机制详解

Hadoop框架:HDFS简介与Shell管理命令

Hadoop框架:HDFS读写机制与API详解

本文标签:入门框架MapReduce

原文链接:https://www.xgfox.com/bcrm/958.html

本文版权:如无特别标注,本站文章均为原创。