链式MapReduce操作的概念
在Hadoop MapReduce中,一个作业通常包含一个Mapper类和一个Reducer类,这种结构在处理简单任务时非常有效,但在面对复杂业务逻辑时可能显得力不从心,为了解决这个问题,Hadoop提供了链式MapReduce操作,允许在一个作业中包含多个Mapper类,但至多只能有一个Reducer类,通过增加Mapper的数量,可以更灵活地处理复杂的数据处理需求。
链式MapReduce实战案例
需求分析
假设有一批用户点击网页的日志数据,需要统计每个网页被点击的次数,但要过滤掉黑名单用户的点击数据,具体需求包括:
1、加载黑名单数据。
2、过滤掉黑名单用户的点击记录。
3、将过滤后的数据映射为键值对并进行聚合。
4、过滤掉点击次数少于3次的统计信息。
数据准备
点击日志(clicklog.txt):包含用户ID和点击的网页ID。
黑名单数据(blacklist.txt):包含需要过滤的用户ID。
示例数据如下:
// clicklog.txt 20190612 1315 a1 20190612 2654 b1 ... // blacklist.txt 1111 man 23 2333 man 21 ...
代码编写
1、第一个Map类:加载黑名单数据并将其存入内存,然后读取点击日志,将用户ID与黑名单进行左外连接,如果用户在黑名单中,将其标记为黑名单;否则标记为NULL。
2、第二个Map类:过滤掉黑名单用户的点击记录,只保留非黑名单用户的点击记录,并将其映射为键值对。
3、Reducer类:对过滤后的点击记录进行聚合,计算每个网页的点击总数。
4、第三个Map类:过滤掉点击次数少于3次的统计结果,并将最终结果写入HDFS。
以下是部分代码示例:
// 第一个Map类 public static class FilterMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> { private final Text outKey = new Text(); private final IntWritable outValue = new IntWritable(1); private HashSet<String> blacklist = new HashSet<>(); @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader reader = new BufferedReader(new FileReader("blacklist.txt")); String line; while ((line = reader.readLine()) != null) { blacklist.add(line.split("t")[0]); // 添加黑名单用户ID到集合中 } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("t"); String userId = fields[1]; if (!blacklist.contains(userId)) { outKey.set(fields[2]); // 设置页面ID为key context.write(outKey, outValue); // 输出键值对 } } }
// 第二个Map类 public static class FilterMapper2 extends Mapper<Text, IntWritable, Text, IntWritable> { @Override protected void map(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } if (sum >= 3) { context.write(key, new IntWritable(sum)); // 只输出点击次数大于等于3的键值对 } } }
// Reducer类 public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); // 输出聚合结果 } }
// 第三个Map类 public static class FilterMapper3 extends Mapper<Text, IntWritable, Text, IntWritable> { @Override protected void map(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } if (sum >= 3) { context.write(key, new IntWritable(sum)); // 只输出点击次数大于等于3的键值对 } } }
测试与归纳
在完成代码编写后,需要进行打包上传和测试,测试过程中要确保所有步骤都能正确执行,并验证最终结果是否符合预期,通过链式MapReduce操作,可以有效地处理复杂的数据处理任务,提高系统的灵活性和效率。
FAQs
1、问题:为什么链式MapReduce操作中的Reducer只能有一个?
答案:在链式MapReduce操作中,Reducer只能有一个是因为Reduce阶段的主要作用是对中间结果进行汇总和聚合,如果存在多个Reducer,会导致结果不一致和难以管理,设计上限制了只能有一个Reducer来保证结果的正确性和一致性。
2、问题:链式MapReduce操作适用于哪些场景?
答案:链式MapReduce操作适用于需要对数据进行多阶段处理的复杂场景,例如数据清洗、过滤、转换和聚合等,通过增加多个Mapper,可以在不同阶段对数据进行不同的处理,从而实现复杂的业务逻辑,典型的应用场景包括日志分析、数据清洗和复杂的数据统计分析等。
阶段 | 输入 | Map函数 | 输出 |
Map 1 | 输入数据集(文本文件) | 读取数据行,将每行分割成单词,输出键值对(单词,1) | {word1, 1}, {word2, 1}, …, {wordN, 1} |
Map 2 | Map 1的输出 | 对Map 1的输出进行进一步处理,将相同的单词合并计数,输出键值对(单词,计数) | {word1, count1}, {word2, count2}, …, {wordN, countN} |
Map 3 | Map 2的输出 | 对Map 2的输出进行其他处理,按单词长度排序,输出键值对(单词长度,{单词,计数}) | {length1, {word1, count1}}, {length2, {word2, count2}}, …, {lengthN, {wordN, countN}} |
这个归纳展示了MapReduce任务中一个Map链的三个Map阶段,每个Map阶段对输入数据集进行不同的处理,最终生成一个包含多个键值对的输出,在实际应用中,MapReduce任务可能包含更多Map阶段,每个阶段对输入数据进行不同的转换和计算。
原创文章,作者:未希,如若转载,请注明出处:https://www.kdun.com/ask/1182720.html
本网站发布或转载的文章及图片均来自网络,其原创性以及文中表达的观点和判断不代表本网站。如有问题,请联系客服处理。
发表回复