链式MapReduce的概念
Hadoop2.0开始支持链式MapReduce作业,类似于工厂的生产线,每一个阶段都有特定的任务要处理,通过这样进一步的分工,从而提高了生成效率,这些Mapper可以像水流一样,一级一级向后处理,有点类似于Linux的管道,前一个Mapper的输出结果直接可以作为下一个Mapper的的输入,形成一个流水线,整个Job中只能有一个Reducer,在Reducer前面可以有一个或者多个Mapper,在Reducer的后面可以有0个或者多个Mapper。
链式Map
| 类型 | 描述 |
|||
| 顺序链接MapReduce作业 | 类似于Unix中的管道:mapreduce1 | mapreduce2 | mapreduce3 ……,每一个阶段创建一个job,并将当前输入路径设为前一个的输出,在最后阶段删除链上生成的中间数据。 |
| 具有复杂依赖的MapReduce链接 | 若mapreduce1处理一个数据集,mapreduce2 处理另一个数据集,而mapreduce3对前两个做内部连结,这种情况通过Job和JobControl类管理非线性作业间的依赖,如x.addDependingJob(y)意味着x在y完成前不会启动。 |
| 预处理和后处理的链接 | 一般将预处理和后处理写为Mapper任务,可以自己进行链接或使用ChainMapper和ChainReducer类,生成得作业表达式类似于:MAP+ | REDUCE | MAP如以下作业 Map1 | Map2 | Reduce | Map3 | Map4,把Map2和Reduce视为MapReduce作业核心,Map1作为前处理,Map3,Map4作为后处理,ChainMapper使用模式:(预处理作业),ChainReducer使用模式:(设置Reducer并添加后处理Mapper) |
实验案例
实验环境
操作系统:Linux Ubuntu 19.04
JDK版本:jdk7u75linuxx64
Hadoop版本:hadoop2.6.0cdh5.4.5
开发工具:eclipsejavajunoSR2linuxgtkx86_64
实验步骤
1、启动Hadoop:切换到/apps/hadoop/sbin目录下,执行./startall.sh
命令启动Hadoop。
2、创建本地目录:在Linux本地新建/data/mapreduce10目录,用于存放实验数据。
3、下载数据文件:使用wget命令从指定网址下载文本文件goods_0和项目所需的依赖包hadoop2lib.tar.gz。
4、上传数据文件:在HDFS上新建/mymapreduce10/in目录,然后将本地/data/mapreduce10目录下的goods_0文件导入到HDFS的/mymapreduce10/in目录中。
5、新建Java Project:在Eclipse中新建名为mapreduce10的项目,并在该项目下新建mapreduce包以及ChainMapReduce类。
6、添加依赖jar包:将hadoop2lib目录中的jar包拷贝到项目的hadoop2lib目录下,并添加到项目的Build Path中。
7、编写程序代码:编写ChainMapReduce类的代码,实现链式MapReduce的逻辑。
8、执行程序:运行ChainMapReduce程序,查看结果数据。
实验结果
经过链式MapReduce的处理,最终得到的结果数据如下:
商品名称 | 点击量 |
帽子 | 27.0 |
鞋子 | 30.0 |
链式MapReduce操作的概念
在以往的MapReduce案例中,无论是简单的WordCount还是比较复杂的使用MR统计社交共同好友的MapReduce作业都仅仅包含一个Map类和Reducer类,这就使得MR作业在实现某些复杂的程序时会遇到“有心无力”的尴尬问题,为此,Hadoop的MR有一个链式操作应运而生,所谓的链式操作一句话概括就是:在一个MapReduce作业中可以存在多个Map类,但是至多只能存在一个Reducer类,且可以在Reduce操作后继续执行Map操作,通过合理地增加Map类可以使得MapReduce作业可以处理非常复杂的业务以及实现复杂的算法。
需求分析与数据准备
现有某电商一天商品浏览情况数据goods_0,功能为在第一个Mapper里面过滤掉点击量大于600的商品,在第二个Mapper中过滤掉点击量在100~600之间的商品,Reducer里面进行分类汇总并输出,在Reducer后的Mapper里过滤掉商品名长度大于或等于3的商品。
实验数据如下:
商品名称 | 点击量 |
袜子 | 189 |
毛衣 | 600 |
裤子 | 780 |
鞋子 | 30 |
呢子外套 | 90 |
牛仔外套 | 130 |
羽绒服 | 7 |
帽子 | 21 |
帽子 | 6 |
羽绒服 | 12 |
代码编写
//第一个阶段的Mapper public class Map1 extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("t"); String productName = fields[0]; int clickCount = Integer.parseInt(fields[1]); if (clickCount <= 600) { context.write(new Text(productName), new IntWritable(clickCount)); } } } //第二个阶段的Mapper public class Map2 extends Mapper<Text, IntWritable, Text, IntWritable> { @Override protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException { if (value.get() < 100) { context.write(key, value); } } } //Reducer阶段 public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } } //第三个阶段的Mapper public class Map3 extends Mapper<Text, IntWritable, Text, IntWritable> { @Override protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException { if (key.toString().length() < 3) { context.write(key, value); } } }
FAQs
问题1:链式MapReduce有哪些优点?
解答:链式MapReduce的优点主要包括以下几点:
1、提高效率:通过流水线式的处理方式,每个Mapper的输出直接作为下一个Mapper的输入,减少了中间数据的存储和读取开销,提高了处理效率。
2、灵活性强:可以根据具体需求灵活配置多个Mapper和Reducer,实现复杂的业务逻辑和算法。
3、易于维护:通过将复杂的任务分解为多个小任务,每个任务相对独立,便于开发和维护。
问题2:链式MapReduce有什么局限性?
解答:链式MapReduce的局限性主要体现在以下几个方面:
1、Reduce次数限制:整个Job中只能有一个Reducer,这在一定程度上限制了处理逻辑的复杂度。
2、代码量增加:由于需要编写多个Mapper类,代码量相对较大,增加了开发和维护的难度。
3、调试难度增加:多个Mapper串联在一起,调试过程中需要逐个检查每个Mapper的输出,增加了调试的难度。
原创文章,作者:未希,如若转载,请注明出处:https://www.kdun.com/ask/1095665.html
本网站发布或转载的文章及图片均来自网络,其原创性以及文中表达的观点和判断不代表本网站。如有问题,请联系客服处理。
发表回复