mapreduce编程:Hadoop核心学习之MapReduce编程入门

 2021-07-16 1:13    77  

在IT技术领域,大数据是热点,大数据技术也是热点,以Hadoop为例,作为主流的第一代大数据技术框架,可以说是入门必学mapreduce编程。而学习Hadoop,通常从核心框架HDFS和MapReduce学起,今天我们就主要来讲讲Hadoop MapReduce编程入门。

mapreduce编程:Hadoop核心学习之MapReduce编程入门

mapreduce编程:Hadoop核心学习之MapReduce编程入门

MapReduce入门简介MapReduce是Hadoop的核心框架之一mapreduce编程,主要负责分布式并行计算。MapReduce 既是计算框架,也是编程模型,主要基于Java语言来编程,这也是为什么Hadoop学习要求要有一定的Java基础。

mapreduce编程:Hadoop核心学习之MapReduce编程入门

当然mapreduce编程,在这几年的发展当中,MapReduce的计算性能受到诟病,取而代之受到重用的是Spark。但是作为初代计算框架,MapReduce的学习仍然是有必要的,也对于同样基于MapReduce计算模型的Spark学习,有很好的引入。

MapReduce编程入门MapReduce运行过程,通常涉及到input、split、map、shuffle、reduce、output几个阶段,其中shuffle过程包括sort、copy、combine操作,reduce之前有时涉及二次排序。

MapReduce编程,主要有三种方式:

Hadoop streaming执行MapreduceHive执行MapreduceJava MR编程

①Hadoop streaming执行MapReduce

优点:

可以用大多数语言开发;

代码量少,开发速度快;

方便本地调试。

不足:

只能通过参数控制MR框架,控制性较弱,比如定制partitioner、combiner;

支持的数据类型和数据结构有限,不适合做复杂处理,处理字符型较多;

②Hive执行MapReduce

将类SQL转换成MapReduce,定位于数据仓库。

优点:

开发速度快,易调试,易理解;

易于构建数据仓库模型;

内置函数功能齐全,比如rownumber等窗口函数;

可扩展性好,比如自定义存储格式、自定义函数UDF;

多接口,比如JDBC、Thrift、Rest等。

缺点:

不能用于复杂计算,比如涉及时序处理的数据;

可控制性较弱,比如partition、关联等操作。

③Java MR编程

用Java编写MR,可以说是最“原始”的一种方式,Java面向对象编程,设计模式成熟,通用性好,并且Java方面第三方类库非常丰富。

优点:

定制性强,比如定制partitioner、定制combiner等;

数据类型和数据结构丰富,队列、堆栈、自定义类等使用方便;

控制性非常高,包括MR运行过程的一些控制,Map端join等;

可以方便使用Hadoop组件类库中的类或工具,比如HDFS文件操作类等。

缺点:

相比Hive、Hadoop streaming或Pyspark,开发代码量较大,对开发环境要求高且不易调试;

通常每个操作都要写一个MR类;

不如Spark执行效率高。

关于Hadoop核心学习,MapReduce编程入门,以上就是今天的内容分享了。在大数据当中,Hadoop在基础架构上还是占据着重要的地位,而Hadoop MapReduce的学习,也仍有其意义所在,还是应该有相应的学习和掌握。

MapReduce编程模型

mapreduce编程:Hadoop核心学习之MapReduce编程入门

mapreduce编程:Hadoop核心学习之MapReduce编程入门

MapReduce Programming Model在 Hadoop 问世之前,分布式计算也是存在的,但是没有通用的解决方案,只能专门处理某一类计算。 这大概是跟 不使用任何Web框架和系统库来开发Web程序 的感觉差不多。

MapReduce 既是一个编程模型,又是一个计算框架。作为编程模型来说, MapReduce 并不是一个“神奇”的东西。但是,作为 大数据计算框架 来说,它的出现使得开发大数据应用的门槛降低了许多,就跟Web框架的应用一个道理。

大数据计算框架解决的是什么呢?简单来说,就是:

如何基于分布式存储分配计算资源如何调度计算任务今天这篇文章先介绍一下作为编程模型的 MapReduce 。计算框架的介绍放在下一篇。

MarReduce - 编程模型由于我们的数据是存放在分布式文件系统中,自然不能用传统的编程模型来完成任务了。 接下来我用3个代码例子来解释传统编程模型和分布式模型的区别,以经典的 WordCount 程序为例。

Example 1: Python语言单机版WordCount我们可以创建一个 Hash Table ,然后遍历文本中的每一个单词。 如果在 Hash Table 中存在,就将 key(单词)的 value +1 ; 否则将单词作为 key 添加到 Hash Table 中。

# %%# 文本前期处理strlList = """Hello WorldBye WorldHello WorldBye World"""strlList = strlList.replace('\n', ' ').lower().split(' ')countDict = {}# %%# 如果字典里有该单词则加1,否则添加入字典for w in strlList: if w in countDict: countDict[w] += 1 else: countDict[w] = 1# %%print(countDict)# output:# {'hello': 2, 'world': 4, 'bye': 2}复制代码如果文本很长,我们担心不能把文本一次性加载到内存中,也可以使用 Generator 特性来完成迭代操作。但是这样将会花费很长的 时间 。这是第一个问题。 我们自然会想到可以通过多进程的方式来增加处理速度,对文本设置多个偏移量,并行地处理。

但是数据量大到一个机器无法装得下的情况下,就要考虑如何将数据分散地存放到一个集群中了。有一个经典的比喻:

如果有一块巨大的石头拉不动,你不会找一头巨型牛来拉,而会去找一群牛来一齐拉。

那么如何将数据 分散地 存放在一个集群中,并尽可能快地处理这些数据呢,这是第二个问题。

试想一下,如果让你来设计一个并行处理分散在多个服务器上的数据的系统,你会考虑哪些问题? 明显地,这需要结合数据源来设计,因为数据源(HDFS)是采用分布式设计的,所以像上面这样的单机版逻辑就不满足需求了。

Example 2: Shell脚本的MapReduce现在我们来看一个复杂的,不用大数据框架来完成的MapReduce模型的计算任务。这是一个基于 Shell 脚本实现的程序。

这个示例来自这本书 - Hadoop: The Definitive Guide, 4th Edition

以下的示例,我们编写一个挖掘天气数据的程序。气象传感器每小时在全球许多地方收集数据,并收集大量的日志数据,这是使用MapReduce进行分析的理想选择,因为我们要处理所有数据,并且数据是半结构化(semi-structured)且记录导向的(record-oriented)。

资料格式我们将使用的数据来自 国家气候数据中心 (National Climatic Data Center)(NCDC)。 数据是使用行导向的(line-oriented)ASCII格式存储的,其中每一行都是一条记录。该格式包含一组丰富的气象元素,其中许多是 可选的 或者具有可变的数据长度。 为简单起见,我们关注一些始终存在且宽度固定的基本元素,例如 温度 。

示例2-1显示了一个示例行,其中标注了一些显著的字段。该行已经被分为多行以显示每个字段;在实际的文件中,这些字典打包在同一行,没有分行。

示例2-1 NCDC记录的格式0057332130 # USAF weather station identifier99999 # WBAN weather station identifier19500101 # observation date0300 # observation time4+51317 # latitude (degrees x 1000)+028783 # longitude (degrees x 1000)FM-12+0171 # elevation (meters)99999V020320 # wind direction (degrees)1 # quality codeN0072100450 # sky ceiling height (meters)1 # quality codeCN010000 # visibility distance (meters)1 # quality codeN9-0128 # air temperature (degrees Celsius x 10)1 # quality code-0139 # dew point temperature (degrees Celsius x 10)1 # quality code10268 # atmospheric pressure (hectopascals x 10)1 # quality code复制代码NOTE: 在一个文件中应该保存着多行这样的数据,我们可以看到这样一行数据中有气象站的代号、日期、时间、温度等重要信息。这一行信息的时间单位可能是一天,也可能是几个小时。

数据文件按日期和气象站来组织。从1901年到2001年,每年都有一个目录,每个目录都包含每个气象站的压缩文件以及该年的读书。例如,以下是1990年的第一个条目:

% ls raw/1990 | head010010-99999-1990.gz010014-99999-1990.gz010015-99999-1990.gz010016-99999-1990.gz010017-99999-1990.gz010030-99999-1990.gz010040-99999-1990.gz010080-99999-1990.gz010100-99999-1990.gz010150-99999-1990.gz复制代码NOTE: 例如 010010-99999-1990.gz 这个文件中应该是包含一个文件,该文件中包含多条 示例2-1 那样的数据。 99999 是某个气象站的编号。 由上面的例子推测,1990年,编号为 99999 的气象站可能产生很多个上面那样的 gz 压缩文件,其中每个 gz 的内容为多条 示例2-1 的数据。

有数万个气象站,因此整个数据集由大量的相对小的文件组成。通常来说,处理较少数量的相对较大的文件更容易,更高效,因此我们先对数据做了预处理,以便将每年的读数合并为一个文件。

资料预处理由于上述的原因,我们需要先对这些大量的小文件先进行预处理,我们想将一年的数据压缩成一个文件。 我们使用一个剧有 map 函数的程序来做这件事,不需要 reduce 函数,因为我们不需要并行地合并, map 函数可以并行执行所有文件处理。

这段处理过程已经使用了Hadoop的MapReduce框架,所以在此就省略了,感兴趣的可以点击这里查看这个程序的源码: MapReduce - streming接口示例

处理完成后的文件列表是这样的:

% ls -1 all/1901.tar.bz21902.tar.bz21903.tar.bz2...2000.tar.bz2复制代码每个 tar 文件都包含所有气象站的当年的读数的文件,并使用 gzip 压缩。(也就是说 1901.tar.bz2 中保存的还是压缩文件,所以这个 1901.tar.bz2 本身的 bzip2 压缩是多余的)

% tar jxf 1901.tar.bz2% ls 1901 | head029070-99999-1901.gz029500-99999-1901.gz029600-99999-1901.gz029720-99999-1901.gz029810-99999-1901.gz227070-99999-1901.gz复制代码使用Unix工具分析数据现在我们的需求是计算每一年中全球的最高气温。看看用 shell 脚本要如何处理:

我们使用处理行导向的经典工具 - awk 。 参照 示例2-2

示例2-2 一个从NCDC气象记录中逐年查找最高记录温度的程序#!/usr/bin/env bashfor year in all/*do echo -ne `basename $year .gz`"\t" gunzip -c $year | \ awk '{ temp = substr($0, 88, 5) + 0; q = substr($0, 93, 1); if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp } END { print max }'done复制代码该脚本循环遍历压缩的 year 文件,首先打印年份,然后使用 awk 处理每个文件。 awk 脚本从数据提取两个字段: 空气温度 (air temperature)和 质量代码 (quality code)。 空气温度值通过加0变成整数。 接下来,进行测试以查看温度是否有效(9999这个值表示NCDC数据集中的缺失值)以及质量代码表示读数是可信的还是错误的。 如果读数正常,则将该值与到目前为止看到的最大值进行比较,如果找到新的最大值,则将更新该最大值。 END 代码块将在 awk 处理完所有行后被执行,并输出最大值。

脚本运行之后看起来像这样:

% ./max_temperature.sh1901 3171902 2441903 2891904 2561905 283...复制代码源文件中的温度值放大了10倍,因此得出1901年的最高温度为31.7°C(本世纪初读数很少,所以这是合理的)。 计算这一个世纪每年的最高温,这个程序完全运行在一台 EC2 High-CPU Extra Large 实例上,一次运行了42分钟。

为了加快处理速度,我们需要并行运行程序的各个部分。从理论上讲,这很简单:我们可以使用机器上所有可用的硬件线程,在不同的过程中处理不同的年份。但是,这有一些问题。

首先,将 工作 分成相等大小的部分并不总是那么容易。在这种情况下,不同年份的文件大小会有很大差异,因此某些进程将比其他进程更早完成。即使他们接手进一步的工作,整个运行仍以最长的文件为主。尽管需要更多工作,但是更好的方法是将输入拆分为固定大小的块,并将每个块分配给一个进程。

NOTE: 抽象地考虑,这里的 工作 有 计算 和 存储 两层含义。所以大数据处理在收集原始数据的时候就要和传统编程方式下不一样了。

其次,将独立进程的结果合并可能需要进一步处理。在这种情况下,每年的结果与其他年份无关,可以通过合并所有结果并按年份排序来合并它们。如果使用固定大小的块方法,则组合会更加精细。对于此示例,特定年份的数据通常会分为几个块,每个块独立处理。我们将得出每个块的最高温度,因此最后一步是寻找每年这些最高温度中的最高温度。

在资料预处理的阶段我们使用了Hadoop的Map功能,上述的shell脚本则是相当于一个Reduce功能的程序。

第三,您仍然受到单台计算机处理能力的限制。如果您现有的处理器数量可以达到的最佳时间是20分钟,那就是这样了。您无法使其运行更快。此外,某些数据集超出了单台计算机的容量。当我们开始使用多台机器时,就会需要考虑许多其他因素了,主要是协调性和可靠性的范畴。谁负责整体工作?我们如何处理失败的进程?

因此,尽管并行处理是可行的,但在实践中却很棘手。使用像 Hadoop 这样的框架来解决这些问题是一个很好的选择。

事实上,您会发现不得不开发一个像样的分布式系统来做这样的事。

Example 3: Java语言使用Hadoop的MapReduceHadoop 在并行处理上有优势,要使用 Hadoop ,我们需要将查询(query)表示为 MapReduce 作业(job)。经过一些本地的小规模测试之后,我们将能够在一组机器上运行它。

Map and ReduceMapReduce 通过将处理分为两个阶段进行工作:Map阶段和Reduce阶段。 每个阶段都有键值对作为输入和输出,程序员可以选择其类型。程序员还需要指定用于Map和Reduce的函数 - map函数和reduce函数。

Map阶段的输入是原始NCDC数据。输入的数据是一个键值对(key-value pair)。 我们选择一种文本输入格式,该格式将数据集 (dataset)中的每一行作为文本类型,并作为值(value)处理。 键(key)是文件的开头到该行的开头的 偏移量 。

NOTE: 文件开头的偏移量是 0 ;第1行有 120 个字符,那么第二行的偏移量则是 121 ;以此类推。

我们的map函数很简单。我们提取年份和气温,因为这是我们唯一感兴趣的字段。在这种情况下,map函数只是数据准备阶段,以便reduce函数可以执行以下操作: 找出每年的最高温度。 map函数也是删除不良记录的好方法:在这里,我们可以过滤掉缺失的、可疑的或错误的温度。

为了可视化map的工作方式,请考虑以下输入数据提示示例(已经删除一些未使用的列以适配页面,用省略号表示):

0067011990999991950051507004...9999999N9+00001+99999999999...0043011990999991950051512004...9999999N9+00221+99999999999...0043011990999991950051518004...9999999N9-00111+99999999999...0043012650999991949032412004...0500001N9+01111+99999999999...0043012650999991949032418004...0500001N9+00781+99999999999...复制代码这些行以键值对的形式呈现给map函数:

(0, 006701199099999 1950 051507004 ... 9999999N9 + 00001 + 99999999999...)

(106, 004301199099999 1950 051512004 ... 9999999N9 + 00221 + 99999999999...)

(212, 004301199099999 1950 051518004 ... 9999999N9- 00111 + 99999999999...)

(318, 004301265099999 1949 032412004 ... 0500001N9 + 01111 + 99999999999...)

(424, 004301265099999 1949 032418004 ... 0500001N9 + 00781 + 99999999999...)

key是行号在文件中的偏移量,我们在map函数中将其忽略。map函数仅提取年份和气温(以粗体显示),并将其作为输出输出(温度值已解释为整数):

(1950, 0) (1950, 22) (1950, −11) (1949, 111) (1949, 78)复制代码map函数的输出在发送给reduce函数之前,由MapReduce框架处理。此处理会根据Key来对此key-value对进行排序和分组。因此,继续该示例,我们的reduce函数将会看到以下输入:

(1949, [111, 78])(1950, [0, 22, −11])复制代码每年都会显示所有气温读数的列表,所有reduce函数现在要做的是遍历列表并获取最大读书:

(1949, 111)(1950, 22)复制代码这是最终输出:每年记录的最高全球温度。

NOTE: map 函数的输入主要是一个 <Key, Value> 对,在这个例子里,Value 是要统计的所有文本中的一行数据,Key 在一般计算中都不会用到。

整个数据流 如图2-1所示 。 该图的底部是一个Unix管道,它模仿了整个MapReduce流,当我们研究Hadoop Streaming时。以后研究Hadoop Streaming时,我们会在此看到。

图2-1。MapReduce逻辑数据流

JAVA MAPREDUCE上面我们看了MapReduce程序工作的机制,下一步是要通过代码表达它。我们需要三件事:一个map函数,一个reduce函数以及一些代码以运行这个job。map函数由Mapper类来表示,该类声明一个抽象map()方法。 示例2-3 显示了我们的map函数的实现。

示例2-3 最高温度的Mapper示例import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if (airTemperature != MISSING && quality.matches("[01459]")) { context.write(new Text(year), new IntWritable(airTemperature)); } }}复制代码Mapper类是一个通用类型(Generic Type),具有四个形式参数以指定输入键(input key)、输入值(input value)、输出键(output key)和输出值(output value)的类型。

对于本示例,输入键是一个长整数偏移量,输入值是一行文本,输出键是年份,输出值是气温(整数)。 Hadoop不使用内置的Java类型,而是提供了自己的一组基本类型。这些类型针对网络序列化进行了优化。这些类型位于 org.apache.hadoop.io 这个软件包。 在这里,我们使用LongWritable,其对应于一个Java Long,Text(如Java String),和IntWritable(如Java Integer)。

该map()方法传递了一个键和一个值。我们将Text包含输入行的值转换为Java String,然后使用其substring()方法提取我们感兴趣的列。

该map()方法还提供了一个Context实例,用于将输出写入到reduce()。在这个例子中,我们将年份(year)输出为一个Text对象(因为我们只将它当作一个key),然后我们用IntWritable来包装。 我们只有在温度值不为空并且其品质代码(quality code)表示气温读书为OK时才会输出一个记录。

与map函数相似,reduce函数使用Reducer类来实现,如下面的 示例2-4 。

示例2-4 计算最高温度的Reducer示例import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); }}复制代码同样,四个形式参数用于指定输入和输出类型,这一次是reduce函数。reduce函数的输入类型必须与map函数的输出类型匹配:Text和IntWritale。 在这个示例中,reduce函数的输出类型是Text和IntWritable,代表了年份和其最高温度,这是通过迭代温度并将每个温度与迄今为止找到的最高记录进行比较而得出的。

第三段代码运行这个MapReduce作业(请参见 示例2-5 )。

示例2-5 在天气数据集中查找最高温的应用import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class MaxTemperature { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } Job job = new Job(); job.setJarByClass(MaxTemperature.class); job.setJobName("Max temperature"); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); }}复制代码我们需要遵循Job对象的规范来实现这个作业,使您可以控制作业的运行方式。当我们在Hadoop集群上运行此作业时,我们会将代码打包到一个JAR文件中(Hadoop将在集群中分发该文件)。 无需显示指定JAR文件的名称,我们可以在Job的setJarByClass()方法中传递一个类,Hadoop将通过查找包含此类的JAR文件来使用该类来找到相关的JAR文件。

构造Job对象后,我们指定输入和输出路径。通过调用FileInputFormat上的静态的addInputPath()方法,来指定输入路径,并且它可以是一个单个的文件,或者一个目录(这种情况下,输入将会由该目录的所有文件组成),或者是一个文件模式(正则表达式)。 顾名思义,addInputPath()可以被调用多次来添加多个路径。

输出路径(只有一个)是通过调用FileOutputFormat上的静态的setOutputPath()方法来指定的。 它指定了reduce函数输出文件的目录。 在运行作业之前,该目录应该存在,因为Hadoop会抱怨而不运行作业。此预防措施是为了防止数据丢失(意外地用一个作业的输出覆盖另一个长作业的输出会很烦人)。

接下来,我们通过setMapperClass()和setReducerClass()方法来指定map和reduce的类型。

该setOutputKeyClass()和setOutputValueClass()方法为reduce函数控制输出类型,并且必须和Reduce类的产出相匹配。 map输出类型默认为相同的类型,因此,如果mapper生成与reducer相同的类型,则不需要设置它们(就像我们的例子一样)。 但是,如果它们不同,则必须使用setMapOutputKeyClass()和setMapOutputValueClass()方法设置map输出类型。

输入类型是通过输入格式来控制的,由于我们使用的是默认的TextInputFormat,因此尚未明确设置输入格式。

在设置了定义map和reduce函数的类之后,我们就可以运行该作业了。在waitForCompletion()对方法Job提交作业并等待它完成。该方法的单个参数是指示是否生成详细输出的标志。如果为true,则作业将有关其进度的信息写入控制台。

waitForCompletion()方法的返回值是一个布尔值,指示成功(true)或失败(false),我们将其转换为程序的退出代码0或1。

小结今天我们学习了MapReduce编程模型。这个模型既简单又强大,简单是因为它只包含Map和Reduce两个过程,强大之处在于它可以实现大数据领域几乎所有的计算需求。这也正是MapReduce这个模型令人着迷的地方。

本文标签:编程模型

原文链接:https://www.xgfox.com/alpx/957.html

本文版权:如无特别标注,本站文章均为原创。