python,from mapreduce import iterable_MapReduce,,def mapper(x):, return x, 1,,def reducer(x, y):, return x + y,,data = [1, 2, 3, 4, 5],result = iterable_MapReduce(data, mapper, reducer),print(result),
“MapReduce是一种处理和生成大数据集的编程模型,广泛用于分布式计算环境,它最早由Google提出,并成为Apache Hadoop的核心组件之一,MapReduce将任务分解成两个主要阶段:Map(映射)和Reduce(归约),通过这种方式,可以有效地处理大量数据,下面将以统计每个买家收藏商品数量为例,介绍MapReduce的实现过程。
MapReduce框架的工作原理
1、资源管理:ResourceManager负责集群中所有资源的统一管理和分配,它接收来自NodeManager的汇报,建立ApplicationMaster,并将资源派送给ApplicationMaster。
2、节点管理:NodeManager是ResourceManager在每台机器上的代理,负责容器管理,并监控它们的资源使用情况,同时向ResourceManager提供这些资源使用报告。
3、应用管理:ApplicationMaster负责向ResourceManager申请资源,请求NodeManager启动Container,并告诉Container做什么事情。
4、容器运行:Container是YARN中资源的抽象,它封装了某个节点上一定量的资源(CPU和内存),Container由ApplicationMaster向ResourceManager申请的,由ResourceManager中的资源调度器异步分配给ApplicationMaster。
示例代码详解
以下是一个用Java编写的MapReduce程序,用于统计每个买家收藏的商品数量,这个示例假设输入文件包含三列数据:买家ID、商品ID和收藏日期,各列之间用制表符(t)分隔。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FavoriteCount { // Mapper类 public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("t"); // 获取买家ID作为key String buyerId = fields[0]; context.write(new Text(buyerId), new IntWritable(1)); } } // Reducer类 public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } } // Driver类 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "favorite count"); job.setJarByClass(FavoriteCount.class); job.setMapperClass(MyMapper.class); job.setCombinerClass(MyReducer.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
代码解析
1、导入必要的包:需要导入Hadoop相关的类和接口,如Configuration
、Path
、Text
、IntWritable
、Job
、Mapper
和Reducer
等。
2、定义Mapper类:继承自Mapper
类,重写map
方法,在map
方法中,读取一行文本并将其拆分成多个字段,以买家ID为键,输出值为1。
3、定义Reducer类:继承自Reducer
类,重写reduce
方法,在reduce
方法中,将所有相同键的值累加起来,得到每个买家收藏的商品总数。
4、Driver类:设置作业的配置信息,包括输入输出路径、Mapper和Reducer类等,最后调用job.waitForCompletion()
来执行作业。
MapReduce作业运行流程
1、提交作业:用户通过客户端提交MapReduce作业到集群。
2、初始化作业:ResourceManager创建一个ApplicationMaster来管理作业的执行。
3、分配资源:ResourceManager与NodeManager协作,为ApplicationMaster和各个任务分配资源。
4、执行任务:ApplicationMaster启动Container来执行具体的Map和Reduce任务,Map任务的输出会经过shuffle和sort阶段,再传递给Reduce任务。
5、汇归纳果:Reduce任务完成后,结果会被写入HDFS中。
常见问题解答(FAQs)
1、问题一:为什么MapReduce适合大数据处理?
答案:MapReduce通过将任务分解成小的、独立的子任务,可以在大规模的分布式环境中并行处理数据,这种设计使得它可以高效地处理和分析海量数据集,即使单节点失败也不会影响整体任务的完成。
2、问题二:MapReduce中的Combiner有什么作用?
答案:Combiner是一个可选的中间聚合步骤,它在Map任务的输出和Reduce任务的输入之间进行局部聚合,这可以减少网络传输的数据量,提高整体性能,在上面的示例中,可以在Map阶段之后添加一个Combiner,对本地的中间结果进行初步汇总。
| 部分 | 代码 | 说明 |
| | | |
|Mapper | “`python
def mapper(input_value):
# 输入值是一个字符串,"hello world"
words = input_value.split() # 分割字符串为单词列表
for word in words:
yield (word, 1) # 生成单词和计数的键值对
“` | Mapper函数接收一个字符串作为输入,分割成单词,并为每个单词生成一个键值对,其中键是单词,值是1。 |
|Shuffle and Sort | (这一步通常由MapReduce框架自动处理) | MapReduce框架会将Mapper输出的键值对根据键进行排序和分组,为Reducer准备输入。 |
|Reducer | “`python
def reducer(mapped_values):
# mapped_values 是一个迭代器,包含相同键的所有值
word, counts = zip(*mapped_values) # 将键和值组合
total_count = sum(counts) # 计算总计数
yield (word, total_count) # 生成单词和总计数的键值对
“` | Reducer函数接收一组键值对,其中键是单词,值是一个包含该单词计数的列表,Reducer计算每个单词的总计数,并生成一个新的键值对。 |
|Driver | “`python
if __name__ == "__main__":
input_data = "hello world hello mapreduce"
mapped = mapper(input_data)
reduced = reducer(mapped)
for word, count in reduced:
print(f"{word}: {count}")
“` | Driver是程序的入口点,它调用Mapper和Reducer,并打印出最终的结果。 |
这个简单的MapReduce程序首先通过Mapper将输入字符串分割成单词,并为每个单词计数,Reducer统计每个单词的总出现次数,Driver打印出每个单词及其出现次数。
这个例子没有实现完整的MapReduce框架,如分布式处理、错误处理、并行化等,但它提供了一个基本的MapReduce处理流程,在实际应用中,MapReduce通常会使用如Hadoop这样的框架来处理大规模数据集。
原创文章,作者:未希,如若转载,请注明出处:https://www.kdun.com/ask/1184789.html
本网站发布或转载的文章及图片均来自网络,其原创性以及文中表达的观点和判断不代表本网站。如有问题,请联系客服处理。
发表回复