```python
from mrjob.job import MRJob
from mrjob.step import MRStep

class MapReduceExample(MRJob):
    def steps(self):
        return [
            MRStep(
                mapper=self.mapper,
                reducer=self.reducer
            )
        ]

    def mapper(self, _, line):
        words = line.split()
        for word in words:
            yield (word, 1)

    def reducer(self, key, values):
        yield (key, sum(values))

if __name__ == '__main__':
    MapReduceExample.run()
```
In this example we define a class named MapReduceExample that inherits from MRJob and declares two methods, mapper and reducer. The mapper method splits each input line into words and emits a (word, 1) key-value pair for every word; the reducer method receives all the values that share a key and sums them. The `if __name__ == '__main__':` block calls MapReduceExample.run() to launch the MapReduce job.
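For a quick local test, the job can also be driven from Python instead of the command line. The snippet below is only a sketch: it assumes the class above is saved in a file named mapreduce_example.py, that an input.txt exists, and that a reasonably recent mrjob release (providing make_runner(), cat_output() and parse_output()) is installed; the simpler route is just `python mapreduce_example.py input.txt`.

```python
# Minimal sketch of running the job above in-process with mrjob's inline runner.
# Assumptions: the class lives in mapreduce_example.py, input.txt exists, and the
# installed mrjob version provides parse_output()/cat_output().
from mapreduce_example import MapReduceExample

job = MapReduceExample(args=['-r', 'inline', 'input.txt'])
with job.make_runner() as runner:
    runner.run()
    for word, count in job.parse_output(runner.cat_output()):
        print(word, count)
```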
MapReduce Multi-Language Programming Examples

MapReduce is a programming model for parallel computation over large data sets (typically larger than 1 TB). The concepts "Map" and "Reduce", and the main ideas behind them, are borrowed from functional programming languages, together with features taken from vector programming languages. The model lets programmers with no background in distributed or parallel programming run their code on a distributed system: a typical implementation asks the user to supply a Map function that turns a set of key-value pairs into a set of intermediate key-value pairs, and a Reduce function, run concurrently, that merges all intermediate values associated with the same key.
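Before looking at concrete frameworks, the model itself can be illustrated in a few lines of plain Python. The sketch below is purely illustrative (none of the function names belong to any Hadoop or Spark API) and shows the three phases, map, shuffle and reduce, applied to word counting in memory.

```python
# Illustrative, in-memory version of the three MapReduce phases for word counting.
# All names here are made up for illustration; no Hadoop/Spark API is involved.
from collections import defaultdict

def map_phase(lines):
    # Map: emit an intermediate (word, 1) pair for every word in every line.
    for line in lines:
        for word in line.split():
            yield word, 1

def shuffle_phase(pairs):
    # Shuffle: group all intermediate values by key.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups.items()

def reduce_phase(groups):
    # Reduce: merge the values of each key into a single result.
    for key, values in groups:
        yield key, sum(values)

lines = ["hello world", "hello mapreduce"]
print(dict(reduce_phase(shuffle_phase(map_phase(lines)))))
# {'hello': 2, 'world': 1, 'mapreduce': 1}
```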
Java API Example Walkthrough
WordCount is the classic MapReduce example: the Mapper extracts the words from the input text and emits a count of 1 for each occurrence, and the Reducer then sums the counts per word. (The imports and the driver main() that configure and submit the job are included in the full Java listing in the summary at the end of this article.)
```java
public class WordCount {

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
}
```
Hadoop Streaming Implementations
Hadoop Streaming is a utility that ships with Hadoop and lets you write MapReduce programs in any language that can read from standard input and write to standard output. Below are WordCount implementations in Python and C++.
WordCount in Python
Mapper: reads the input text, splits each line into words on whitespace, and emits each word with a count of 1.
Code example:

```python
#!/usr/bin/env python
# Streaming mapper: emit "<word>\t1" for every word read from stdin.
import sys

for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print('%s\t%s' % (word, 1))
```
Reducer: reads the mapper output (which Hadoop has already sorted by key), accumulates the counts for each word, and emits the final totals.
Code example:

```python
#!/usr/bin/env python
# Streaming reducer: sum the counts of consecutive identical words.
# Hadoop sorts the mapper output by key before it reaches this script.
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    count = int(count)
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print('%s\t%s' % (current_word, current_count))
        current_word = word
        current_count = count

# Emit the last word.
if current_word == word:
    print('%s\t%s' % (current_word, current_count))
```
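Because the two scripts only talk to standard input and output, they can be checked locally by reproducing Hadoop's map, sort, reduce pipeline before a cluster is involved. The harness below is a sketch of that idea; it assumes the scripts are saved as mapper.py and reducer.py next to an input.txt (the classic shell equivalent is `cat input.txt | python mapper.py | sort | python reducer.py`).

```python
# Local sanity check for the Streaming scripts above: run the mapper, sort its
# output (as Hadoop's shuffle would), then feed it to the reducer.
# Assumed file names: mapper.py, reducer.py, input.txt.
import subprocess
import sys

with open('input.txt', 'rb') as f:
    mapped = subprocess.run([sys.executable, 'mapper.py'],
                            stdin=f, capture_output=True, check=True).stdout

shuffled = b'\n'.join(sorted(mapped.splitlines())) + b'\n'

reduced = subprocess.run([sys.executable, 'reducer.py'],
                         input=shuffled, capture_output=True, check=True).stdout
print(reduced.decode(), end='')
```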
WordCount in C++
Mapper: reads the input text, splits each line into words on whitespace, and emits each word with a count of 1.
Code example:

```cpp
// Streaming mapper: emit "<word>\t1" for every word read from stdin.
#include <iostream>
#include <sstream>
#include <string>
using namespace std;

int main() {
    string line;
    while (getline(cin, line)) {
        istringstream words(line);
        string word;
        while (words >> word) {
            cout << word << "\t" << 1 << endl;
        }
    }
    return 0;
}
```
Reducer: reads the mapper output (sorted by key), accumulates the counts for each word, and emits the final totals.
Code example:

```cpp
// Streaming reducer: sum the counts of consecutive identical words.
// Hadoop sorts the mapper output by key before it reaches this program.
#include <iostream>
#include <string>
using namespace std;

int main() {
    string prevKey;
    string word;
    int count = 0;
    int sum = 0;
    while (cin >> word >> count) {
        if (word != prevKey) {
            if (!prevKey.empty()) {
                cout << prevKey << "\t" << sum << endl;
            }
            prevKey = word;
            sum = 0;
        }
        sum += count;
    }
    if (!prevKey.empty()) {
        cout << prevKey << "\t" << sum << endl;
    }
    return 0;
}
```
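Either version is submitted through the Hadoop Streaming jar. The snippet below, written in Python only to stay consistent with the rest of this article, simply shells out to the hadoop command; the jar path and HDFS directories are placeholders that vary by installation, and for the C++ version the -mapper and -reducer arguments would name the compiled binaries instead of the .py scripts.

```python
# Sketch of submitting the Streaming word count; the jar path and the HDFS
# directories below are assumptions and must be adapted to the cluster.
import subprocess

subprocess.run([
    "hadoop", "jar", "/usr/lib/hadoop-mapreduce/hadoop-streaming.jar",
    "-input", "/user/hadoop/wordcount/input",
    "-output", "/user/hadoop/wordcount/output",
    "-mapper", "mapper.py",
    "-reducer", "reducer.py",
    "-file", "mapper.py",   # ship the scripts with the job
    "-file", "reducer.py",
], check=True)
```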
Hadoop Pipes Programming Example
Hadoop Pipes is Hadoop's C++ interface for writing MapReduce tasks. Unlike Streaming, which exchanges data over standard input and output, Pipes communicates with the Java framework over a socket while the map and reduce logic is written directly in C++. Below is a WordCount example; it follows the structure of Hadoop's classic Pipes word-count example.
Mapper: splits each input value into words on spaces and emits each word with a count of 1.
Code example:

```cpp
// Pipes mapper, modelled on Hadoop's classic word-count Pipes example:
// each call receives one input record through the MapContext.
#include <string>
#include <vector>
#include "hadoop/Pipes.hh"
#include "hadoop/StringUtils.hh"

class MyMapper : public HadoopPipes::Mapper {
public:
    MyMapper(HadoopPipes::TaskContext& context) {}
    void map(HadoopPipes::MapContext& context) {
        std::vector<std::string> words =
            HadoopUtils::splitString(context.getInputValue(), " ");
        for (size_t i = 0; i < words.size(); ++i) {
            context.emit(words[i], "1");
        }
    }
};
```
Reducer: sums the counts for each word and emits the totals.
Code example:

```cpp
// Pipes reducer: the ReduceContext iterates over all values that share a key.
// The main() at the end assumes MyMapper from the previous listing is compiled
// into the same binary.
#include "hadoop/Pipes.hh"
#include "hadoop/StringUtils.hh"
#include "hadoop/TemplateFactory.hh"

class MyReducer : public HadoopPipes::Reducer {
public:
    MyReducer(HadoopPipes::TaskContext& context) {}
    void reduce(HadoopPipes::ReduceContext& context) {
        int sum = 0;
        while (context.nextValue()) {
            sum += HadoopUtils::toInt(context.getInputValue());
        }
        context.emit(context.getInputKey(), HadoopUtils::toString(sum));
    }
};

// Entry point: hand both classes to the Pipes runtime.
int main(int argc, char* argv[]) {
    return HadoopPipes::runTask(
        HadoopPipes::TemplateFactory<MyMapper, MyReducer>());
}
```
The examples below summarize, language by language, how the same word-frequency count can be written; each entry gives a short description followed by the code fragment.
Java: implements the MapReduce word-frequency count with the Hadoop Java API; unlike the listing earlier in this article, this version also includes the imports and the driver main() that configure and submit the job.

```java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```
Python: implements the word-frequency count with PySpark, Spark's Python API.

```python
from pyspark import SparkContext

def map_function(line):
    # Emit (word, 1) for every word in the line.
    words = line.split()
    return [(word, 1) for word in words]

def reduce_function(a, b):
    # reduceByKey combines two counts for the same word at a time.
    return a + b

sc = SparkContext("local", "Word Count")
text_file = sc.textFile("wordcount.txt")
words = text_file.flatMap(map_function).reduceByKey(reduce_function)
for word, count in words.collect():
    print(word, count)
sc.stop()
```
Scala: implements the word-frequency count with Scala and Apache Spark.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Word Count")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile("wordcount.txt")
    val words = textFile.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
    words.collect().foreach(println)
    sc.stop()
  }
}
```
Ruby: implements the word-frequency count with Ruby and Apache Spark. The listing presumes a third-party Ruby binding for Spark (such as the ruby-spark gem); the exact API depends on the binding used, so treat it as a sketch.

```ruby
require 'spark'

SparkConf.new do |conf|
  conf.setAppName "Word Count"
end

sc = SparkContext.new
text_file = sc.textFile("wordcount.txt")
words = text_file.flatMap { |line| line.split(" ") }
                 .map { |word| [word, 1] }
                 .reduceByKey { |a, b| a + b }

words.collect.each do |word, count|
  puts "#{word}: #{count}"
end

sc.stop
```