网站开发用什么电脑,群辉 wordpress 端口号,青岛君哲网站建设公司怎么样,推广软件哪个赚钱当需要大批量的向Hbase导入数据时#xff0c;我们可以使用Hbase Bulkload的方式#xff0c;这种方式是先生成Hbase的底层存储文件 HFile#xff0c;然后直接将这些 HFile 移动到Hbase的存储目录下。它相比调用Hbase 的 put 接口添加数据#xff0c;处理效率更快并且对Hbase… 当需要大批量的向Hbase导入数据时我们可以使用Hbase Bulkload的方式这种方式是先生成Hbase的底层存储文件 HFile然后直接将这些 HFile 移动到Hbase的存储目录下。它相比调用Hbase 的 put 接口添加数据处理效率更快并且对Hbase 运行影响更小。下面假设我们有一个 CSV 文件是存储用户购买记录的。它一共有三列 order_idconsumerproduct。我们需要将这个文件导入到Hbase里其中 order_id 作为Hbase 的 row key。12345bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator$\x01-Dimporttsv.columnsHBASE_ROW_KEY,cf:consumer,cf:product -Dimporttsv.bulk.output bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles 可以看到批量导入只需要上述两部 生成 HFile 文件 和 加载 HFile 文件。下面我们来深入了解其原理底层实现原理生成 HFile 是调用了 MapReduce 来实现的。它有两种实现方式虽然最后生成的 HFile 是一样的但中间过程却是不一样。现在我们先回顾下 MapReduce 的编程模型主要分为下列组件InputFormat负责读取数据源并且将数据源切割成多个分片分片的数目等于Map的数目Mapper负责接收分片生成中间结果K 为数据的 key 值类型V为数据的 value 值类型ReducerMapper的数据会按照 key 值分组Reducer接收的数据格式OutputFormat负责将Reducer生成的数据持久化比如存储到 hdfs。MapReduce 实现 一MapReducer 程序中各个组件的实现类如下所示InputFormat 类TextInputFormat数据输出格式 LongWritableText(数据所在行号行数据)Mapper 类TsvImporterTextMapper数据输出格式 ImmutableBytesWritable, Text(row key行数据)Reduce 类TextSortReducer数据输出格式 ImmutableBytesWritable, KeyValue (row key单列数据)OutputFormat 类HFileOutputFormat2负责将结果持久化 HFile执行过程如下TextInputFormat 会读取数据源文件按照文件在 hdfs 的 Block 切割每个Block对应着一个切片Mapper 会解析每行数据然后从中解析出 row key生成(row key 行数据)Reducer 会解析行数据为每列生成 KeyValue。这里简单说下 KeyValue它是 Hbase 存储每列数据的格式 详细原理后面会介绍到。如果一个 row key 对应的列过多它会将列分批处理。处理完一批数据之后会写入(nullnull)这一条特殊的数据表示 HFileOutputFormat2 在持久化的过程中需要新创建一个 HFile。这里简单的说下 TextSortReducer它的原理与下面的实现方式二使用到的 PutSortReducer 相同只不过从 Map 端接收到的数据为原始的行数据。如果 row key 对应的数据过多时它也会使用 TreeSet 来去重TreeSet 保存的数据最大字节数不能超过1GB。如果超过了那么就会分批输。MapReduce 实现 二MapReducer 程序中各个组件的实现类如下所示InputFormat 类TextInputFormat数据输出格式 LongWritableText(数据所在行号数据)Mapper 类TsvImporterMapper数据输出格式 ImmutableBytesWritablePut (row keyPut)Combiner 类PutCombinerReducer 类PutSortReducer数据输出格式 ImmutableBytesWritable, KeyValue(row key单列数据)OutputFormat 类HFileOutputFormat2负责将结果持久化 HFile这里使用了 Combiner它的作用是在 Map 端进行一次初始的 reduce 操作起到聚合的作用这样就减少了 Reduce 端与 Map 端的数据传输提高了运行效率。执行过程如下TextInputFormat 会读取数据源文件原理同实现 一Mapper 会解析每行数据然后从中解析出 row key并且生成 Put 实例。生成(row key Put)Combiner 会按照 row key 将多个 Put 进行合并它也是分批合并的。Reducer 会遍历 Put 实例为每列生成 KeyValue 并且去重。这里讲下PutSortReducer的具体实现下面的代码经过简化去掉了KeyValue中关于Tag的处理1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556public class PutSortReducer extends ReducerImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue { // the cell creator private CellCreator kvCreator; Override protected void reduce( ImmutableBytesWritable row, java.lang.Iterable puts, Reducer ImmutableBytesWritable, KeyValue.Context context) throws java.io.IOException, InterruptedException { // 这里指定了一个阈值默认为10GB过大。如果puts中不重复的数据过大就会按照这个阈值分批处理 long threshold context.getConfiguration().getLong( putsortreducer.row.threshold, 1L * (130)); Iterator iter puts.iterator(); // 开始遍历 puts列表 while (iter.hasNext()) { // 这个TreeSet就是用来去重的比如向同个qualifier添加值 TreeSet map new TreeSet(CellComparator.getInstance()); // 记录map里保存的数据长度 long curSize 0; // 遍历 puts列表直到不重复的数据不超过阈值 while (iter.hasNext() curSize threshold) { // 从列表中获取值 Put p iter.next(); // 遍历这个Put的所有列值一个Put包含了多列这些列由Cell表示 for (List cells: p.getFamilyCellMap().values()) { for (Cell cell: cells) { KeyValue kv null; kv KeyValueUtil.ensureKeyValue(cell); } if (map.add(kv)) { // 如果这列值没有重复那么添加到TreeSet中并且更新curSize的值 curSize kv.heapSize(); } } } } // 将map里的数据调用context.write方法输出 int index 0; for (KeyValue kv : map) { context.write(row, kv); if (index % 100 0) context.setStatus(Wrote index); } // 如果还有那么说明此行数据过大那么就会输出一条特殊的记录(null, null) if (iter.hasNext()) { // force flush because we cannot guarantee intra-row sorted order context.write(null, null); } } }}从上面的代码可以看到PutSortReducer会使用到TreeSet去重TreeSet会保存数据默认不超过 1GB。如果当Reducer的内存设置过小时并且数据过大时是有可能会造成内存溢出。如果遇到这种情况可以通过减少阈值或者增大Reducer的内存。两种实现方式比较第一种方式实现简单它从Map 端传递到 Reduce 端的中间结果的数据格式很紧凑如果是数据源重复的数据不多建议使用这种。第二种方式实现相对复杂它从Map 端传递到 Reduce 端的中间结果的数据格式使用 Put 来表示它的数据存储比原始的数据要大。但是它使用了 Combiner 来初步聚合减小了 Map 端传递到 Reduce 端的数据大小。如果是数据源重复比较多建议采用第二种方式。Hbase 默认采用第二种方式如果用户想使用第一种方式需要在运行命令时指定 importtsv.mapper.class 的值为 org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper。数据源解析Mapper 接收到数据后需要解析每行数据从中读取各列的值。它会按照分割符来切割数据然后根据指定的列格式生成每列的数据。客户在使用命令时通过 importtsv.separator 参数指定分隔符通过 importtsv.columns 参数指定列格式。在客户端指定的列名中 有些会有着特殊含义比如 HBASE_ROW_KEY 代表着该列是作为 row keyHBASE_TS_KEY 代表着该列作为数据的 timestampHBASE_ATTRIBUTES_KEY 代表着该列是属性列等。TsvParser 类负责解析数据它定义在 ImportTsv 类里。这里需要注意下它不支持负责的 CSV 格式只是简单的根据分隔符作为列的划分根据换行符作为每条数据的划分。它的原理比较简单这里不再详细介绍。Reducer的数目选择我们知道MapReduce程序的一般瓶颈在于 reduce 阶段如果我们能够适当增加 reduce 的数目一般能够提高运行效率(如果数据倾斜不严重)。我们还知道 Hbase 支持超大数据量的表它会将表的数据自动切割分布在不同的服务上。这些数据切片在 Hbase 里称为Region 每个Region只负责一段 row key 范围的数据。Hbase 在批量导入的时候会去获取表的 Region 分布情况然后将 Reducer 的数目 设置为 Region 数目。如果在导入数据之前还没有创建表Hbase会自动创建但是创建的表的region数只有一个。所以在生成HFile之前我们可以自行创建表并指定 Reigion 的分布情况那么就能提高 Reducer 的数目。Reducer 的数目决定是在 HFileOutputFormat2 的 configureIncrementalLoad 方法里。它会读取表的 region 分布情况然后调用 setNumReduceTasks 方法设置 reduce 数目。下面的代码经过简化12345678910111213141516171819202122232425public class HFileOutputFormat2 extends FileOutputFormatImmutableBytesWritable, Cell { public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor, RegionLocator regionLocator) throws IOException { ArrayList singleTableInfo new ArrayList(); singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator)); configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class); } static void configureIncrementalLoad(Job job, List multiTableInfo, Class extends OutputFormat, ? cls) throws IOException { // 这里虽然支持多表但是批量导入时只会使用单表 List regionLocators new ArrayList( multiTableInfo.size()); for( TableInfo tableInfo : multiTableInfo ) { // 获取region分布情况 regionLocators.add(tableInfo.getRegionLocator()); ...... } // 获取region的row key起始大小 List startKeys getRegionStartKeys(regionLocators, writeMultipleTables); // 设置reduce的数目 job.setNumReduceTasks(startKeys.size()); } }Hbase 数据存储格式Hbase的每列数据都是单独存储的都是以 KeyValue 的形式。KeyValue 的数据格式如下图所示123----------------------------------------------- keylength | valuelength | key | value | Tags-----------------------------------------------其中 key 的格式如下123---------------------------------------------------------------------------------------------- rowlength | row | columnfamilylength | columnfamily | columnqualifier | timestamp | keytype ----------------------------------------------------------------------------------------------Tags的格式如下123------------------------- tagslength | tagsbytes -------------------------tagsbytes 可以包含多个 tag每个 tag 的格式如下123---------------------------------- taglength | tagtype | tagbytes----------------------------------Reducer 会使用 CellCreator 类负责生成 KeyValue。CellCreator 的原理很简单这里不再详细介绍。生成 HFileHFileOutputFormat2 负责将Reduce的结果持久化成 HFile 文件。持久化目录的格式如下1234567.|---- column_family_1| |---- uuid_1| ---- uuid_2|---- column_family_2| |---- uuid3| ---- uuid4每个 column family 对应一个目录这个目录会有多个 HFile 文件。HFileOutputFormat2 会创建 RecordWriter 实例所有数据的写入都是通过 RecordWriter。12345678910111213141516171819public class HFileOutputFormat2 extends FileOutputFormatImmutableBytesWritable, Cell { Override public RecordWritergetRecordWriter( final TaskAttemptContext context) throws IOException, InterruptedException { // 调用createRecordWriter方法创建 return createRecordWriter(context, this.getOutputCommitter(context)); } static RecordWriter createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer) throws IOException { // 实例化一个匿名类 return new RecordWriter() { ...... } }}可以看到 createRecordWriter 方法返回了一个匿名类。继续看看这个匿名类的定义123456789101112// 封装了StoreFileWriter记录了写入的数据长度static class WriterLength { long written 0; StoreFileWriter writer null;}class RecordWriterImmutableBytesWritable, V() { // key值为表名和column family组成的字节value为对应的writer private final Mapbyte[], WriterLength writers new TreeMap(Bytes.BYTES_COMPARATOR); // 是否需要创建新的HFile private boolean rollRequested false;}从上面 WriterLength 类的定义我们可以知道 RecordWriter的底层原理是调用了StoreFileWriter的接口。对于StoreFile我们回忆下Hbase的写操作它接收客户端的写请求首先写入到内存中MemoryStore然后刷新到磁盘生成StoreFile。如果该表有两个column family就会有两个MemoryStore和两个StoreFile对应于不同的column family。所以 RecordWriter 类有个哈希表记录着每个 column family 的 StoreFileWriter。(这里说的 StoreFile 也就是 HFile)因为 HFile 支持不同的压缩算法不同的块大小RecordWriter 会根据配置获取HFile的格式然后创建对应的 StoreFileWriter。下面创建 StoreFileWriter 时只指定了文件目录StoreFileWriter会在这个目录下使用 uuid 生成一个唯一的文件名。1234567891011121314151617181920212223242526272829303132333435363738class RecordWriterImmutableBytesWritable, V() { // favoredNodes 表示创建HFile文件希望尽可能在这些服务器节点上 private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf, InetSocketAddress[] favoredNodes) throws IOException { // 根据表名和column family生成唯一字节 byte[] tableAndFamily getTableNameSuffixedWithFamily(tableName, family); Path familydir new Path(outputDir, Bytes.toString(family)); WriterLength wl new WriterLength(); // 获取HFile的压缩算法 Algorithm compression compressionMap.get(tableAndFamily); // 获取bloom过滤器信息 BloomType bloomType bloomTypeMap.get(tableAndFamily); // 获取HFile其他的配置 ..... // 生成HFile的配置信息 HFileContextBuilder contextBuilder new HFileContextBuilder() .withCompression(compression) .withChecksumType(HStore.getChecksumType(conf)) .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) .withBlockSize(blockSize); HFileContext hFileContext contextBuilder.build(); // 实例化 StoreFileWriter f (null favoredNodes) { wl.writer new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs) .withOutputDir(familydir).withBloomType(bloomType) .withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build(); } else { wl.writer new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) .withOutputDir(familydir).withBloomType(bloomType) .withComparator(CellComparator.getInstance()).withFileContext(hFileContext) .withFavoredNodes(favoredNodes).build(); } // 添加到 writers集合中 this.writers.put(tableAndFamily, wl); return wl; }}继续看看 RecordWriter 的写操作1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768class RecordWriterImmutableBytesWritable, V() { Override public void write(ImmutableBytesWritable row, V cell) Cell kv cell; // 收到空数据表示需要立即刷新到磁盘并且创建新的HFile if (row null kv null) { // 刷新到磁盘 rollWriters(null); return; } // 根据table和column family生成唯一值 byte[] tableAndFamily getTableNameSuffixedWithFamily(tableNameBytes, family); // 获取对应的writer WriterLength wl this.writers.get(tableAndFamily); if (wl null) { // 如果为空那么先创建对应的文件目录 Path writerPath null; writerPath new Path(outputDir, Bytes.toString(family)); fs.mkdirs(writerPath); } // 检测当前HFile的大小是否超过了最大值默认为10GB if (wl ! null wl.written length maxsize) { this.rollRequested true; } // 如果当前HFile过大那么需要将它刷新到磁盘 if (rollRequested Bytes.compareTo(this.previousRow, rowKey) ! 0) { rollWriters(wl); } // 创建writer if (wl null || wl.writer null) { if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { // 如果开启了位置感知那么就会去获取row所在的region的地址 HRegionLocation loc null; loc locator.getRegionLocation(rowKey); InetSocketAddress initialIsa new InetSocketAddress(loc.getHostname(), loc.getPort()); // 创建writer指定了偏向节点 wl getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[] { initialIsa}) } else { // 创建writer wl getNewWriter(tableNameBytes, family, conf, null); } } wl.writer.append(kv); wl.written length; this.previousRow rowKey; } private void rollWriters(WriterLength writerLength) throws IOException { if (writerLength ! null) { // 关闭当前writer closeWriter(writerLength); } else { // 关闭所有family对应的writer for (WriterLength wl : this.writers.values()) { closeWriter(wl); } } this.rollRequested false; } private void closeWriter(WriterLength wl) throws IOException { if (wl.writer ! null) { close(wl.writer); } wl.writer null; wl.written 0; }}RecordWriter在写入数据时如果遇到一条 row key 和 value 都为 null 的数据时这条数据有着特殊的含义表示writer应该立即 flush。在每次创建RecordWriter时它会根据此时row key 的值找到所属 Region 的服务器地址然后尽量在这台服务器上创建新的HFile文件。加载 HFile上面生成完 HFile 之后我们还需要调用第二条命令完成加载 HFile 过程。这个过程分为两步切割数据量大的 HFile 文件和发送加载请求让服务器完成。切割 HFile首先它会遍历目录下的每个 HFile 首先检查 HFile 里面数据的 family 在 Hbase 表里是否存在。获取HFile 数据的起始 row key找到 Hbase 里对应的 Region然后比较两者之间的 row key 范围如果 HFile 的 row key 范围比 Region 大也就是 HFile 的结束 row key 比这个 Region 的 结束 row Key 大那么需要将这个 HFile 切割成两份切割值为 Region 的结束 row key。继续从上一部切割生成的两份HFile中选择第二份 HFile(它的row key 大于 Regioin 的结束 row key)将它继续按照第二步切割直到所有HFile的 row key范围都能在一个Region里。在割切HFile的过程中还会检查 column family 对应的 HFile数目。如果一个 column family 对应的 HFile 数目过多默认数目为32程序就会报错。但是这个值通过指定 hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily来设置更大的值。发送加载请求当完成了HFile的切割后最后的导入动作是发送 BulkLoadHFileRequest 请求给 Hbase 服务端。Hbase 服务端会处理该请求完成HFile加载。其他至于我研究 Hbase Bulkload 的原因是在使用过程中发生了 Out Of Memory 的错误。虽然经过排查发现和 Hbase Bulkload 的原理没什么关系不过在此也顺便提一下希望能帮到遇到类似情况的人。首先说下我使用的Hadoop 版本是 CDH 5.12.2。经过排查发现是因为 Hbase Bulkload 底层用的 MapReduce 模式为本地模式而不是集群 Yarn 的方式。我们知道 MapReduce 程序选择哪一种方式可以通过 mapreduce.framework.name 配置项指定。虽然在 CDH 的 Yarn 配置页面里设置了该配置为 yarn但是 Hbase Bulkload 仍然使用本地模式。后来发现 Yarn 组件下有个 Gateway 的角色实例这是个特殊的角色它负责 Yarn 客户端的配置部署。而恰好这台主机没有安装所以在使用 Hbase Bulkload 时没有读取到 Yarn 的配置。解决方法是在 CDH 界面添加 Gateway 实例就好了。