In the course of MR program development it is common to find that the input data does not live on HDFS, or that the output destination is not HDFS. The design of MapReduce anticipates this: it provides two pluggable components, and by defining a suitable InputFormat and OutputFormat of our own we can cover the requirement. This post walks through a simple case that reads its input from MongoDB and writes its output back to MongoDB. It is only a demo, so the data was picked arbitrarily.

I. Custom InputFormat

The data input of the Map phase in MapReduce is decided by the InputFormat. Looking at the source of org.apache.hadoop.mapreduce.InputFormat (below), we can see that besides implementing the InputFormat abstract class we also have to supply a custom InputSplit and a custom RecordReader: the split describes the size and the location information of one slice of the input, while the record reader does the actual reading.

public abstract class InputFormat<K, V> {
    // return the collection of input splits for the Map phase
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

    // create the concrete record-reading object
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

1. Custom InputSplit

A custom InputSplit mainly has to implement the following methods:

public abstract class InputSplit {
    // length of this split
    public abstract long getLength() throws IOException, InterruptedException;

    // location information of this split
    public abstract String[] getLocations() throws IOException, InterruptedException;
}

2. Custom RecordReader

A custom RecordReader mainly has to implement the following methods:

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    // initialization; may be left empty if everything is set up in the constructor
    public abstract void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;

    // whether another key/value pair exists: true if so, false otherwise
    public abstract boolean nextKeyValue() throws IOException, InterruptedException;

    // the current key
    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

    // the current value
    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

    // progress information
    public abstract float getProgress() throws IOException, InterruptedException;

    // release resources
    public abstract void close() throws IOException;
}
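Before implementing these, it helps to picture how the framework drives them: each map task takes one split, builds one RecordReader for it, and loops until the reader is exhausted. The following is a simplified sketch of that loop written for this post; it paraphrases the task internals and is not the actual Hadoop source.

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class ReadLoopSketch {
    // Roughly what a map task does with a custom InputFormat: one reader per split,
    // then nextKeyValue()/getCurrentKey()/getCurrentValue() until the data runs out.
    public static <K, V> void runMapTask(InputFormat<K, V> inputFormat, InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        RecordReader<K, V> reader = inputFormat.createRecordReader(split, context);
        reader.initialize(split, context);
        try {
            while (reader.nextKeyValue()) {
                K key = reader.getCurrentKey();
                V value = reader.getCurrentValue();
                // here the framework would hand key/value to Mapper.map(...)
            }
        } finally {
            reader.close();
        }
    }
}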
II. Custom OutputFormat

The data output of the Reducer phase in MapReduce is decided by the OutputFormat, which determines both the output destination and the job's commit object. Looking at the source of org.apache.hadoop.mapreduce.OutputFormat (below), besides implementing the OutputFormat abstract class we also have to supply a custom RecordWriter and an OutputCommitter. Since the OutputCommitter does not touch the concrete output destination, it normally does not need to be rewritten; the stock FileOutputCommitter can be used directly. The RecordWriter is what concretely defines how data is written to the destination.

public abstract class OutputFormat<K, V> {
    // obtain the concrete record-writing object
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;

    // check that the output configuration is correct
    public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;

    // obtain the committer for the job's output
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;
}

1. Custom RecordWriter

Looking at the RecordWriter source, there are two methods to implement:

public abstract class RecordWriter<K, V> {
    // write one key/value pair
    public abstract void write(K key, V value) throws IOException, InterruptedException;

    // release resources
    public abstract void close(TaskAttemptContext context) throws IOException, InterruptedException;
}
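The write side mirrors the read side: each reduce task asks the OutputFormat for one RecordWriter, calls write() once per emitted key/value pair, and closes the writer when the task finishes. Again a simplified sketch written for this post, not the actual framework code:

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class WriteLoopSketch {
    // Roughly what a reduce task does with a custom OutputFormat.
    public static <K, V> void runWritePath(OutputFormat<K, V> outputFormat, TaskAttemptContext context,
            Iterable<Map.Entry<K, V>> reduceOutput) throws IOException, InterruptedException {
        RecordWriter<K, V> writer = outputFormat.getRecordWriter(context);
        try {
            for (Map.Entry<K, V> pair : reduceOutput) {
                writer.write(pair.getKey(), pair.getValue());
            }
        } finally {
            writer.close(context);
        }
    }
}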
III. Full code

Custom InputFormat and InputSplit (MongoDBInputFormat.java):

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;

public class MongoDBInputFormat<T extends MongoDBWritable> extends InputFormat<LongWritable, T> implements Configurable {
    private static final Logger LOG = Logger.getLogger(MongoDBInputFormat.class);

    /**
     * An empty writable that performs no operation, similar to NullWritable.
     */
    public static class NullMongoDBWritable implements MongoDBWritable, Writable {
        @Override
        public void write(DBCollection collection) throws MongoException {
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
        }

        @Override
        public void write(DataOutput out) throws IOException {
        }

        @Override
        public void readFields(DataInput in) throws IOException {
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            return old;
        }
    }

    /**
     * The input split class for MongoDB.
     */
    public static class MongoDBInputSplit extends InputSplit implements Writable {
        private long end = 0;
        private long start = 0;

        /**
         * Default constructor (required for deserialization).
         */
        public MongoDBInputSplit() {
        }

        /**
         * Convenience constructor.
         *
         * @param start offset of the first queried document in the collection
         * @param end   offset just past the last queried document in the collection
         */
        public MongoDBInputSplit(long start, long end) {
            this.start = start;
            this.end = end;
        }

        public long getEnd() {
            return end;
        }

        public long getStart() {
            return start;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.start);
            out.writeLong(this.end);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.start = in.readLong();
            this.end = in.readLong();
        }

        @Override
        public long getLength() throws IOException, InterruptedException {
            // size of this split
            return this.end - this.start;
        }

        @Override
        public String[] getLocations() throws IOException, InterruptedException {
            // Returning an empty array means no data-locality optimization:
            // the nodes that run the map tasks are chosen arbitrarily.
            return new String[] {};
        }
    }

    protected MongoDBConfiguration mongoConfiguration; // mongo-related configuration
    protected Mongo mongo; // mongo connection
    protected String databaseName; // database to read from
    protected String collectionName; // collection to read from
    protected DBObject conditionQuery; // filter conditions
    protected DBObject fieldQuery; // requested fields (projection)

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        DBCollection dbCollection = null;
        try {
            dbCollection = this.getDBCollection();
            // total number of matching documents
            long count = dbCollection.count(this.getConditionQuery());
            int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
            long chunkSize = (count / chunks); // number of documents per split

            // Simple splitting: each split gets an equal share of the documents,
            // and the last split absorbs the remainder.
            List<InputSplit> splits = new ArrayList<InputSplit>();
            for (int i = 0; i < chunks; i++) {
                MongoDBInputSplit split = null;
                if ((i + 1) == chunks) {
                    split = new MongoDBInputSplit(i * chunkSize, count);
                } else {
                    split = new MongoDBInputSplit(i * chunkSize, (i * chunkSize) + chunkSize);
                }
                splits.add(split);
            }
            return splits;
        } catch (Exception e) {
            throw new IOException(e);
        } finally {
            dbCollection = null;
            closeConnection(); // release the connection
        }
    }

    @Override
    public RecordReader<LongWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return createRecordReader((MongoDBInputSplit) split, context.getConfiguration());
    }

    protected RecordReader<LongWritable, T> createRecordReader(MongoDBInputSplit split, Configuration conf) {
        // the value class the MongoDB documents are converted into; defaults to NullMongoDBWritable
        Class<? extends MongoDBWritable> valueClass = this.mongoConfiguration.getValueClass();
        return new MongoDBRecordReader<T>(split, valueClass, conf, getDBCollection(), getConditionQuery(), getFieldQuery());
    }

    @Override
    public void setConf(Configuration conf) {
        mongoConfiguration = new MongoDBConfiguration(conf);
        databaseName = this.mongoConfiguration.getInputDatabaseName(); // input database
        collectionName = this.mongoConfiguration.getInputCollectionName(); // input collection
        getMongo(); // initialize
        getConditionQuery(); // initialize
        getFieldQuery(); // initialize
    }

    @Override
    public Configuration getConf() {
        return this.mongoConfiguration.getConfiguration();
    }

    public Mongo getMongo() {
        try {
            if (null == this.mongo) {
                this.mongo = this.mongoConfiguration.getMongoConnection();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return mongo;
    }

    public DBObject getConditionQuery() {
        if (null == this.conditionQuery) {
            Map<String, String> conditions = this.mongoConfiguration.getInputConditions();
            BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
            for (Map.Entry<String, String> entry : conditions.entrySet()) {
                if (entry.getValue() != null) {
                    builder.append(entry.getKey(), entry.getValue());
                } else {
                    builder.push(entry.getKey());
                }
            }
            if (builder.isEmpty()) {
                this.conditionQuery = new BasicDBObject();
            } else {
                this.conditionQuery = builder.get();
            }
        }
        return this.conditionQuery;
    }

    public DBObject getFieldQuery() {
        if (fieldQuery == null) {
            String[] fields = this.mongoConfiguration.getInputFieldNames();
            if (fields != null && fields.length > 0) {
                BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
                for (String field : fields) {
                    builder.push(field);
                }
                fieldQuery = builder.get();
            } else {
                fieldQuery = new BasicDBObject();
            }
        }
        return fieldQuery;
    }

    protected DBCollection getDBCollection() {
        DB db = getMongo().getDB(this.databaseName);
        if (this.mongoConfiguration.isEnableAuth()) {
            String username = this.mongoConfiguration.getUsername();
            String password = this.mongoConfiguration.getPassword();
            if (!db.authenticate(username, password.toCharArray())) {
                throw new RuntimeException("authenticate failure with the username: " + username + ", pwd: " + password);
            }
        }
        return db.getCollection(collectionName);
    }

    protected void closeConnection() {
        try {
            if (null != this.mongo) {
                this.mongo.close();
                this.mongo = null;
            }
        } catch (Exception e) {
            LOG.debug("Exception on close", e);
        }
    }
}
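A quick way to convince yourself that getSplits() covers the whole collection is to run the same arithmetic standalone. The numbers below are hypothetical values of mine, not from the original post: with count = 10 and chunks = 3, chunkSize is 3 and the last split absorbs the remainder.

public class SplitMathSketch {
    public static void main(String[] args) {
        long count = 10; // hypothetical number of matching documents
        int chunks = 3;  // hypothetical number of map tasks
        long chunkSize = count / chunks;
        for (int i = 0; i < chunks; i++) {
            long start = i * chunkSize;
            long end = (i + 1) == chunks ? count : start + chunkSize;
            System.out.println("split " + i + ": [" + start + ", " + end + ")");
        }
        // prints [0, 3), [3, 6), [6, 10): contiguous and complete
    }
}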
Custom RecordReader (MongoDBRecordReader.java):

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

public class MongoDBRecordReader<T extends MongoDBWritable> extends RecordReader<LongWritable, T> {
    private Class<? extends MongoDBWritable> valueClass;
    private LongWritable key;
    private T value;
    private long pos;
    private Configuration conf;
    private MongoDBInputFormat.MongoDBInputSplit split;
    private DBCollection collection;
    private DBObject conditionQuery;
    private DBObject fieldQuery;
    private DBCursor cursor;

    public MongoDBRecordReader(MongoDBInputFormat.MongoDBInputSplit split, Class<? extends MongoDBWritable> valueClass, Configuration conf, DBCollection collection, DBObject conditionQuery,
            DBObject fieldQuery) {
        this.split = split;
        this.valueClass = valueClass;
        this.collection = collection;
        this.conditionQuery = conditionQuery;
        this.fieldQuery = fieldQuery;
        this.conf = conf;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // do nothing; everything is set up in the constructor
    }

    @SuppressWarnings("unchecked")
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        try {
            if (key == null) {
                key = new LongWritable();
            }
            if (value == null) {
                value = (T) ReflectionUtils.newInstance(valueClass, conf);
            }
            if (null == cursor) {
                cursor = executeQuery();
            }
            if (!cursor.hasNext()) {
                return false;
            }
            key.set(pos + split.getStart()); // set the key
            value.readFields(cursor.next()); // populate the value
            pos++;
        } catch (Exception e) {
            throw new IOException("Exception in nextKeyValue", e);
        }
        return true;
    }

    protected DBCursor executeQuery() {
        try {
            return collection.find(conditionQuery, fieldQuery).skip((int) split.getStart()).limit((int) split.getLength());
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    @Override
    public T getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // progress must be a fraction in [0, 1]: report the consumed share of this split
        long length = split.getLength();
        return length == 0 ? 1.0f : Math.min(1.0f, pos / (float) length);
    }

    @Override
    public void close() throws IOException {
        if (collection != null) {
            collection.getDB().getMongo().close();
        }
    }
}
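One caveat about this reader: executeQuery() pages with skip()/limit(), and MongoDB still has to walk past every skipped document, so later splits get progressively more expensive on large collections. An alternative, not implemented in this post, is to give each split a range over an indexed field and filter with $gte/$lt. A hypothetical sketch of such a query:

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

public class RangeSplitSketch {
    // Each split would carry [minId, maxId) bounds over an indexed field such as _id,
    // so no documents have to be skipped before the cursor starts returning data.
    public static DBCursor queryRange(DBCollection collection, Object minId, Object maxId) {
        DBObject range = new BasicDBObject("_id", new BasicDBObject("$gte", minId).append("$lt", maxId));
        return collection.find(range);
    }
}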
Custom OutputFormat and RecordWriter (MongoDBOutputFormat.java):

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class MongoDBOutputFormat<K extends MongoDBWritable, V extends MongoDBWritable> extends OutputFormat<K, V> {
    private static Logger LOG = Logger.getLogger(MongoDBOutputFormat.class);

    /**
     * A RecordWriter that writes the reduce output to a MongoDB collection.
     *
     * @param <K>
     * @param <V>
     */
    public static class MongoDBRecordWriter<K extends MongoDBWritable, V extends MongoDBWritable> extends RecordWriter<K, V> {
        private Mongo mongo;
        private String databaseName;
        private String collectionName;
        private MongoDBConfiguration dbConf;
        private DBCollection dbCollection;
        private DBObject dbObject;
        private boolean enableFetchMethod;

        public MongoDBRecordWriter(MongoDBConfiguration dbConf, Mongo mongo, String databaseName, String collectionName) {
            this.mongo = mongo;
            this.databaseName = databaseName;
            this.collectionName = collectionName;
            this.dbConf = dbConf;
            this.enableFetchMethod = this.dbConf.isEnableUseFetchMethod();
            getDbCollection(); // create the connection
        }

        protected DBCollection getDbCollection() {
            if (null == this.dbCollection) {
                DB db = this.mongo.getDB(this.databaseName);
                if (this.dbConf.isEnableAuth()) {
                    String username = this.dbConf.getUsername();
                    String password = this.dbConf.getPassword();
                    if (!db.authenticate(username, password.toCharArray())) {
                        throw new RuntimeException("authenticate failure, the username: " + username + ", pwd: " + password);
                    }
                }
                this.dbCollection = db.getCollection(this.collectionName);
            }
            return this.dbCollection;
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            if (this.enableFetchMethod) {
                this.dbObject = key.fetchWriteDBObject(null);
                this.dbObject = value.fetchWriteDBObject(this.dbObject);
                // write the document; when volumes are large, documents could be
                // cached here and submitted together in batches
                this.dbCollection.insert(this.dbObject);
                this.dbObject = null;
            } else {
                // call the write methods directly
                key.write(dbCollection);
                value.write(dbCollection);
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (this.mongo != null) {
                this.dbCollection = null;
                this.mongo.close();
            }
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        try {
            MongoDBConfiguration dbConf = new MongoDBConfiguration(context.getConfiguration());
            String databaseName = dbConf.getOutputDatabaseName();
            String collectionName = dbConf.getOutputCollectionName();
            Mongo mongo = dbConf.getMongoConnection();
            return new MongoDBRecordWriter<K, V>(dbConf, mongo, databaseName, collectionName);
        } catch (Exception e) {
            LOG.error("Create the record writer occur exception.", e);
            throw new IOException(e);
        }
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // no checks performed
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        // The committer is not tied to the MongoDB destination,
        // so the stock FileOutputCommitter is used directly.
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
    }

    /**
     * Set the output properties.
     *
     * @param job
     * @param databaseName
     * @param collectionName
     */
    public static void setOutput(Job job, String databaseName, String collectionName) {
        job.setOutputFormatClass(MongoDBOutputFormat.class);
        job.setReduceSpeculativeExecution(false);
        MongoDBConfiguration mdc = new MongoDBConfiguration(job.getConfiguration());
        mdc.setOutputCollectionName(collectionName);
        mdc.setOutputDatabaseName(databaseName);
    }

    /**
     * Disable the fetch method, falling back to the direct write methods.
     *
     * @param conf
     */
    public static void disableFetchMethod(Configuration conf) {
        conf.setBoolean(MongoDBConfiguration.OUTPUT_USE_FETCH_METHOD_PROPERTY, false);
    }
}
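The comment inside write() notes that, when volumes are large, documents could be cached and submitted together instead of one insert() per record. Below is a minimal sketch of that idea; it is a hypothetical helper of mine, not part of the original code, and relies on the fact that DBCollection.insert() accepts a List in the 2.x driver:

import java.util.ArrayList;
import java.util.List;

import com.mongodb.DBCollection;
import com.mongodb.DBObject;

public class BufferedMongoWriter {
    private final DBCollection collection;
    private final List<DBObject> buffer = new ArrayList<DBObject>();
    private final int batchSize;

    public BufferedMongoWriter(DBCollection collection, int batchSize) {
        this.collection = collection;
        this.batchSize = batchSize;
    }

    public void write(DBObject doc) {
        buffer.add(doc);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    public void flush() {
        if (!buffer.isEmpty()) {
            collection.insert(buffer); // one round trip for the whole batch
            buffer.clear();
        }
    }

    // must be called from RecordWriter.close() so the tail of the buffer is not lost
    public void close() {
        flush();
    }
}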
The other supporting Java code. First, MongoDBConfiguration.java, a thin wrapper around the Hadoop Configuration:

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat.NullMongoDBWritable;
import com.mongodb.Mongo;
import com.mongodb.ServerAddress;

public class MongoDBConfiguration {
    public static final String BIND_HOST_PROPERTY = "mapreduce.mongo.host";
    public static final String BIND_PORT_PROPERTY = "mapreduce.mongo.port";
    public static final String AUTH_ENABLE_PROPERTY = "mapreduce.mongo.auth.enable";
    public static final String USERNAME_PROPERTY = "mapreduce.mongo.username";
    public static final String PASSWORD_PROPERTY = "mapreduce.mongo.password";
    public static final String PARTITION_PROPERTY = "mapreduce.mongo.partition";
    public static final String INPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.input.database.name";
    public static final String INPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.input.collection.name";
    public static final String INPUT_FIELD_NAMES_PROPERTY = "mapreduce.mongo.input.field.names";
    public static final String INPUT_CONDITIONS_PROPERTY = "mapreduce.mongo.input.conditions";
    public static final String INPUT_CLASS_PROPERTY = "mapreduce.mongo.input.class";
    public static final String OUTPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.output.database.name";
    public static final String OUTPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.output.collection.name";
    // whether the record writer calls the fetch method (the default); if set to
    // false, the write method is used directly
    public static final String OUTPUT_USE_FETCH_METHOD_PROPERTY = "mapreduce.mongo.output.use.fetch.method";

    private Configuration conf;

    public MongoDBConfiguration(Configuration conf) {
        this.conf = conf;
    }

    /**
     * Get the underlying Configuration object.
     */
    public Configuration getConfiguration() {
        return this.conf;
    }

    /**
     * Set the connection information.
     */
    public MongoDBConfiguration configureDB(String host, int port) {
        return this.configureDB(host, port, false, null, null);
    }

    /**
     * Set the connection information, optionally with authentication.
     */
    public MongoDBConfiguration configureDB(String host, int port, boolean enableAuth, String username, String password) {
        this.conf.set(BIND_HOST_PROPERTY, host);
        this.conf.setInt(BIND_PORT_PROPERTY, port);
        if (enableAuth) {
            this.conf.setBoolean(AUTH_ENABLE_PROPERTY, true);
            this.conf.set(USERNAME_PROPERTY, username);
            this.conf.set(PASSWORD_PROPERTY, password);
        }
        return this;
    }

    /**
     * Get a MongoDB connection object.
     */
    public Mongo getMongoConnection() throws UnknownHostException {
        return new Mongo(new ServerAddress(this.getBindHost(), this.getBindPort()));
    }

    public String getBindHost() {
        return this.conf.get(BIND_HOST_PROPERTY, "localhost");
    }

    public int getBindPort() {
        return this.conf.getInt(BIND_PORT_PROPERTY, 27017);
    }

    /**
     * Whether authentication is enabled; MongoDB does not enable it by default.
     */
    public boolean isEnableAuth() {
        return this.conf.getBoolean(AUTH_ENABLE_PROPERTY, false);
    }

    public String getUsername() {
        return this.conf.get(USERNAME_PROPERTY);
    }

    public String getPassword() {
        return this.conf.get(PASSWORD_PROPERTY);
    }

    public String getPartition() {
        return conf.get(PARTITION_PROPERTY, "|");
    }

    public MongoDBConfiguration setPartition(String partition) {
        conf.set(PARTITION_PROPERTY, partition);
        return this;
    }

    public String getInputDatabaseName() {
        return conf.get(INPUT_DATABASE_NAME_PROPERTY, "test");
    }

    public MongoDBConfiguration setInputDatabaseName(String databaseName) {
        conf.set(INPUT_DATABASE_NAME_PROPERTY, databaseName);
        return this;
    }

    public String getInputCollectionName() {
        return conf.get(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "test");
    }

    public void setInputCollectionName(String tableName) {
        conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, tableName);
    }

    public String[] getInputFieldNames() {
        return conf.getStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
    }

    public void setInputFieldNames(String... fieldNames) {
        conf.setStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
    }

    public Map<String, String> getInputConditions() {
        Map<String, String> result = new HashMap<String, String>();
        String[] conditions = conf.getStrings(INPUT_CONDITIONS_PROPERTY);
        if (conditions != null && conditions.length > 0) {
            String partition = this.getPartition();
            String[] values = null;
            for (String condition : conditions) {
                // quote the separator: String.split() expects a regex, and the
                // default separator "|" is a regex metacharacter
                values = condition.split(Pattern.quote(partition));
                if (values != null && values.length == 2) {
                    result.put(values[0], values[1]);
                } else {
                    result.put(condition, null);
                }
            }
        }
        return result;
    }

    public void setInputConditions(Map<String, String> conditions) {
        if (conditions != null && conditions.size() > 0) {
            String[] values = new String[conditions.size()];
            String partition = this.getPartition();
            int k = 0;
            for (Map.Entry<String, String> entry : conditions.entrySet()) {
                if (entry.getValue() != null) {
                    values[k++] = entry.getKey() + partition + entry.getValue();
                } else {
                    values[k++] = entry.getKey();
                }
            }
            conf.setStrings(INPUT_CONDITIONS_PROPERTY, values);
        }
    }

    public Class<? extends MongoDBWritable> getValueClass() {
        return conf.getClass(INPUT_CLASS_PROPERTY, NullMongoDBWritable.class, MongoDBWritable.class);
    }

    public void setInputClass(Class<? extends DBWritable> inputClass) {
        conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);
    }

    public String getOutputDatabaseName() {
        return conf.get(OUTPUT_DATABASE_NAME_PROPERTY, "test");
    }

    public MongoDBConfiguration setOutputDatabaseName(String databaseName) {
        conf.set(OUTPUT_DATABASE_NAME_PROPERTY, databaseName);
        return this;
    }

    public String getOutputCollectionName() {
        return conf.get(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, "test");
    }

    public void setOutputCollectionName(String tableName) {
        conf.set(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, tableName);
    }

    public boolean isEnableUseFetchMethod() {
        return conf.getBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, true);
    }

    public void setOutputUseFetchMethod(boolean useFetchMethod) {
        conf.setBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, useFetchMethod);
    }
}
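As a usage illustration, this is how a driver program might fill in these properties before submitting a job. The host, port, and credentials are placeholders of my own choosing, and the class is assumed to sit in the same package as MongoDBConfiguration:

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import org.apache.hadoop.conf.Configuration;

public class ConfigUsageSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        MongoDBConfiguration dbConf = new MongoDBConfiguration(conf);
        dbConf.configureDB("mongo.example.com", 27017, true, "mruser", "secret")
                .setInputDatabaseName("db_java");
        dbConf.setInputCollectionName("users");
        dbConf.setInputFieldNames("name", "age", "sex"); // projection pushed down to MongoDB
    }
}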
MongoDBWritable.java, the interface every key/value type in this pipeline implements:

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;

public interface MongoDBWritable {
    /**
     * Write this object into a MongoDB collection.
     *
     * @param collection
     * @throws MongoException
     */
    public void write(DBCollection collection) throws MongoException;

    /**
     * Build the DBObject to write, merging into the given object if it is non-null.
     *
     * @param old
     * @return
     * @throws MongoException
     */
    public DBObject fetchWriteDBObject(DBObject old) throws MongoException;

    /**
     * Populate this object from a MongoDB document.
     *
     * @param object
     * @throws MongoException
     */
    public void readFields(DBObject object) throws MongoException;
}
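The demo below reads from db_java.users, and as noted there the input database and collection must already exist. To try the job end to end, the collection can be seeded first; here is a sketch using the 2.x driver, with made-up sample values:

import java.net.UnknownHostException;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.Mongo;

public class SeedUsers {
    public static void main(String[] args) throws UnknownHostException {
        Mongo mongo = new Mongo("localhost", 27017);
        try {
            DB db = mongo.getDB("db_java");
            DBCollection users = db.getCollection("users");
            users.insert(new BasicDBObject("name", "alice").append("age", 20).append("sex", "f"));
            users.insert(new BasicDBObject("name", "bob").append("age", 25).append("sex", "m"));
            // a duplicate on purpose, so at least one reduce count exceeds 1
            users.insert(new BasicDBObject("name", "bob").append("age", 25).append("sex", "m"));
        } finally {
            mongo.close();
        }
    }
}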
Finally, the demo job (Demo.java). The mapper passes each input document through as a key; the reducer counts how many times each distinct document occurred and writes the count, together with a timestamp, back to MongoDB:

package com.gerry.mongo.hadoop2x.mr.mongodb.nw;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBConfiguration;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBOutputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBWritable;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;

public class Demo {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Input database, collection, and value class. The input database and
        // collection must already exist; otherwise there is simply no data,
        // which is not an error.
        conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "users");
        conf.set(MongoDBConfiguration.INPUT_DATABASE_NAME_PROPERTY, "db_java");
        conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, DemoInputValueAndOutputKey.class, MongoDBWritable.class);
        Job job = Job.getInstance(conf, "mongodb-demo");
        job.setJarByClass(Demo.class);
        job.setMapperClass(DemoMapper.class);
        job.setReducerClass(DemoReducer.class);
        job.setOutputKeyClass(DemoInputValueAndOutputKey.class);
        job.setOutputValueClass(DemoOutputValue.class);
        job.setMapOutputKeyClass(DemoInputValueAndOutputKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setInputFormatClass(MongoDBInputFormat.class);
        MongoDBOutputFormat.setOutput(job, "foobar2", "users"); // the output database/collection need not exist in advance
        job.waitForCompletion(true);
    }

    public static class DemoOutputValue implements Writable, MongoDBWritable {
        private Date clientTime;
        private long count;

        @Override
        public void write(DBCollection collection) throws MongoException {
            throw new UnsupportedOperationException();
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            BasicDBObjectBuilder builder = null;
            Set<String> keys = new HashSet<String>();
            if (old != null) {
                keys = old.keySet();
                builder = BasicDBObjectBuilder.start(old.toMap());
            } else {
                builder = new BasicDBObjectBuilder();
            }
            // append this object's values; if a key already exists, a sequence number is appended to it
            builder.append(getKey(keys, "time", 0), clientTime).append(getKey(keys, "count", 0), this.count);
            return builder.get();
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            throw new UnsupportedOperationException();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.clientTime.getTime());
            out.writeLong(this.count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.clientTime = new Date(in.readLong());
            this.count = in.readLong();
        }

        public Date getClientTime() {
            return clientTime;
        }

        public void setClientTime(Date clientTime) {
            this.clientTime = clientTime;
        }

        public long getCount() {
            return count;
        }

        public void setCount(long count) {
            this.count = count;
        }
    }

    public static class DemoInputValueAndOutputKey implements MongoDBWritable, WritableComparable<DemoInputValueAndOutputKey> {
        private String name;
        private Integer age;
        private String sex;

        @Override
        public void write(DataOutput out) throws IOException {
            if (this.name == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(this.name);
            }
            if (this.age == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeInt(this.age);
            }
            if (this.sex == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(this.sex);
            }
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.name = in.readBoolean() ? in.readUTF() : null;
            this.age = in.readBoolean() ? Integer.valueOf(in.readInt()) : null;
            this.sex = in.readBoolean() ? in.readUTF() : null;
        }

        @Override
        public void write(DBCollection collection) throws MongoException {
            DBObject object = new BasicDBObject();
            object.put("name", this.name);
            object.put("age", this.age.intValue());
            object.put("sex", this.sex);
            collection.insert(object);
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            this.name = (String) object.get("name");
            this.age = (Integer) object.get("age");
            this.sex = (String) object.get("sex");
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            BasicDBObjectBuilder builder = null;
            Set<String> keys = new HashSet<String>();
            if (old != null) {
                keys = old.keySet();
                builder = BasicDBObjectBuilder.start(old.toMap());
            } else {
                builder = new BasicDBObjectBuilder();
            }
            // append this object's values; if a key already exists, a sequence number is appended to it
            if (this.name != null) {
                builder.append(getKey(keys, "name", 0), this.name);
            }
            if (this.age != null) {
                builder.append(getKey(keys, "age", 0), this.age.intValue());
            }
            if (this.sex != null) {
                builder.append(getKey(keys, "sex", 0), this.sex);
            }
            return builder.get();
        }

        @Override
        public String toString() {
            return "DemoInputValue [name=" + name + ", age=" + age + ", sex=" + sex + "]";
        }

        @Override
        public int compareTo(DemoInputValueAndOutputKey o) {
            int tmp;
            if (this.name == null) {
                if (o.name != null) {
                    return -1;
                }
            } else if (o.name == null) {
                return 1;
            } else {
                tmp = this.name.compareTo(o.name);
                if (tmp != 0) {
                    return tmp;
                }
            }
            if (this.age == null) {
                if (o.age != null) {
                    return -1;
                }
            } else if (o.age == null) {
                return 1;
            } else {
                tmp = this.age - o.age;
                if (tmp != 0) {
                    return tmp;
                }
            }
            if (this.sex == null) {
                if (o.sex != null) {
                    return -1;
                }
            } else if (o.sex == null) {
                return 1;
            } else {
                return this.sex.compareTo(o.sex);
            }
            return 0;
        }
    }

    /**
     * Pass-through mapper: emit each input document unchanged as the key.
     *
     * @author jsliuming
     */
    public static class DemoMapper extends Mapper<LongWritable, DemoInputValueAndOutputKey, DemoInputValueAndOutputKey, NullWritable> {
        @Override
        protected void map(LongWritable key, DemoInputValueAndOutputKey value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Write out the data; only a count is computed.
     *
     * @author jsliuming
     */
    public static class DemoReducer extends Reducer<DemoInputValueAndOutputKey, NullWritable, DemoInputValueAndOutputKey, DemoOutputValue> {
        private DemoOutputValue outputValue = new DemoOutputValue();

        @Override
        protected void reduce(DemoInputValueAndOutputKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (@SuppressWarnings("unused") NullWritable value : values) {
                sum++;
            }
            outputValue.setClientTime(new Date());
            outputValue.setCount(sum);
            context.write(key, outputValue);
        }
    }

    /**
     * If key already exists in keys, append an increasing sequence number
     * until it is unique.
     *
     * @param keys
     * @param key
     * @param index
     * @return
     */
    public static String getKey(Set<String> keys, String key, int index) {
        while (keys.contains(key)) {
            key = key + (index++);
        }
        return key;
    }
}

IV. Result screenshots

(The original post closes with screenshots of the run results; the images are not reproduced here.)

Reposted from: https://www.cnblogs.com/liuming1992/p/4758504.html