In the era of big data, Hadoop MapReduce remains an important data-processing tool, but its performance problems become obvious once jobs run over large volumes of data. This article walks through several practical techniques for optimizing MapReduce performance on Hadoop, with simple example code illustrating how each technique is applied.
1. Solving Data Skew
When processing large datasets, the shuffle phase easily becomes the performance bottleneck of a MapReduce job. During the shuffle, map output is distributed to reducer tasks according to the partitioning rule, and under the default partitioner a handful of heavy keys can all land on the same reducer task. The result is data skew: a few tasks carry a very high load while the remaining tasks sit nearly idle.
Here are a few techniques for mitigating data skew:
1.1. Using a Combiner
Aggregating the values for each key on the map side reduces the volume of data that has to be shuffled. The word-count mapper and reducer below serve as the running example:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit (word, 1) for every token in the input line.
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts collected for each word.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
Registering the combiner in the job setup:
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Pre-aggregate the (word, 1) pairs on the map side
        // so far fewer records travel through the shuffle.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(WordCountCombiner.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
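Because word count's reduce function is associative and commutative, WordCountCombiner is identical to WordCountReducer, and passing the reducer class to setCombinerClass would work equally well here. Keep in mind that Hadoop treats the combiner as an optional optimization and may invoke it zero or more times per map task, so a combiner must never change the final result.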
1.2. Using a Custom Partitioner
A custom partitioner can spread keys more evenly across the reducer tasks, reducing skew. The example below partitions words by their first letter:
public static class CustomPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Route each word by its first letter so that heavy keys starting
        // with different letters end up on different reducer tasks.
        char firstLetter = Character.toLowerCase(key.toString().charAt(0));
        return Math.abs(firstLetter - 'a') % numPartitions;
    }
}

Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCountMapper.class);
job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(Integer.parseInt(args[2]));
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
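Note that getPartition must return a value in [0, numPartitions), where numPartitions is the reduce-task count passed on the command line here; returning an out-of-range partition fails the job. It is also worth profiling the key distribution before choosing a scheme, since a partitioner that balances one dataset can itself introduce skew on another.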
2. Merging Small Files
MapReduce input is best organized as a few large files (or a directory of them) rather than a mass of small files: each small file produces its own split and its own map task, and the per-task startup overhead quickly dominates. Merging many small files into one large file and letting several mappers share it keeps the task count, and the overhead, down.
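The original post gives no code for this step. As a minimal sketch, Hadoop's built-in CombineTextInputFormat packs multiple small files into each input split, achieving the same effect without a separate merge job (the 128 MB split cap is an illustrative value, not taken from the original):

import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

// Pack many small files into each split instead of one split per file,
// so a handful of mappers can process thousands of small inputs.
job.setInputFormatClass(CombineTextInputFormat.class);
// Illustrative cap: stop adding files to a split once it reaches ~128 MB.
CombineTextInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024);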
3. Keeping State Across Records
Some operations need to remember state across input records. MapReduce supports this through mechanisms such as mapper-side accumulation: collect partial results in instance fields during map calls, then flush them in cleanup, for example into a counter.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private final static String SEARCH_WORD = "hello";
    private int count = 0;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            // Accumulate locally instead of touching the counter on every match.
            if (SEARCH_WORD.equals(word.toString())) {
                count++;
            }
            context.write(word, one);
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // Flush the local tally to the job-wide counter once per map task.
        context.getCounter("wordCount", "countHello").increment(count);
    }
}

Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
When the job completes, the custom counter appears in its counter summary:

Counters: 1
        wordCount
                countHello=703
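The counter can also be read programmatically after waitForCompletion returns, via the standard Counters API; the group and counter names below match the mapper above:

// Fetch the custom counter from the finished job.
long helloCount = job.getCounters()
        .findCounter("wordCount", "countHello")
        .getValue();
System.out.println("countHello = " + helloCount);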
4. Compressing Data
MapReduce jobs typically move very large amounts of data. Enabling compression, both for intermediate map output and for the final job output, cuts network and disk I/O and improves job throughput.
// Compress intermediate map output to shrink the shuffle.
job.getConfiguration().setBoolean("mapreduce.map.output.compress", true);
job.getConfiguration().setClass("mapreduce.map.output.compress.codec",
        GzipCodec.class, CompressionCodec.class);

// Compress the final job output as well.
job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", true);
job.getConfiguration().setClass("mapreduce.output.fileoutputformat.compress.codec",
        GzipCodec.class, CompressionCodec.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setCompressOutput(job, true);
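One caveat worth adding (not in the original post): gzip files are not splittable, so a single large gzip-compressed output file cannot be split across mappers by a downstream job. For intermediate map output, which is consumed whole by the shuffle, a faster codec such as SnappyCodec is a common alternative, assuming native Snappy libraries are available on the cluster:

import org.apache.hadoop.io.compress.SnappyCodec;

// Assumes native Snappy support is installed on the cluster.
job.getConfiguration().setClass("mapreduce.map.output.compress.codec",
        SnappyCodec.class, CompressionCodec.class);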
Conclusion
MapReduce is a powerful data-processing framework, and getting good performance on large datasets means making full use of the mechanisms it provides. The techniques covered here are only the tip of the iceberg; many others exist. Becoming a strong MapReduce developer takes continuous practice: keep sharpening your skills and distilling lessons from real workloads.