python,from mrjob.job import MRJob,,class MRWordFrequencyCount(MRJob):,, def mapper(self, _, line):, for word in line.split():, yield word, 1,, def reducer(self, key, values):, yield key, sum(values),,if __name__ == '__main__':, MRWordFrequencyCount.run(),
“,,这个代码用于统计文本中每个单词的出现频率。MapReduce 是一种编程模型和相关实现,用于处理和生成大规模数据集,它通过将任务分解为更小的子任务(映射),然后对这些子任务的结果进行排序和汇总(归约)来处理大量数据,以下是一个使用 MapReduce 统计词频的样例代码,包括详细的解释和两个常见问题的解答。
MapReduce 统计词频样例代码
Mapper 类
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split("\s+"); for (String w : words) { if (w.length() > 0) { word.set(w); context.write(word, one); } } } }
Reducer 类
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } }
Driver 类
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountDriver { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: WordCount <input path> <output path>"); System.exit(1); } Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
解释
Mapper 类解释
WordCountMapper
类继承自Mapper
,并实现了map
方法。
map
方法接收输入键值对(LongWritable
和Text
),其中LongWritable
是行号,Text
是行内容。
行内容被拆分成单词数组,每个单词被转换为一个Text
对象,并输出一个键值对(单词,1)。
Reducer 类解释
WordCountReducer
类继承自Reducer
,并实现了reduce
方法。
reduce
方法接收一个键(单词)和一个值的迭代器(单词出现次数的集合)。
将所有值相加,得到单词的总出现次数,并输出一个键值对(单词,总次数)。
Driver 类解释
WordCountDriver
类设置并运行 MapReduce 作业。
配置作业的输入路径和输出路径。
设置 Mapper、Combiner 和 Reducer 类。
提交作业并等待完成。
表格:MapReduce 工作流程
步骤 | 描述 |
1 | 输入数据分片并分发到各个 Map 任务。 |
2 | 每个 Map 任务处理输入数据并生成中间键值对。 |
3 | Shuffle 和 Sort 阶段,将中间键值对按键排序并传输到 Reduce 任务。 |
4 | 每个 Reduce 任务接收中间键值对,进行归约操作,生成最终结果。 |
5 | 将最终结果写入输出文件。 |
FAQs
Q1:如何更改 MapReduce 程序中的输入和输出路径?
A1:可以通过修改WordCountDriver
类的main
方法中的args
参数来更改输入和输出路径。
public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: WordCount <input path> <output path>"); System.exit(1); } // 输入路径和输出路径通过命令行参数传入 Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); // 输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径 System.exit(job.waitForCompletion(true) ? 0 : 1); }
在运行程序时,可以通过命令行传递新的路径参数,如下所示:
hadoop jar yourjarfile.jar input/path output/path
Q2:如何在 MapReduce 程序中添加自定义逻辑?
A2:可以在Mapper
或Reducer
类中添加自定义逻辑,在WordCountMapper
类的map
方法中,可以添加额外的逻辑来过滤特定的单词或执行其他操作,以下是一个简单的示例,过滤掉长度小于 3 的单词:
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split("\s+"); for (String w : words) { if (w.length() > 3) { // 只处理长度大于3的单词 word.set(w); context.write(word, one); } } }
通过这种方式,可以在 MapReduce 程序中添加任何需要的自定义逻辑。
原创文章,作者:未希,如若转载,请注明出处:https://www.kdun.com/ask/1236681.html
本网站发布或转载的文章及图片均来自网络,其原创性以及文中表达的观点和判断不代表本网站。如有问题,请联系客服处理。
发表回复