在大数据处理中,MapReduce 是一个强大的工具,但当面对大量小文件时,其效率会受到显著影响,为了提高处理效率和减少资源消耗,合并小文件是一个常见且必要的步骤。
一、小文件合并的必要性与方法
1、必要性:HDFS 作为 Hadoop 的分布式文件系统,默认将每个文件存储为一个独立的数据块(默认大小为 128MB),这意味着,如果有大量的小文件(例如每个文件小于 128MB),每个文件都会占用一个数据块,导致 NameNode 的元数据压力增大,从而影响整个集群的性能,大量的小文件会导致 MapReduce 作业生成过多的 map 任务,增加任务调度和管理的开销,合并小文件是优化 HDFS 性能和 MapReduce 作业效率的重要手段。
2、方法
数据采集阶段合并:在数据采集的时候,客户端就将小文件或小批数据合成大文件再上传 HDFS,这种方法可以从根本上减少小文件的数量,提高后续数据处理的效率。
HDFS 上使用 MapReduce 程序合并:在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并,这种方法适用于已经存在于 HDFS 上的小文件,可以通过自定义 InputFormat 和 RecordReader 来实现。
MapReduce 处理时合并:在 MapReduce 处理时,可采 用 CombineTextInputFormat 提高效率,这种方法通过在 map 端进行数据聚合,减少 reduce 阶段的输入数据量,从而提高整体处理效率。
二、自定义 InputFormat 合并小文件
自定义 InputFormat 是合并小文件的一种有效方法,通过实现 FileInputFormat 和 RecordReader,可以实现对小文件的完整读取和封装,然后在输出时使用 SequenceFileOutputFormat 将多个小文件合并成一个大文件。
1、自定义 WholeFileInputFormat
isSplitable 方法:重写 isSplitable 方法,设置小文件不进行切片,这样可以确保每个小文件作为一个整体被读取。
createRecordReader 方法:重写 createRecordReader 方法,返回自定义的 WholeFileRecordReader。
2、自定义 WholeFileRecordReader
initialize 方法:初始化方法,设置文件切片和配置信息。
nextKeyValue 方法:核心逻辑,读取文件内容并封装到 BytesWritable 中。
getCurrentKey 方法:返回当前键,通常为空或文件名。
getCurrentValue 方法:返回当前值,即文件内容。
getProgress 方法:返回进度,通常为 0 或 1。
close 方法:关闭文件流。
三、代码实现
以下是一个简单的示例代码,展示了如何实现自定义的 WholeFileInputFormat 和 WholeFileRecordReader:
import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.io.IOException; public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{ @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } @Override public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { WholeFileRecordReader reader = new WholeFileRecordReader(); reader.initialize(inputSplit, taskAttemptContext); return reader; } } class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable>{ private FileSplit fileSplit; private Configuration configuration; private BytesWritable bytesWritable = new BytesWritable(); private boolean nextKeyValue = false; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.fileSplit = (FileSplit)split; this.configuration = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!nextKeyValue) { byte[] contents = new byte[(int)fileSplit.getLength()]; FileSystem fs = fileSplit.getPath().getFileSystem(configuration); FSDataInputStream fis = fs.open(fileSplit.getPath()); IOUtils.readFully(fis, contents, 0, contents.length); bytesWritable.set(contents, 0, contents.length); nextKeyValue = true; return true; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return bytesWritable; } @Override public float getProgress() throws IOException, InterruptedException { return nextKeyValue ? 1 : 0; } @Override public void close() throws IOException { } }
四、Hive 小文件合并策略
Hive 在处理大量小文件时,也会面临与 MapReduce 类似的问题,Hive 提供了一些参数来控制小文件的合并策略,以提高查询性能和减少 NameNode 的压力。
1、hive.mergejob.maponly:默认为 true,hadoop 版本支持 CombineFileInputFormat,则启动 Maponly job for merge,否则启动 MapReduce merge job,map 端 combine file 是比较高效的做法。
2、hive.merge.mapfiles:默认为 true,正常的 maponly job 后,是否启动 merge job 来合并 map 端输出的结果。
3、hive.merge.mapredfiles:默认为 false,正常的 mapreduce job 后,是否启动 merge job 来合并 reduce 端输出的结果,建议开启。
4、hive.merge.smallfiles.avgsize:默认为 16MB,如果不是 partitioned table,输出 table 文件的平均大小小于这个值,启动 merge job,如果是 partitioned table,则分别计算每个 partition 下文件平均大小,只 merge 平均大小小于这个值的 partition,这个值只有当 hive.merge.mapfiles 或 hive.merge.mapredfiles 设定为 true 时,才有效。
5、hive.exec.reducers.bytes.per.reducer:默认为 1G,如果用户不主动设置 mapred.reduce.tasks 数,则会根据 input directory 计算出所有读入文件的 input summary size,然后除以这个值算出 reduce number。
6、hive.merge.size.per.task:默认为 256MB,merge job 后每个文件的目标大小(targetSize),用之前 job 输出文件的 total size 除以这个值,就可以决定 merge job 的 reduce 数目,merge job 的 map 端相当于 identity map,shuffle 到 reduce,每个 reduce dump 一个文件,通过这种方式控制文件的数量和大小。
7、mapred.max.split.size:默认为 256MB,一个 split 最大的大小。
8、mapred.min.split.size.per.node:默认为 1 byte,一个节点上(datanode)split 至少的大小。
9、mapred.min.split.size.per.rack:默认为 1 byte,同一个交换机(rack locality)下 split 至少的大小,通过这三个数的调节,组成了一串 CombineFileSplit 用户可以通过增大 mapred.max.split.size 的值来减少 Map Task 数量。
MapReduce 合并小文件是一项重要的优化措施,可以有效提高数据处理效率和集群性能,通过合理选择合并方法和策略,可以显著减少 NameNode 的元数据压力,降低 MapReduce 作业的资源消耗,提高整体数据处理速度。
原创文章,作者:未希,如若转载,请注明出处:https://www.kdun.com/ask/1237322.html
本网站发布或转载的文章及图片均来自网络,其原创性以及文中表达的观点和判断不代表本网站。如有问题,请联系客服处理。
发表回复