MapReduce是一种编程模型,用于处理和生成大数据集,它由两个主要阶段组成:Map阶段和Reduce阶段,在Map阶段,输入数据被分割成多个独立的块,然后每个块被映射到一个键值对,在Reduce阶段,所有具有相同键的键值对被组合在一起,并应用一个规约函数以生成最终结果。
以下是一个简单的MapReduce示例,使用Java编写,并使用了Hadoop库来执行MapReduce任务,在这个示例中,我们将计算文本文件中单词的出现次数。
我们需要创建一个名为WordCountMapper
的类,该类继承自Mapper
类,并实现map
方法。map
方法接收一个键值对作为输入,其中键是输入数据的偏移量,值是输入数据的一部分,在这个例子中,我们不需要键,所以我们将其忽略。
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\s+"); for (String w : words) { word.set(w); context.write(word, one); } } }
我们需要创建一个名为WordCountReducer
的类,该类继承自Reducer
类,并实现reduce
方法。reduce
方法接收一个键和一个迭代器,其中迭代器包含与该键关联的所有值,在这个例子中,我们将累加这些值以计算单词的出现次数。
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } }
我们需要创建一个名为WordCount
的主类,该类继承自Configured
类,并实现run
方法,在这个方法中,我们将配置和运行MapReduce作业。
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount extends Configured implements Tool { public int run(String[] args) throws Exception { Configuration conf = getConf(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(WordCountMapper.class); job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new WordCount(), args); System.exit(exitCode); } }
要运行这个MapReduce作业,你需要将上述代码编译成一个jar文件,并将其提交给Hadoop集群,你还需要提供一个输入文件和一个输出目录。
hadoop jar wordcount.jar WordCount input.txt output
这将计算input.txt
文件中每个单词的出现次数,并将结果写入output
目录。
原创文章,作者:未希,如若转载,请注明出处:https://www.kdun.com/ask/843492.html
本网站发布或转载的文章及图片均来自网络,其原创性以及文中表达的观点和判断不代表本网站。如有问题,请联系客服处理。
发表回复