网站开发项目方案,较便宜的网站建设,网站风格类型是,设计教育机构一、简介
求和是统计中最常使用到的#xff0c;现在使用Mapreduce在海量数据中统计数据的求和。 二、例子
#xff08;1#xff09;实例描述 给出三个文件#xff0c;每个文件中都存储了若干个数值#xff0c;求所有数值中的求和。
样例输入#xff1a; …一、简介
求和是统计中最常使用到的现在使用Mapreduce在海量数据中统计数据的求和。 二、例子
1实例描述 给出三个文件每个文件中都存储了若干个数值求所有数值中的求和。
样例输入 1file1
1
2
3
7
9
-99
2 2file2
11
2
23
17
9
199
22 3file3
21
12
3
17
2
39
12 期望输出
314 2问题分析 实现统计海量数据的求和不能将所有的数据加载到内存计算只能使用类似外部排序的方式加载一部分数据统计求和接着加载另一部分进行统计。
3实现步骤
1Map过程 首先使用默认的TextInputFormat类对输入文件进行处理得到文本中每行的偏移量及其内容。显然Map过程首先必须分析输入的key,value对得到数值然后在mapper中统计单个分块的求和。
2Reduce过程 经过map方法处理后Reduce过程将获取每个mapper的求和进行统计分行统计出总的求和。 3关键代码
package com.mk.mapreduce;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;public class SumValue {public static class SumValueMapper extends MapperLongWritable, Text, IntWritable, NullWritable {private int sumValue 0;Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {if (StringUtils.isBlank(value.toString())) {System.out.println(空白行);return;}int v Integer.parseInt(value.toString().trim());sumValue sumValue v;}Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {context.write( new IntWritable(sumValue), NullWritable.get());}}public static class SumValueReducer extends Reducer IntWritable, NullWritable,IntWritable, NullWritable {private int sumValue 0;Overrideprotected void reduce(IntWritable key, IterableNullWritable values, Context context) throws IOException, InterruptedException {int v key.get();sumValue sumValue v;}Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {context.write( new IntWritable(sumValue), NullWritable.get());}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {String uri hdfs://192.168.150.128:9000;String input /sumValue/input;String output /sumValue/output;Configuration conf new Configuration();if (System.getProperty(os.name).toLowerCase().contains(win))conf.set(mapreduce.app-submission.cross-platform, true);FileSystem fileSystem FileSystem.get(URI.create(uri), conf);Path path new Path(output);fileSystem.delete(path, true);Job job new Job(conf, SumValue);job.setJar(./out/artifacts/hadoop_test_jar/hadoop-test.jar);job.setJarByClass(SumValue.class);job.setMapperClass(SumValueMapper.class);job.setReducerClass(SumValueReducer.class);job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.addInputPaths(job, uri input);FileOutputFormat.setOutputPath(job, new Path(uri output));boolean ret job.waitForCompletion(true);System.out.println(job.getJobName() ----- ret);}
}