免费表格模板网站,html5网站实例,免费视频app软件哪个好,张家港做企业网站项目整体介绍
对类似WordCount案例的词频统计#xff0c;并将统计结果按出现次数降序排列。
网上有很多帖子#xff0c;均用的相似方案#xff0c;重写某某方法然后。。。运行起来可能会报这样那样的错误#xff0c;这里实现了一种解决方案#xff0c;分享出来供大家参考…项目整体介绍
对类似WordCount案例的词频统计并将统计结果按出现次数降序排列。
网上有很多帖子均用的相似方案重写某某方法然后。。。运行起来可能会报这样那样的错误这里实现了一种解决方案分享出来供大家参考编写两个MapReduce程序第一个程序进行词频统计第二个程序进行降序处理由于是降序还需要自定义对象在对象内部实现降序排序。
一、项目背景及数据集说明
现有某电商网站用户对商品的收藏数据记录了用户收藏的商品id以及收藏日期名为buyer_favorite1。buyer_favorite1包含买家id商品id收藏日期这三个字段数据以“\t”分割样例展现如下
二、编写MapReduce程序统计每个买家收藏商品数量。即统计买家id出现的次数
前置说明
1.配置好Hadoop集群环境并开启相应服务、 2.在hdfs对应路径上先上传好文件可以自己根据文件路径定义这里是hdfs://localhost:9000/mymapreduce1/in/buyer_favorite1。同时再定义好输出路径 3.这里是整个程序词频降序的入口若只是想统计词频请注释掉WordCountSortDESC.mainJob2();
package mapreduce;import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCount {public static void main(String[] args) {Configuration conf new Configuration();conf.set(yarn,resourcemanager, bymd2e674ec1e78);try {Job job Job.getInstance(conf, 111);job.setJobName(WordCount);job.setJarByClass(WordCount.class);job.setMapperClass(doMapper.class); // 这里就是设置下job使用继承doMapper类与定义的内容保持一致job.setReducerClass(doReducer.class); // 同上设置Reduce类型job.setMapOutputKeyClass(Text.class); // 如果map的输出和reduce的输出不一样这里要分别定义好格式job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);Path in new Path(hdfs://localhost:9000/mymapreduce1/in/buyer_favorite1);Path out new Path(hdfs://localhost:9000/mymapreduce1/out);FileInputFormat.addInputPath(job, in);FileOutputFormat.setOutputPath(job, out);if (job.waitForCompletion(true)) {System.out.println(WordCount completition);WordCountSortDESC.mainJob2();System.out.println(diaoyong);}} catch (Exception e) {e.printStackTrace();}// System.exit(job.waitForCompletion(true) ? 0 : 1);}// 第一个Object表示输入key的类型、是该行的首字母相对于文本文件的首地址的偏移量;// 第二个Text表示输入value的类型、存储的是文本文件中的一行以回车符为行结束标记;// 第三个Text表示输出键的类型第四个IntWritable表示输出值的类型public static class doMapper extendsMapperLongWritable, Text, Text, IntWritable {public static final IntWritable one new IntWritable(1);public static Text word new Text();Override// 前面两个Object key,Text value就是输入的key和value第三个参数Context// context是可以记录输入的key和value。protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// StringTokenizer是Java工具包中的一个类用于将字符串进行拆分StringTokenizer tokenizer new StringTokenizer(value.toString(),\t);// 返回当前位置到下一个分隔符之间的字符串, 并把字符串设置成Text格式word.set(tokenizer.nextToken());context.write(word, one);}}// 参数依次表示是输入键类型输入值类型输出键类型输出值类型public static class doReducer extendsReducerText, IntWritable, Text, Text {Override// 输入的是键值类型其中值类型为归并后的结果输出结果为Context类型protected void reduce(Text key, IterableIntWritable values,Context context) throws IOException, InterruptedException {int sum 0;for (IntWritable value : values) {sum value.get();}context.write(key, new Text(Integer.toString(sum)));}}
}
三、核心问题再次编写MapReduce程序将上一步统计的结果降序排列
前置说明
1.这里将上一步统计的结果作为输入进行第二次mapreduce程序的运行。因此要注意输入路径与上一步的输出路径保持一致。 2.由于是降序排列只能自定义FlowBean对象内部实现排序方式。否则升序可以利用shuffle机制默认的排序策略不用自定义对象排序这里不再叙述。
package mapreduce;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCountSortDESC {public static void mainJob2() {Configuration conf new Configuration();conf.set(yarn,resourcemanager, bymd2e674ec1e78);try {Job job Job.getInstance(conf, 1111);job.setJobName(WordCountSortDESC);job.setJarByClass(WordCountSortDESC.class);job.setMapperClass(TwoMapper.class); // 这里就是设置下job使用继承doMapper类与定义的内容保持一致job.setReducerClass(TwoReducer.class); // 同上设置Reduce类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);Path in new Path(hdfs://localhost:9000/mymapreduce1/out);Path out new Path(hdfs://localhost:9000/mymapreduce1/out555);FileInputFormat.addInputPath(job, in);FileOutputFormat.setOutputPath(job, out);if (job.waitForCompletion(true)) {System.out.println(DESC Really Done);}} catch (Exception e) {System.out.println(errormainJob2-----------);}}public static class TwoMapper extends MapperObject, Text, FlowBean, Text {private FlowBean outK new FlowBean();private Text outV new Text();Overrideprotected void map(Object key, Text value, Context context)throws IOException, InterruptedException {// 由于真实的数据存储在文件块上这里是因为数据量较小可以保证只在一个文件块FileSplit fs (FileSplit) context.getInputSplit();if (fs.getPath().getName().contains(part-r-00000)) {// 1 获取一行数据String line value.toString();// 2 按照\t,切割数据String[] split line.split(\t);// 3 封装outK outVoutK.setNumber(Long.parseLong(split[1]));outV.set(split[0]);// 4 写出outK outVcontext.write(outK, outV);} else {System.out.println(error-part-r-------------------);}}}public static class TwoReducer extendsReducerFlowBean, Text, Text, FlowBean {Overrideprotected void reduce(FlowBean key, IterableText values,Context context) throws IOException, InterruptedException {// 遍历values集合,循环写出,避免总流量相同的情况for (Text value : values) {// 调换KV位置,反向写出context.write(value, key);}}}public static class FlowBean implements WritableComparableFlowBean {private long number;// 提供无参构造public FlowBean() {}public long getNumber() {return number;}public void setNumber(long number) {this.number number;}// 实现序列化和反序列化方法,注意顺序一定要一致Overridepublic void write(DataOutput out) throws IOException {out.writeLong(this.number);}Overridepublic void readFields(DataInput in) throws IOException {this.number in.readLong();}Overridepublic String toString() {return number \t;}Overridepublic int compareTo(FlowBean o) {// 按照总流量比较,倒序排列if (this.number o.number) {return -1;} else if (this.number o.number) {return 1;} else {return 0;}}}}四、结果展示
执行查看文件命令
hadoop fs -cat /mymapreduce1/out555/part-r-00000可以发现已经进行了降序排列其他数据集结果应类似。