
A source walkthrough of storm-hbase: HBaseBolt and HBaseLookupBolt in a plain topology

For a long time, everyone implemented HBase access by hand inside their own Storm bolts, whether in a plain topology or in Trident. At some point (I'm not sure which release), the Storm distribution started shipping several extra jars that provide ready-made frameworks for working with HBase, MySQL, MongoDB and other stores, so we no longer have to roll our own. This post walks through the storm-hbase jar and how it is implemented for a plain topology.

Let's start with the abstract base bolt:

```java
package org.apache.storm.hbase.bolt;

import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.common.HBaseClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * ClassName: AbstractHBaseBolt
 * Description: the abstract HBase bolt. It provides no functionality of its own and
 * only initializes the shared state. The concrete behavior lives in its two subclasses:
 * one handles saving, the other handles querying.
 */
public abstract class AbstractHBaseBolt extends BaseRichBolt { // extends Storm's BaseRichBolt
    private static final Logger LOG = LoggerFactory.getLogger(AbstractHBaseBolt.class);

    protected OutputCollector collector;
    // hBaseClient is transient and excluded from serialization. It is storm-hbase's own
    // client wrapper, used to connect to and access HBase; more on it later in this post.
    protected transient HBaseClient hBaseClient;
    // the HBase table this bolt operates on
    protected String tableName;
    // an interface defined by storm-hbase whose job is to map an incoming tuple
    // to the corresponding HBase rowkey and columns
    protected HBaseMapper mapper;
    // assigned by the subclasses: the key under which the HBase configuration map sits
    // in Storm's conf. When using storm-hbase, you put the HBase settings into a map
    // inside the Storm conf at topology-submission time, and the bolt fetches that map
    // with this key.
    protected String configKey;

    public AbstractHBaseBolt(String tableName, HBaseMapper mapper) {
        Validate.notEmpty(tableName, "Table name can not be blank or null");
        Validate.notNull(mapper, "mapper can not be null");
        this.tableName = tableName;
        this.mapper = mapper;
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        this.collector = collector;
        final Configuration hbConfig = HBaseConfiguration.create();
        Map<String, Object> conf = (Map<String, Object>) map.get(this.configKey);
        if (conf == null) {
            // storm-hbase requires a map with the HBase settings inside the Storm conf
            throw new IllegalArgumentException("HBase configuration not found using key '" + this.configKey + "'");
        }
        if (conf.get("hbase.rootdir") == null) {
            LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults.");
        }
        for (String key : conf.keySet()) { // apply the HBase settings
            hbConfig.set(key, String.valueOf(conf.get(key)));
        }
        // conf is a persistent (immutable) map, so copy it before handing it to the HBaseClient
        Map<String, Object> hbaseConfMap = new HashMap<String, Object>(conf);
        // For backward compatibility, TOPOLOGY_AUTO_CREDENTIALS is copied into the HBase
        // config. With this parameter, nimbus hands each worker its own credentials, and the
        // worker uses them to access HBase; presumably this is for clusters with Kerberos enabled.
        hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, map.get(Config.TOPOLOGY_AUTO_CREDENTIALS));
        this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, tableName); // create the client
    }

    @Override
    public void cleanup() { // close the hBaseClient properly
        try {
            hBaseClient.close();
        } catch (IOException e) {
            LOG.error("HBase Client Close Failed ", e);
        }
    }
}
```

The code above is an abstract HBase bolt. Just like the bolts we write ourselves, it extends BaseRichBolt (although I occasionally extend BaseBasicBolt instead). Its prepare method mainly does two things:

1. It initializes the collector, because the collector instance only becomes available once prepare runs.
2. It initializes hBaseClient. The client is created here because the field is transient and not serialized, so when nimbus ships the topology the field is null; it can only be instantiated in prepare. The deeper reason is that an HBaseClient cannot survive serialization across the network in many respects. One concrete example: the client builds proxy objects for RPC communication between itself and the cluster, and nimbus cannot ship an established RPC connection over the network to a worker; even if the worker received the client object, no live connection would come with it.

The constructor, by contrast, only assigns tableName and mapper, both of which can travel across the network safely. Finally, cleanup is responsible for closing the hBaseClient safely.
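Jumping ahead a little, here is roughly what the configKey mechanism looks like from the submitting side; a minimal sketch in the spirit of the storm-hbase README. The key "hbase.conf", the table name, the field names, and WordSpout are illustrative; HBaseBolt, SimpleHBaseMapper and withConfigKey are all covered later in this post.

```java
// A minimal submission sketch. "hbase.conf" is an arbitrary key; it just has
// to match the value passed to withConfigKey() on the bolt.
Config config = new Config();
Map<String, Object> hbConf = new HashMap<String, Object>();
hbConf.put("hbase.rootdir", "hdfs://namenode:8020/hbase");   // illustrative value
hbConf.put("hbase.zookeeper.quorum", "zk1,zk2,zk3");         // illustrative value
config.put("hbase.conf", hbConf);

HBaseMapper mapper = new SimpleHBaseMapper()                 // introduced below
        .withRowKeyField("word")
        .withColumnFields(new Fields("word"))
        .withColumnFamily("cf");

HBaseBolt hbaseBolt = new HBaseBolt("WordCount", mapper)
        .withConfigKey("hbase.conf");                        // where prepare() finds hbConf

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new WordSpout(), 1);               // hypothetical spout
builder.setBolt("hbase", hbaseBolt, 1).shuffleGrouping("spout");
StormSubmitter.submitTopology("hbase-demo", config, builder.createTopology());
```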
Now let's look at its two subclasses, which write to and query HBase respectively; their main job is implementing the execute method.

(1) The bolt that writes data to HBase, source with comments:

```java
package org.apache.storm.hbase.bolt;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.BatchHelper;
import org.apache.storm.utils.TupleUtils;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.common.ColumnList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.List;
import java.util.LinkedList;

/**
 * Basic bolt for writing to HBase.
 *
 * Note: Each HBaseBolt defined in a topology is tied to a specific table.
 */
public class HBaseBolt extends AbstractHBaseBolt {
    private static final long serialVersionUID = 5638715724596618454L;
    private static final Logger LOG = LoggerFactory.getLogger(HBaseBolt.class);
    private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1; // default flush interval: 1 second

    // whether to write to the write-ahead log; the WAL ensures that data survives a
    // crash such as a power failure, at the cost of throughput
    boolean writeToWAL = true;
    // the pending data to store in HBase; it may be plain data or counter increments
    List<Mutation> batchMutations;
    int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
    int batchSize;           // the batch size used by the BatchHelper, default 0
    BatchHelper batchHelper; // Storm's built-in batching helper

    public HBaseBolt(String tableName, HBaseMapper mapper) {
        super(tableName, mapper);
        this.batchMutations = new LinkedList<Mutation>();
    }

    public HBaseBolt writeToWAL(boolean writeToWAL) {
        this.writeToWAL = writeToWAL;
        return this;
    }

    public HBaseBolt withConfigKey(String configKey) {
        this.configKey = configKey;
        return this;
    }

    public HBaseBolt withBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    public HBaseBolt withFlushIntervalSecs(int flushIntervalSecs) {
        this.flushIntervalSecs = flushIntervalSecs;
        return this;
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // ask Storm to deliver a tick tuple to this bolt every flushIntervalSecs seconds
        return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            if (batchHelper.shouldHandle(tuple)) { // a business tuple that needs handling
                byte[] rowKey = this.mapper.rowKey(tuple); // derive the HBase rowkey via the mapper
                // map the tuple's fields to HBase columns; ColumnList is storm-hbase's wrapper
                // around both standard and counter columns, described later
                ColumnList cols = this.mapper.columns(tuple);
                // turn the wrapped columns into a list of Mutations, which is then saved in
                // bulk through the plain HBase API
                List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols,
                        writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
                batchMutations.addAll(mutations); // accumulate into the overall batch
                batchHelper.addBatch(tuple);      // remember the tuple for batched acking
            }
            if (batchHelper.shouldFlush()) {
                // flush to HBase: triggered by a tick tuple, or when the tuples buffered by
                // the code above reach batchSize
                this.hBaseClient.batchMutate(batchMutations); // bulk save via the client
                LOG.debug("acknowledging tuples after batchMutate");
                batchHelper.ack();      // ack the whole batch
                batchMutations.clear(); // reset
            }
        } catch (Exception e) {
            batchHelper.fail(e);    // failure handling
            batchMutations.clear(); // clear; I'm not entirely sure here, but after a failure the
                                    // whole batch of tuples is failed and replayed, hence the clear
        }
    }

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map map, TopologyContext topologyContext, OutputCollector collector) {
        super.prepare(map, topologyContext, collector); // must be called; the parent's prepare does the heavy lifting
        this.batchHelper = new BatchHelper(batchSize, collector); // initialize a BatchHelper of the configured batchSize
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
```
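The periodic flush rides on Storm's tick tuples: getComponentConfiguration asks Storm to deliver a system tick tuple to this bolt every flushIntervalSecs seconds, and BatchHelper treats tick tuples as a flush signal rather than data. To my understanding the helper call amounts to something like the following; this is a sketch of the effect, not the literal TupleUtils source.

```java
@Override
public Map<String, Object> getComponentConfiguration() {
    // roughly what TupleUtils.putTickFrequencyIntoComponentConfig(conf, secs) does:
    // set the per-component tick-tuple frequency in the component config
    Map<String, Object> conf = super.getComponentConfiguration();
    if (conf == null) {
        conf = new HashMap<String, Object>();
    }
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
    return conf;
}
```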
(2) Reading data from HBase:

```java
package org.apache.storm.hbase.bolt;

import org.apache.commons.lang.Validate;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.TupleUtils;

import com.google.common.collect.Lists;

/**
 * Basic bolt for querying from HBase.
 *
 * Note: Each HBaseBolt defined in a topology is tied to a specific table.
 */
public class HBaseLookupBolt extends AbstractHBaseBolt {
    private static final long serialVersionUID = 8253062226922790455L;

    // maps a Result fetched from HBase back into tuples and declares the output
    // fields; you implement this yourself -- introduced later
    private HBaseValueMapper rowToTupleMapper;

    // helper for the Get: wraps the column families and family:qualifier pairs to
    // fetch, so a get can target a whole family or specific columns -- introduced later
    private HBaseProjectionCriteria projectionCriteria;

    public HBaseLookupBolt(String tableName, HBaseMapper mapper, HBaseValueMapper rowToTupleMapper) {
        super(tableName, mapper);
        Validate.notNull(rowToTupleMapper, "rowToTupleMapper can not be null");
        this.rowToTupleMapper = rowToTupleMapper;
    }

    public HBaseLookupBolt withConfigKey(String configKey) {
        this.configKey = configKey;
        return this;
    }

    public HBaseLookupBolt withProjectionCriteria(HBaseProjectionCriteria projectionCriteria) {
        this.projectionCriteria = projectionCriteria;
        return this;
    }

    @Override
    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) { // tick tuples are simply ignored
            collector.ack(tuple);       // but acked directly
            return;
        }
        byte[] rowKey = this.mapper.rowKey(tuple);
        Get get = hBaseClient.constructGetRequests(rowKey, projectionCriteria); // build the query
        try {
            // this is really just table.get(get); the slightly odd part is that a single Get
            // goes through the batch-get method, simply to fit the client's own API -- in
            // other words, no extra wrapping was added
            Result result = hBaseClient.batchGet(Lists.newArrayList(get))[0];
            for (Values values : rowToTupleMapper.toValues(tuple, result)) {
                this.collector.emit(tuple, values); // convert the Result to tuples and emit them one by one
            }
            this.collector.ack(tuple); // send the ack
        } catch (Exception e) {        // failure handling
            this.collector.reportError(e);
            this.collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        rowToTupleMapper.declareOutputFields(outputFieldsDeclarer); // declare the output fields
    }
}
```
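Wiring a lookup bolt looks roughly like this; a sketch where the table name, family and key field are illustrative, and CellsToTuplesMapper is a hypothetical HBaseValueMapper implementation (one is sketched at the end of this post).

```java
// look up rows by the tuple's "word" field and fetch only column family "cf"
HBaseMapper mapper = new SimpleHBaseMapper().withRowKeyField("word");
HBaseProjectionCriteria projection = new HBaseProjectionCriteria()
        .addColumnFamily("cf");
HBaseLookupBolt lookupBolt = new HBaseLookupBolt("WordCount", mapper, new CellsToTuplesMapper())
        .withConfigKey("hbase.conf")           // same key as in the Storm conf
        .withProjectionCriteria(projection);
```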
Next comes the annotated HBaseClient:

```java
package org.apache.storm.hbase.common;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
import org.apache.storm.hbase.security.HBaseSecurityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;

public class HBaseClient implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class);

    private HTable table; // the HBase table handle

    public HBaseClient(Map<String, Object> map, final Configuration configuration, final String tableName) {
        try {
            // log in through HBase's secure-login mechanism
            UserProvider provider = HBaseSecurityUtil.login(map, configuration);
            this.table = provider.getCurrent().getUGI().doAs(new PrivilegedExceptionAction<HTable>() {
                @Override
                public HTable run() throws IOException {
                    return new HTable(configuration, tableName); // create the HBase table handle
                }
            });
        } catch (Exception e) {
            throw new RuntimeException("HBase bolt preparation failed: " + e.getMessage(), e);
        }
    }

    // build the list of Mutations for a save/update
    public List<Mutation> constructMutationReq(byte[] rowKey, ColumnList cols, Durability durability) {
        List<Mutation> mutations = Lists.newArrayList();
        if (cols.hasColumns()) { // standard columns to save
            Put put = new Put(rowKey); // create the Put
            put.setDurability(durability); // set the durability
            for (ColumnList.Column col : cols.getColumns()) {
                if (col.getTs() > 0) { // add the timestamp when one was set
                    put.add(col.getFamily(), col.getQualifier(), col.getTs(), col.getValue());
                } else {               // otherwise let the server stamp it
                    put.add(col.getFamily(), col.getQualifier(), col.getValue());
                }
            }
            mutations.add(put); // buffer the Put
        }
        if (cols.hasCounters()) { // counter columns to save
            Increment inc = new Increment(rowKey); // create an Increment
            inc.setDurability(durability);
            for (ColumnList.Counter cnt : cols.getCounters()) {
                inc.addColumn(cnt.getFamily(), cnt.getQualifier(), cnt.getIncrement());
            }
            mutations.add(inc);
        }
        if (mutations.isEmpty()) {
            // neither counters nor standard columns: add an empty Put, i.e. save just the rowkey
            mutations.add(new Put(rowKey));
        }
        return mutations;
    }

    // submit the inserts/updates in the mutation list as one batch
    public void batchMutate(List<Mutation> mutations) throws Exception {
        Object[] result = new Object[mutations.size()];
        try {
            table.batch(mutations, result);
        } catch (InterruptedException e) {
            LOG.warn("Error performing a mutation to HBase.", e);
            throw e;
        } catch (IOException e) {
            LOG.warn("Error performing a mutation to HBase.", e);
            throw e;
        }
    }

    // build a query
    public Get constructGetRequests(byte[] rowKey, HBaseProjectionCriteria projectionCriteria) {
        Get get = new Get(rowKey); // create an HBase Get
        if (projectionCriteria != null) {
            for (byte[] columnFamily : projectionCriteria.getColumnFamilies()) {
                get.addFamily(columnFamily); // column families to fetch
            }
            for (HBaseProjectionCriteria.ColumnMetaData columnMetaData : projectionCriteria.getColumns()) {
                get.addColumn(columnMetaData.getColumnFamily(), columnMetaData.getQualifier()); // columns to fetch
            }
        }
        return get;
    }

    public Result[] batchGet(List<Get> gets) throws Exception { // execute many Gets at once
        try {
            return table.get(gets);
        } catch (Exception e) {
            LOG.warn("Could not perform HBASE lookup.", e);
            throw e;
        }
    }

    @Override
    public void close() throws IOException { // shutdown
        table.close();
    }
}
```
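As a concrete illustration of constructMutationReq: a ColumnList carrying one standard column and one counter produces one Put plus one Increment, which batchMutate then hands to table.batch in a single round trip. The rowkey, family and qualifier names below are made up.

```java
// a fragment, assuming an hBaseClient created as in prepare() above
ColumnList cols = new ColumnList()
        .addColumn("cf".getBytes(), "value".getBytes(), "42".getBytes()) // standard column
        .addCounter("cf".getBytes(), "hits".getBytes(), 1L);             // counter column

// yields [Put{cf:value=42}, Increment{cf:hits+=1}] for rowkey "row-1"
List<Mutation> mutations = hBaseClient.constructMutationReq(
        "row-1".getBytes(), cols, Durability.SYNC_WAL);
hBaseClient.batchMutate(mutations); // one table.batch() call
```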
HBaseClient uses HBaseSecurityUtil, annotated below:

```java
package org.apache.storm.hbase.security;

import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;

import java.io.IOException;
import java.net.InetAddress;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class provides util methods for storm-hbase connector communicating
 * with secured HBase.
 */
public class HBaseSecurityUtil {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseSecurityUtil.class);

    public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
    public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
    private static UserProvider legacyProvider = null;

    @SuppressWarnings("rawtypes")
    public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {
        // Allowing keytab based login for backward compatibility.
        if (UserGroupInformation.isSecurityEnabled() && (conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
                !(((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHBase.class.getName())))) {
            // security is enabled and automatic credential pushing is not configured, so log in here
            LOG.info("Logging in using keytab as AutoHBase is not specified for TOPOLOGY_AUTO_CREDENTIALS");
            // ensure that if a keytab is used, only one login per process is executed
            if (legacyProvider == null) {
                synchronized (HBaseSecurityUtil.class) {
                    if (legacyProvider == null) {
                        legacyProvider = UserProvider.instantiate(hbaseConfig); // initialize a UserProvider
                        // the usual secure-login boilerplate follows; dig in if you are interested
                        String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
                        if (keytab != null) {
                            hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
                        }
                        String userName = (String) conf.get(STORM_USER_NAME_KEY);
                        if (userName != null) {
                            hbaseConfig.set(STORM_USER_NAME_KEY, userName);
                        }
                        legacyProvider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY,
                                InetAddress.getLocalHost().getCanonicalHostName());
                    }
                }
            }
            return legacyProvider;
        } else {
            // security disabled, or automatic credentials in use: just instantiate a UserProvider
            return UserProvider.instantiate(hbaseConfig);
        }
    }
}
```
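For the keytab path, the corresponding entries go into the same HBase config map that the bolt reads via configKey, since that map is what HBaseClient passes into login. A sketch with a made-up keytab path and principal:

```java
// hypothetical keytab-based setup; the key constants come from HBaseSecurityUtil above
Map<String, Object> hbConf = new HashMap<String, Object>();
hbConf.put(HBaseSecurityUtil.STORM_KEYTAB_FILE_KEY, "/etc/security/keytabs/storm.keytab");
hbConf.put(HBaseSecurityUtil.STORM_USER_NAME_KEY, "storm-user@EXAMPLE.COM");
config.put("hbase.conf", hbConf);
// alternatively, with TOPOLOGY_AUTO_CREDENTIALS configured to include AutoHBase,
// nimbus pushes credentials to each worker and no keytab is needed on worker nodes
```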
Then the annotated HBaseMapper:

```java
package org.apache.storm.hbase.bolt.mapper;

import org.apache.storm.tuple.Tuple;
import org.apache.storm.hbase.common.ColumnList;

import java.io.Serializable;

/**
 * Maps a <code>org.apache.storm.tuple.Tuple</code> object to a row in an HBase table.
 *
 * An interface you implement yourself. It defines two methods: how to derive the
 * rowkey from a tuple, and how to derive the columns. A simple bundled
 * implementation is shown right below.
 */
public interface HBaseMapper extends Serializable {

    /**
     * Given a tuple, return the HBase rowkey.
     */
    byte[] rowKey(Tuple tuple);

    /**
     * Given a tuple, return a list of HBase columns to insert.
     */
    ColumnList columns(Tuple tuple);
}
```

Its simple bundled implementation:

```java
package org.apache.storm.hbase.bolt.mapper;

import static org.apache.storm.hbase.common.Utils.toBytes;
import static org.apache.storm.hbase.common.Utils.toLong;

import org.apache.storm.hbase.common.ColumnList;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class SimpleHBaseMapper implements HBaseMapper {
    private static final long serialVersionUID = 1L;

    private String rowKeyField;   // the tuple field holding the HBase rowkey
    private byte[] columnFamily;  // the column family
    private Fields columnFields;  // tuple fields mapped to standard HBase columns
    private Fields counterFields; // tuple fields mapped to HBase counter columns

    public SimpleHBaseMapper() {
    }

    // just a pile of setters, though chaining them feels rather nice
    public SimpleHBaseMapper withRowKeyField(String rowKeyField) {
        this.rowKeyField = rowKeyField;
        return this;
    }

    public SimpleHBaseMapper withColumnFields(Fields columnFields) {
        this.columnFields = columnFields;
        return this;
    }

    public SimpleHBaseMapper withCounterFields(Fields counterFields) {
        this.counterFields = counterFields;
        return this;
    }

    public SimpleHBaseMapper withColumnFamily(String columnFamily) {
        this.columnFamily = columnFamily.getBytes();
        return this;
    }

    @Override
    public byte[] rowKey(Tuple tuple) { // return the rowkey value
        Object objVal = tuple.getValueByField(this.rowKeyField);
        return toBytes(objVal);
    }

    @Override
    public ColumnList columns(Tuple tuple) { // ColumnList is storm-hbase's own type, shown below
        ColumnList cols = new ColumnList();
        if (this.columnFields != null) {
            // for each standard-column field, add an HBase column {family, qualifier, value}
            for (String field : this.columnFields) {
                cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));
            }
        }
        if (this.counterFields != null) {
            // for each counter field, add a counter {family, qualifier, increment (must be a long)}
            for (String field : this.counterFields) {
                cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));
            }
        }
        return cols;
    }
}
```
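Usage is just the chained setters. For tuples shaped like (word, count), a mapper might look like this, with the field and family names purely illustrative:

```java
// store "word" as a standard column and "count" as an HBase counter, both under family "cf"
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
        .withRowKeyField("word")
        .withColumnFields(new Fields("word"))
        .withCounterFields(new Fields("count"))
        .withColumnFamily("cf");
```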
ColumnList is used all over the place; its annotated code:

```java
package org.apache.storm.hbase.common;

import java.util.ArrayList;
import java.util.List;

/**
 * Represents a list of HBase columns.
 *
 * There are two types of columns, <i>standard</i> and <i>counter</i>.
 *
 * Standard columns have <i>column family</i> (required), <i>qualifier</i> (optional),
 * <i>timestamp</i> (optional), and a <i>value</i> (optional) values.
 *
 * Counter columns have <i>column family</i> (required), <i>qualifier</i> (optional),
 * and an <i>increment</i> (optional, but recommended) values.
 *
 * Inserts/Updates can be added via the <code>addColumn()</code> and
 * <code>addCounter()</code> methods.
 */
public class ColumnList {

    // an abstract column providing what every column has: family and qualifier
    public static abstract class AbstractColumn {
        byte[] family, qualifier;

        AbstractColumn(byte[] family, byte[] qualifier) {
            this.family = family;
            this.qualifier = qualifier;
        }

        public byte[] getFamily() {
            return family;
        }

        public byte[] getQualifier() {
            return qualifier;
        }
    }

    public static class Column extends AbstractColumn { // a standard column
        byte[] value;
        long ts = -1; // default timestamp is -1

        Column(byte[] family, byte[] qualifier, long ts, byte[] value) {
            super(family, qualifier);
            this.value = value;
            this.ts = ts;
        }

        public byte[] getValue() {
            return value;
        }

        public long getTs() {
            return ts;
        }
    }

    public static class Counter extends AbstractColumn { // a counter column
        long incr = 0; // default increment is 0

        Counter(byte[] family, byte[] qualifier, long incr) {
            super(family, qualifier);
            this.incr = incr;
        }

        public long getIncrement() {
            return incr;
        }
    }

    private ArrayList<Column> columns;
    private ArrayList<Counter> counters;

    private ArrayList<Column> columns() {
        if (this.columns == null) {
            this.columns = new ArrayList<Column>();
        }
        return this.columns;
    }

    private ArrayList<Counter> counters() {
        if (this.counters == null) {
            this.counters = new ArrayList<Counter>();
        }
        return this.counters;
    }

    /**
     * Add a standard HBase column.
     */
    public ColumnList addColumn(byte[] family, byte[] qualifier, long ts, byte[] value) {
        columns().add(new Column(family, qualifier, ts, value));
        return this;
    }

    /**
     * Add a standard HBase column.
     */
    public ColumnList addColumn(byte[] family, byte[] qualifier, byte[] value) {
        columns().add(new Column(family, qualifier, -1, value));
        return this;
    }

    /**
     * Add a standard HBase column given an instance of a class that implements
     * the <code>IColumn</code> interface.
     */
    public ColumnList addColumn(IColumn column) {
        return this.addColumn(column.family(), column.qualifier(), column.timestamp(), column.value());
    }

    /**
     * Add an HBase counter column.
     */
    public ColumnList addCounter(byte[] family, byte[] qualifier, long incr) {
        counters().add(new Counter(family, qualifier, incr));
        return this;
    }

    /**
     * Add an HBase counter column given an instance of a class that implements the
     * <code>ICounter</code> interface.
     */
    public ColumnList addCounter(ICounter counter) {
        return this.addCounter(counter.family(), counter.qualifier(), counter.increment());
    }

    /**
     * Query to determine if we have column definitions.
     */
    public boolean hasColumns() {
        return this.columns != null;
    }

    /**
     * Query to determine if we have counter definitions.
     */
    public boolean hasCounters() {
        return this.counters != null;
    }

    /**
     * Get the list of column definitions.
     */
    public List<Column> getColumns() {
        return this.columns;
    }

    /**
     * Get the list of counter definitions.
     */
    public List<Counter> getCounters() {
        return this.counters;
    }
}
```
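SimpleHBaseMapper maps whole tuple fields one-to-one; when you need something like a composite rowkey or an explicit timestamp, implementing HBaseMapper directly is straightforward. Everything below is a hypothetical example built only on the interface and the ColumnList API shown above; the field and family names are made up.

```java
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.common.ColumnList;
import org.apache.storm.tuple.Tuple;

// hypothetical mapper writing events under a composite rowkey "<user>|<day>"
public class UserDayMapper implements HBaseMapper {
    private static final long serialVersionUID = 1L;

    @Override
    public byte[] rowKey(Tuple tuple) {
        return (tuple.getStringByField("user") + "|" + tuple.getStringByField("day")).getBytes();
    }

    @Override
    public ColumnList columns(Tuple tuple) {
        return new ColumnList()
                // the event payload, stamped with the event time carried by the tuple...
                .addColumn("d".getBytes(), "last".getBytes(),
                        tuple.getLongByField("ts"), tuple.getBinaryByField("payload"))
                // ...plus a per-row event counter
                .addCounter("d".getBytes(), "events".getBytes(), 1L);
    }
}
```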
In ColumnList, standard and counter columns can also be added through the IColumn and ICounter interfaces; their definitions:

```java
package org.apache.storm.hbase.common;

/**
 * Interface definition for classes that support being written to HBase as
 * a regular column.
 */
public interface IColumn {
    byte[] family();
    byte[] qualifier();
    byte[] value();
    long timestamp();
}
```

```java
package org.apache.storm.hbase.common;

/**
 * Interface definition for classes that support being written to HBase as
 * a counter column.
 */
public interface ICounter {
    byte[] family();
    byte[] qualifier();
    long increment();
}
```

Next is HBaseProjectionCriteria, which HBaseLookupBolt and HBaseClient.constructGetRequests use to narrow a Get: it wraps the column families and the family:qualifier pairs to fetch, so a query can target a whole family or individual columns.
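The class itself is not reproduced here, but its shape follows directly from how constructGetRequests consumes it; a sketch under that assumption -- consult the storm-hbase source for the authoritative version:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// a sketch of HBaseProjectionCriteria, reconstructed from its call sites above
public class HBaseProjectionCriteria implements Serializable {

    public static class ColumnMetaData implements Serializable {
        private byte[] columnFamily;
        private byte[] qualifier;

        public ColumnMetaData(String columnFamily, String qualifier) {
            this.columnFamily = columnFamily.getBytes();
            this.qualifier = qualifier.getBytes();
        }

        public byte[] getColumnFamily() { return columnFamily; }
        public byte[] getQualifier() { return qualifier; }
    }

    private List<byte[]> columnFamilies = new ArrayList<byte[]>();
    private List<ColumnMetaData> columns = new ArrayList<ColumnMetaData>();

    // fetch every column in this family
    public HBaseProjectionCriteria addColumnFamily(String columnFamily) {
        this.columnFamilies.add(columnFamily.getBytes());
        return this;
    }

    // fetch just this family:qualifier
    public HBaseProjectionCriteria addColumn(ColumnMetaData column) {
        this.columns.add(column);
        return this;
    }

    public List<byte[]> getColumnFamilies() { return columnFamilies; }
    public List<ColumnMetaData> getColumns() { return columns; }
}
```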
Last is the annotated HBaseValueMapper:

```java
package org.apache.storm.hbase.bolt.mapper;

import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Values;
import org.apache.hadoop.hbase.client.Result;

import java.io.Serializable;
import java.util.List;

/**
 * An interface defining how to map an HBase query Result into the values of
 * output tuples, and how to declare the lookup bolt's output fields.
 */
public interface HBaseValueMapper extends Serializable {

    /**
     * @param input  tuple.
     * @param result HBase lookup result instance.
     * @return list of values that should be emitted by the lookup bolt.
     * @throws Exception
     */
    List<Values> toValues(ITuple input, Result result) throws Exception;

    /**
     * Declares the output fields for the lookup bolt.
     *
     * @param declarer
     */
    void declareOutputFields(OutputFieldsDeclarer declarer);
}
```
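As a closing illustration, here is a hypothetical implementation that emits one (columnName, columnValue) pair per cell in the Result; it is modeled on the kind of value mapper shipped with storm-hbase's examples, and Cell, CellUtil and Bytes are plain HBase client classes:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Values;

// hypothetical: one output tuple per cell, as (columnName, columnValue) strings
public class CellsToTuplesMapper implements HBaseValueMapper {
    private static final long serialVersionUID = 1L;

    @Override
    public List<Values> toValues(ITuple input, Result result) throws Exception {
        List<Values> values = new ArrayList<Values>();
        for (Cell cell : result.listCells()) {
            values.add(new Values(
                    Bytes.toString(CellUtil.cloneQualifier(cell)),
                    Bytes.toString(CellUtil.cloneValue(cell))));
        }
        return values;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("columnName", "columnValue"));
    }
}
```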