MapReduce中如何实现多个Map函数的串联?

MapReduce是一个编程模型,用于处理和生成大数据集,通常在分布式系统上运行。它包括两个主要阶段:Map(映射)和Reduce(归约)。,,1. **Map 阶段**:输入数据被拆分成多个独立的块,每个块由一个Map任务处理。Map函数将输入的键值对转换成中间的键值对集合。,2. **Shuffle 和 Sort 阶段**:Map任务完成后,系统会对中间输出进行排序和分区,以便相同键的所有值都集中在一起。,3. **Reduce 阶段**:Reduce任务接收到排序和分区后的数据,并对每个唯一的键应用Reduce函数,将其所有关联的值合并成一个更小的集合。,,这种模型特别适用于大规模数据处理任务,如文本分析、日志处理和科学计算等。

链式MapReduce的概念

MapReduce中如何实现多个Map函数的串联?

Hadoop2.0开始支持链式MapReduce作业,类似于工厂的生产线,每一个阶段都有特定的任务要处理,通过这样进一步的分工,从而提高了生成效率,这些Mapper可以像水流一样,一级一级向后处理,有点类似于Linux的管道,前一个Mapper的输出结果直接可以作为下一个Mapper的的输入,形成一个流水线,整个Job中只能有一个Reducer,在Reducer前面可以有一个或者多个Mapper,在Reducer的后面可以有0个或者多个Mapper。

链式Map

| 类型 | 描述 |

|||

| 顺序链接MapReduce作业 | 类似于Unix中的管道:mapreduce1 | mapreduce2 | mapreduce3 ……,每一个阶段创建一个job,并将当前输入路径设为前一个的输出,在最后阶段删除链上生成的中间数据。 |

| 具有复杂依赖的MapReduce链接 | 若mapreduce1处理一个数据集,mapreduce2 处理另一个数据集,而mapreduce3对前两个做内部连结,这种情况通过Job和JobControl类管理非线性作业间的依赖,如x.addDependingJob(y)意味着x在y完成前不会启动。 |

| 预处理和后处理的链接 | 一般将预处理和后处理写为Mapper任务,可以自己进行链接或使用ChainMapper和ChainReducer类,生成得作业表达式类似于:MAP+ | REDUCE | MAP如以下作业 Map1 | Map2 | Reduce | Map3 | Map4,把Map2和Reduce视为MapReduce作业核心,Map1作为前处理,Map3,Map4作为后处理,ChainMapper使用模式:(预处理作业),ChainReducer使用模式:(设置Reducer并添加后处理Mapper) |

实验案例

实验环境

操作系统:Linux Ubuntu 19.04

JDK版本:jdk7u75linuxx64

Hadoop版本:hadoop2.6.0cdh5.4.5

开发工具:eclipsejavajunoSR2linuxgtkx86_64

实验步骤

1、启动Hadoop:切换到/apps/hadoop/sbin目录下,执行./startall.sh命令启动Hadoop。

2、创建本地目录:在Linux本地新建/data/mapreduce10目录,用于存放实验数据。

3、下载数据文件:使用wget命令从指定网址下载文本文件goods_0和项目所需的依赖包hadoop2lib.tar.gz。

4、上传数据文件:在HDFS上新建/mymapreduce10/in目录,然后将本地/data/mapreduce10目录下的goods_0文件导入到HDFS的/mymapreduce10/in目录中。

5、新建Java Project:在Eclipse中新建名为mapreduce10的项目,并在该项目下新建mapreduce包以及ChainMapReduce类。

6、添加依赖jar包:将hadoop2lib目录中的jar包拷贝到项目的hadoop2lib目录下,并添加到项目的Build Path中。

7、编写程序代码:编写ChainMapReduce类的代码,实现链式MapReduce的逻辑。

8、执行程序:运行ChainMapReduce程序,查看结果数据。

MapReduce中如何实现多个Map函数的串联?

实验结果

经过链式MapReduce的处理,最终得到的结果数据如下:

商品名称 点击量
帽子 27.0
鞋子 30.0

链式MapReduce操作的概念

在以往的MapReduce案例中,无论是简单的WordCount还是比较复杂的使用MR统计社交共同好友的MapReduce作业都仅仅包含一个Map类和Reducer类,这就使得MR作业在实现某些复杂的程序时会遇到“有心无力”的尴尬问题,为此,Hadoop的MR有一个链式操作应运而生,所谓的链式操作一句话概括就是:在一个MapReduce作业中可以存在多个Map类,但是至多只能存在一个Reducer类,且可以在Reduce操作后继续执行Map操作,通过合理地增加Map类可以使得MapReduce作业可以处理非常复杂的业务以及实现复杂的算法。

需求分析与数据准备

现有某电商一天商品浏览情况数据goods_0,功能为在第一个Mapper里面过滤掉点击量大于600的商品,在第二个Mapper中过滤掉点击量在100~600之间的商品,Reducer里面进行分类汇总并输出,在Reducer后的Mapper里过滤掉商品名长度大于或等于3的商品。

实验数据如下:

商品名称 点击量
袜子 189
毛衣 600
裤子 780
鞋子 30
呢子外套 90
牛仔外套 130
羽绒服 7
帽子 21
帽子 6
羽绒服 12

代码编写

//第一个阶段的Mapper
public class Map1 extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("t");
        String productName = fields[0];
        int clickCount = Integer.parseInt(fields[1]);
        if (clickCount <= 600) {
            context.write(new Text(productName), new IntWritable(clickCount));
        }
    }
}
//第二个阶段的Mapper
public class Map2 extends Mapper<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
        if (value.get() < 100) {
            context.write(key, value);
        }
    }
}
//Reducer阶段
public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
//第三个阶段的Mapper
public class Map3 extends Mapper<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
        if (key.toString().length() < 3) {
            context.write(key, value);
        }
    }
}

FAQs

问题1:链式MapReduce有哪些优点?

解答:链式MapReduce的优点主要包括以下几点:

1、提高效率:通过流水线式的处理方式,每个Mapper的输出直接作为下一个Mapper的输入,减少了中间数据的存储和读取开销,提高了处理效率。

2、灵活性强:可以根据具体需求灵活配置多个Mapper和Reducer,实现复杂的业务逻辑和算法。

3、易于维护:通过将复杂的任务分解为多个小任务,每个任务相对独立,便于开发和维护。

问题2:链式MapReduce有什么局限性?

解答:链式MapReduce的局限性主要体现在以下几个方面:

1、Reduce次数限制:整个Job中只能有一个Reducer,这在一定程度上限制了处理逻辑的复杂度。

2、代码量增加:由于需要编写多个Mapper类,代码量相对较大,增加了开发和维护的难度。

3、调试难度增加:多个Mapper串联在一起,调试过程中需要逐个检查每个Mapper的输出,增加了调试的难度。

原创文章,作者:未希,如若转载,请注明出处:https://www.kdun.com/ask/1095665.html

(0)
未希的头像未希新媒体运营
上一篇 2024-09-28 15:00
下一篇 2024-09-28 15:02

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

云产品限时秒杀。精选云产品高防服务器,20M大带宽限量抢购  >>点击进入