Elasticsearch bulk write path: from the REST request to Lucene and the translog

Contents

I. The handlers for the relevant APIs
II. RestBulkAction: assembling the BulkRequest and calling TransportBulkAction
III. TransportBulkAction dispatches the data to the data nodes it belongs on
    1. Group the data by shard, then send each group to its data node
        (1) Compute which shard a document goes to
            1) Return a different routing object depending on whether the index is partitioned
            2) A document without an id gets one generated automatically
            3) The routing object, the id, and the routing value decide the shard
        (2) Register a Task via the taskManager and send the data with action.execute
IV. The data node (TransportShardBulkAction) processes the data from the coordinating node
    1. Operations on the index shards located on this node
        (1) Assemble the Engine.Index
        (2) Write to Lucene first, then append to the translog on success

(A diagram illustrating this flow appears in the original post, taken from the article「ElasticSearch——刷盘原理流程」.) This article walks through the source code of the path from a client sending a bulk command to the data being stored in Lucene and in the translog. It does not cover the logic that flushes data to disk, nor the data structures Lucene uses for storage.

I. The handlers for the relevant APIs

In ActionModule.java:

```java
// Coordinating-node side: the logic that fans the request out to the different data nodes
actions.register(BulkAction.INSTANCE, TransportBulkAction.class);
// Data-node side: handles the per-shard requests the coordinating node sends over
actions.register(TransportShardBulkAction.TYPE, TransportShardBulkAction.class);
// REST handler that accepts the client's bulk request
registerHandler.accept(new RestBulkAction(settings));
```
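For orientation, here is a minimal client-side call that would travel the path traced below. This is a sketch, not part of the original article: it assumes the (now deprecated) Java High Level REST Client, and "my-index" is a placeholder index name.

```java
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public class BulkExample {
    // Sends two index operations in one bulk call; the second has no id,
    // so the coordinating node will auto-generate one (see section III.1.(1).2 below).
    static BulkResponse indexTwoDocs(RestHighLevelClient client) throws Exception {
        BulkRequest bulk = new BulkRequest();
        bulk.add(new IndexRequest("my-index").id("1").source(XContentType.JSON, "user", "kim"));
        bulk.add(new IndexRequest("my-index").source(XContentType.JSON, "user", "lee"));
        return client.bulk(bulk, RequestOptions.DEFAULT);
    }
}
```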
II. RestBulkAction: assembling the BulkRequest and calling TransportBulkAction

```java
public class RestBulkAction extends BaseRestHandler {

    @Override
    public List<Route> routes() {
        return List.of(
            new Route(POST, "/_bulk"),
            new Route(PUT, "/_bulk"),
            new Route(POST, "/{index}/_bulk"),
            new Route(PUT, "/{index}/_bulk"),
            Route.builder(POST, "/{index}/{type}/_bulk").deprecated(TYPES_DEPRECATION_MESSAGE, RestApiVersion.V_7).build(),
            Route.builder(PUT, "/{index}/{type}/_bulk").deprecated(TYPES_DEPRECATION_MESSAGE, RestApiVersion.V_7).build()
        );
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
            request.param("type");
        }
        BulkRequest bulkRequest = new BulkRequest();
        String defaultIndex = request.param("index");
        String defaultRouting = request.param("routing");
        FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
        String defaultPipeline = request.param("pipeline");
        String waitForActiveShards = request.param("wait_for_active_shards");
        if (waitForActiveShards != null) {
            bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
        }
        Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
        bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
        bulkRequest.setRefreshPolicy(request.param("refresh"));
        bulkRequest.add(
            request.requiredContent(),
            defaultIndex,
            defaultRouting,
            defaultFetchSourceContext,
            defaultPipeline,
            defaultRequireAlias,
            allowExplicitIndex,
            request.getXContentType(),
            request.getRestApiVersion()
        );
        return channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));
    }
}
```

The `client.bulk` call ends up in:

```java
@Override
public void bulk(final BulkRequest request, final ActionListener<BulkResponse> listener) {
    execute(BulkAction.INSTANCE, request, listener);
}
```

`BulkAction.INSTANCE` is resolved through the `actions` registry shown at the top, which maps it to TransportBulkAction.
III. TransportBulkAction dispatches the data to the data nodes it belongs on

```java
public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {

    @Override
    protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
        /*
         * This is called on the transport thread, so we can quickly check indexing memory
         * pressure, but we do not want to keep the transport thread busy. Once we have
         * accounted for indexing pressure we fork onto one of the write thread pools.
         * We do this because handling a bulk request can get expensive: when dispatching
         * the sub-requests to shards we may need to compress them. LZ4 is very fast, but
         * slow enough that it is better not to do it on the transport thread, especially
         * for large sub-requests. We could detect those cases and only fork then, but that
         * is complicated to get right, and the overhead of forking is fairly low.
         */
        final int indexingOps = bulkRequest.numberOfActions();
        final long indexingBytes = bulkRequest.ramBytesUsed();
        final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
        final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
        final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
        final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
        // hand off to the write thread pool
        threadPool.executor(Names.WRITE).execute(new ActionRunnable<BulkResponse>(releasingListener) {
            @Override
            protected void doRun() {
                doInternalExecute(task, bulkRequest, executorName, releasingListener);
            }
        });
    }

    protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
        // ... omitted ...
        // Before we start, try to create all the indices we will need during the bulk.
        // Step 1: collect all the indices in the request
        final Map<String, Boolean> indices = bulkRequest.requests.stream()
            // delete requests should not try to create an index that does not exist,
            // unless external versioning is used
            .filter(
                request -> request.opType() != DocWriteRequest.OpType.DELETE
                    || request.versionType() == VersionType.EXTERNAL
                    || request.versionType() == VersionType.EXTERNAL_GTE
            )
            .collect(Collectors.toMap(DocWriteRequest::index, DocWriteRequest::isRequireAlias, (v1, v2) -> v1 || v2));
        // Step 2: filter that index list down to the indices that do not currently exist
        final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
        Set<String> autoCreateIndices = new HashSet<>();
        ClusterState state = clusterService.state();
        for (Map.Entry<String, Boolean> indexAndFlag : indices.entrySet()) {
            final String index = indexAndFlag.getKey();
            boolean shouldAutoCreate = indexNameExpressionResolver.hasIndexAbstraction(index, state) == false;
            // we should only auto-create if we are not requiring it to be an alias
            if (shouldAutoCreate && (indexAndFlag.getValue() == false)) {
                autoCreateIndices.add(index);
            }
        }
        // Step 3: create all the missing indices (if there are any);
        // start the bulk once all the creations have returned
        if (autoCreateIndices.isEmpty()) {
            executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
        } else {
            // ... omitted ...
            for (String index : autoCreateIndices) {
                // ... omitted: create each missing index ...
            }
        }
    }

    void executeBulk(
        Task task,
        BulkRequest bulkRequest,
        long startTimeNanos,
        ActionListener<BulkResponse> listener,
        String executorName,
        AtomicArray<BulkItemResponse> responses,
        Map<String, IndexNotFoundException> indicesThatCannotBeCreated
    ) {
        // create a BulkOperation and run its doRun method
        new BulkOperation(task, bulkRequest, listener, executorName, responses, startTimeNanos, indicesThatCannotBeCreated).run();
    }
}
```
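The Step 1 collector above merges duplicate index names with `(v1, v2) -> v1 || v2`, so an index is treated as "alias required" as soon as any single item requires it, and Step 2 then refuses to auto-create it. A self-contained sketch of just that merge (the `Req` record is invented for illustration):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CollectIndicesSketch {
    record Req(String index, boolean requireAlias) {}

    public static void main(String[] args) {
        List<Req> requests = List.of(
            new Req("logs", false),
            new Req("logs", true),    // one item requires an alias...
            new Req("metrics", false)
        );
        // Same merge as Step 1: OR the requireAlias flags of items targeting the same
        // index, so "logs" ends up alias-required and will not be auto-created.
        Map<String, Boolean> indices = requests.stream()
            .collect(Collectors.toMap(Req::index, Req::requireAlias, (v1, v2) -> v1 || v2));
        System.out.println(indices); // e.g. {logs=true, metrics=false}
    }
}
```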
1. Group the data by shard, then send each group to its data node

```java
private final class BulkOperation extends ActionRunnable<BulkResponse> {

    @Override
    protected void doRun() {
        // ... omitted ...
        Metadata metadata = clusterState.metadata();
        // group the requests into a ShardId -> operations map
        Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
        // iterate over every item in the bulk request
        for (int i = 0; i < bulkRequest.requests.size(); i++) {
            DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
            // ... omitted ...
            IndexAbstraction ia = null;
            // only CREATE operations may write into data streams
            boolean includeDataStreams = docWriteRequest.opType() == DocWriteRequest.OpType.CREATE;
            try {
                // resolve the index for this request
                ia = concreteIndices.resolveIfAbsent(docWriteRequest);
                // get the concrete write index
                final Index concreteIndex = docWriteRequest.getConcreteWriteIndex(ia, metadata);
                // skip the item if the index is closed
                if (addFailureIfIndexIsClosed(docWriteRequest, concreteIndex, i, metadata)) {
                    continue;
                }
                // get the index routing; for a regular index this returns an Unpartitioned routing object
                IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
                // if the document has no id, one is generated for it here
                docWriteRequest.process(indexRouting);
                // compute the shard id; for IdAndRoutingOnly this calls Unpartitioned.shardId
                int shardId = docWriteRequest.route(indexRouting);
                // wrap the request and its slot index in a BulkItemRequest;
                // computeIfAbsent creates the per-shard list on first use
                List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
                    new ShardId(concreteIndex, shardId),
                    shard -> new ArrayList<>()
                );
                // add it to the request list of its shard
                shardRequests.add(new BulkItemRequest(i, docWriteRequest));
            } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException | ResourceNotFoundException e) {
                String name = ia != null ? ia.getName() : docWriteRequest.index();
                BulkItemResponse.Failure failure = new BulkItemResponse.Failure(name, docWriteRequest.id(), e);
                BulkItemResponse bulkItemResponse = BulkItemResponse.failure(i, docWriteRequest.opType(), failure);
                responses.set(i, bulkItemResponse);
                // make sure the request never gets processed again
                bulkRequest.requests.set(i, null);
            }
        }
        // nothing to write: respond immediately
        if (requestsByShard.isEmpty()) {
            listener.onResponse(
                new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
            );
            return;
        }
        // from here on, the requests are dispatched per shard id
        final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
        String nodeId = clusterService.localNode().getId();
        for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
            final ShardId shardId = entry.getKey();
            final List<BulkItemRequest> requests = entry.getValue();
            BulkShardRequest bulkShardRequest = new BulkShardRequest(
                shardId,
                bulkRequest.getRefreshPolicy(),
                requests.toArray(new BulkItemRequest[requests.size()])
            );
            bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
            bulkShardRequest.timeout(bulkRequest.timeout());
            bulkShardRequest.routedBasedOnClusterVersion(clusterState.version());
            if (task != null) {
                bulkShardRequest.setParentTask(nodeId, task.getId());
            }
            client.executeLocally(TransportShardBulkAction.TYPE, bulkShardRequest, new ActionListener<BulkShardResponse>() {
                // handle a successful shard response
                @Override
                public void onResponse(BulkShardResponse bulkShardResponse) {
                    for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                        if (bulkItemResponse.getResponse() != null) {
                            bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                        }
                        responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                    }
                    // once every shard request has completed, assemble the final response
                    if (counter.decrementAndGet() == 0) {
                        finishHim();
                    }
                }

                // handle a shard-level failure
                @Override
                public void onFailure(Exception e) {
                    // create failures for all relevant requests
                    for (BulkItemRequest request : requests) {
                        final String indexName = request.index();
                        DocWriteRequest<?> docWriteRequest = request.request();
                        BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e);
                        responses.set(request.id(), BulkItemResponse.failure(request.id(), docWriteRequest.opType(), failure));
                    }
                    // once every shard request has completed, assemble the final response
                    if (counter.decrementAndGet() == 0) {
                        finishHim();
                    }
                }

                private void finishHim() {
                    listener.onResponse(
                        new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
                    );
                }
            });
        }
        bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
    }
}
```
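The fan-out above is a classic scatter-gather: one AtomicInteger counts the outstanding shard requests, and whichever callback brings it to zero assembles the final response. A self-contained sketch of just that pattern, with invented stand-ins for the shard calls:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class ScatterGatherSketch {
    // Fan a request out to every shard and invoke onDone exactly once, after the last reply.
    static void fanOut(List<Integer> shardIds, Consumer<Map<Integer, String>> onDone) {
        Map<Integer, String> responses = new ConcurrentHashMap<>();
        AtomicInteger counter = new AtomicInteger(shardIds.size());
        for (int shardId : shardIds) {
            // stand-in for client.executeLocally(...): the shard replies asynchronously
            new Thread(() -> {
                responses.put(shardId, "ok from shard " + shardId);
                if (counter.decrementAndGet() == 0) { // the last reply assembles the response
                    onDone.accept(responses);
                }
            }).start();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        fanOut(List.of(0, 1, 2), all -> { System.out.println(all); latch.countDown(); });
        latch.await();
    }
}
```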
(1) Compute which shard a document goes to

The key block in the loop above is:

```java
// get the index routing; for a regular index this is an Unpartitioned routing object
IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
// pre-process the document id: an empty id throws, a missing id gets generated
docWriteRequest.process(indexRouting);
// compute the shard id; IdAndRoutingOnly delegates to Unpartitioned/Partitioned.shardId
int shardId = docWriteRequest.route(indexRouting);
```

The `indexRouting` object is either `Partitioned` or `Unpartitioned`.

1) Return a different routing object depending on whether the index is partitioned

```java
public static IndexRouting fromIndexMetadata(IndexMetadata metadata) {
    if (false == metadata.getRoutingPaths().isEmpty()) {
        return new ExtractFromSource(metadata);
    }
    // if the index metadata describes a routing-partitioned index,
    // create and return a Partitioned routing object
    if (metadata.isRoutingPartitionedIndex()) {
        return new Partitioned(metadata);
    }
    // otherwise create and return an Unpartitioned routing object
    return new Unpartitioned(metadata);
}
```

2) A document without an id gets one generated automatically

```java
@Override
public void process(IndexRouting indexRouting) {
    indexRouting.process(this);
}
```

Both `Partitioned` and `Unpartitioned` extend `IdAndRoutingOnly`:

```java
private abstract static class IdAndRoutingOnly extends IndexRouting {
    @Override
    public void process(IndexRequest indexRequest) {
        // an explicit document id must not be the empty string,
        // but it may be null; a null id is auto-generated below
        if ("".equals(indexRequest.id())) {
            throw new IllegalArgumentException("if _id is specified it must not be empty");
        }
        // generate id if not already provided
        if (indexRequest.id() == null) {
            indexRequest.autoGenerateId();
        }
    }
}

public void autoGenerateId() {
    assert id == null;
    assert autoGeneratedTimestamp == UNSET_AUTO_GENERATED_TIMESTAMP : "timestamp has already been generated!";
    assert ifSeqNo == UNASSIGNED_SEQ_NO;
    assert ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM;
    autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis());
    String uid = UUIDs.base64UUID();
    id(uid);
}
```
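A minimal sketch of the id rule just shown: the empty string is rejected, null means "generate one". It uses `java.util.UUID` as a stand-in for Elasticsearch's time-based `UUIDs.base64UUID()`, which is an assumption made purely for illustration:

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

public class IdRuleSketch {
    // Mirrors IdAndRoutingOnly.process: "" is illegal, null means "generate an id".
    static String processId(String id) {
        if ("".equals(id)) {
            throw new IllegalArgumentException("if _id is specified it must not be empty");
        }
        return id != null ? id : autoGenerateId();
    }

    // Stand-in for UUIDs.base64UUID(): random UUID, URL-safe base64, no padding.
    static String autoGenerateId() {
        UUID uuid = UUID.randomUUID();
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits()).putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    public static void main(String[] args) {
        System.out.println(processId("user-1")); // explicit id is kept
        System.out.println(processId(null));     // auto-generated, e.g. "kzR3bEw..."
    }
}
```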
3) The routing object, the id, and the routing value decide which shard the document goes to

The `route` entry point:

```java
@Override
public int route(IndexRouting indexRouting) {
    return indexRouting.indexShard(id, routing, contentType, source);
}

private abstract static class IdAndRoutingOnly extends IndexRouting {
    protected abstract int shardId(String id, @Nullable String routing);

    @Override
    public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) {
        if (id == null) {
            throw new IllegalStateException("id is required and should have been set by process");
        }
        checkRoutingRequired(id, routing);
        return shardId(id, routing);
    }
}
```

`shardId` has two implementations, `Unpartitioned` and `Partitioned`:

```java
private static class Unpartitioned extends IdAndRoutingOnly {
    Unpartitioned(IndexMetadata metadata) {
        super(metadata);
    }

    // prefer the routing value; fall back to the id if no routing was given
    @Override
    protected int shardId(String id, @Nullable String routing) {
        return hashToShardId(effectiveRoutingToHash(routing == null ? id : routing));
    }

    @Override
    public void collectSearchShards(String routing, IntConsumer consumer) {
        consumer.accept(hashToShardId(effectiveRoutingToHash(routing)));
    }
}

private static class Partitioned extends IdAndRoutingOnly {
    private final int routingPartitionSize;

    Partitioned(IndexMetadata metadata) {
        super(metadata);
        this.routingPartitionSize = metadata.getRoutingPartitionSize();
    }

    // here the routing value must not be null
    @Override
    protected int shardId(String id, @Nullable String routing) {
        if (routing == null) {
            throw new IllegalArgumentException("A routing value is required for gets from a partitioned index");
        }
        int offset = Math.floorMod(effectiveRoutingToHash(id), routingPartitionSize);
        return hashToShardId(effectiveRoutingToHash(routing) + offset);
    }

    @Override
    public void collectSearchShards(String routing, IntConsumer consumer) {
        int hash = effectiveRoutingToHash(routing);
        for (int i = 0; i < routingPartitionSize; i++) {
            consumer.accept(hashToShardId(hash + i));
        }
    }
}
```

Looking only at `Unpartitioned` from here on:

```java
/**
 * Convert a routing value into a hash.
 */
private static int effectiveRoutingToHash(String effectiveRouting) {
    return Murmur3HashFunction.hash(effectiveRouting);
}

/**
 * Hash function based on the Murmur3 algorithm, which is the default as of Elasticsearch 2.0.
 */
public final class Murmur3HashFunction {

    private Murmur3HashFunction() {
        // no instance
    }

    public static int hash(String routing) {
        final byte[] bytesToHash = new byte[routing.length() * 2];
        for (int i = 0; i < routing.length(); i++) {
            final char c = routing.charAt(i);
            final byte b1 = (byte) c, b2 = (byte) (c >>> 8);
            assert ((b1 & 0xFF) | ((b2 & 0xFF) << 8)) == c; // no information loss
            bytesToHash[i * 2] = b1;
            bytesToHash[i * 2 + 1] = b2;
        }
        return hash(bytesToHash, 0, bytesToHash.length);
    }

    public static int hash(byte[] bytes, int offset, int length) {
        return StringHelper.murmurhash3_x86_32(bytes, offset, length, 0);
    }
}

/**
 * Convert a hash generated from an (id, routing) pair into a shard id.
 */
protected final int hashToShardId(int hash) {
    return Math.floorMod(hash, routingNumShards) / routingFactor;
}
```

This fixes the shard id the document is sent to.
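Putting the pieces together, here is a self-contained sketch of the unpartitioned computation. It assumes Lucene's `StringHelper` is on the classpath for the murmur3 implementation, and uses made-up values for `routingNumShards` and `routingFactor` (for an index that was never shrunk or split, the factor is 1 and `routingNumShards` can equal the shard count; real indices may differ):

```java
import org.apache.lucene.util.StringHelper;

public class ShardRoutingSketch {
    // UTF-16LE bytes of the routing string, hashed with murmur3_x86_32 and seed 0,
    // exactly as Murmur3HashFunction.hash(String) does above.
    static int effectiveRoutingToHash(String effectiveRouting) {
        byte[] bytes = new byte[effectiveRouting.length() * 2];
        for (int i = 0; i < effectiveRouting.length(); i++) {
            char c = effectiveRouting.charAt(i);
            bytes[i * 2] = (byte) c;
            bytes[i * 2 + 1] = (byte) (c >>> 8);
        }
        return StringHelper.murmurhash3_x86_32(bytes, 0, bytes.length, 0);
    }

    // illustrative values: 5 shards, no shrink/split history
    static final int ROUTING_NUM_SHARDS = 5;
    static final int ROUTING_FACTOR = 1;

    static int shardId(String id, String routing) {
        String effective = routing == null ? id : routing; // prefer routing, fall back to id
        return Math.floorMod(effectiveRoutingToHash(effective), ROUTING_NUM_SHARDS) / ROUTING_FACTOR;
    }

    public static void main(String[] args) {
        System.out.println(shardId("my-doc-id", null));       // shard chosen by id
        System.out.println(shardId("my-doc-id", "tenant-7")); // shard chosen by routing
    }
}
```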
(2) Register a Task via the taskManager and send the request with action.execute

```java
client.executeLocally(TransportShardBulkAction.TYPE, bulkShardRequest, new ActionListener<>() { ... });

public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(
    ActionType<Response> action,
    Request request,
    ActionListener<Response> listener
) {
    return taskManager.registerAndExecute(
        transport,
        transportAction(action),
        request,
        localConnection,
        new SafelyWrappedActionListener<>(listener)
    );
}
```

The intermediate plumbing is not traced here; the request ends up at the handler registered for TransportShardBulkAction.TYPE.

IV. The data node (TransportShardBulkAction) processes the data from the coordinating node

```java
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {

    // handles the shard-level requests for shards located on this node,
    // as sent over by the coordinating node
    @Override
    protected void dispatchedShardOperationOnPrimary(
        BulkShardRequest request,
        IndexShard primary,
        ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
    ) {
        ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
        performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> {
            assert update != null;
            assert shardId != null;
            mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), update, mappingListener);
        }, mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() {
            // ... omitted ...
        }), listener, threadPool, executor(primary), postWriteRefresh, postWriteAction);
    }
}
```

`performOnPrimary` is the part worth reading:

```java
public static void performOnPrimary(
    BulkShardRequest request,
    IndexShard primary,
    UpdateHelper updateHelper,
    LongSupplier nowInMillisSupplier,
    MappingUpdatePerformer mappingUpdater,
    Consumer<ActionListener<Void>> waitForMappingUpdate,
    ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener,
    ThreadPool threadPool,
    String executorName,
    @Nullable PostWriteRefresh postWriteRefresh,
    @Nullable Consumer<Runnable> postWriteAction
) {
    new ActionRunnable<>(listener) {
        private final Executor executor = threadPool.executor(executorName);
        private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);
        final long startBulkTime = System.nanoTime();

        @Override
        protected void doRun() throws Exception {
            // loop while there are operations left to execute
            while (context.hasMoreOperationsToExecute()) {
                if (executeBulkItemRequest(
                    context,
                    updateHelper,
                    nowInMillisSupplier,
                    mappingUpdater,
                    waitForMappingUpdate,
                    ActionListener.wrap(v -> executor.execute(this), this::onRejection)
                ) == false) {
                    // we are waiting for a mapping update on another thread; once it
                    // completes it will invoke this runnable again, so break out here
                    return;
                }
                assert context.isInitial(); // either completed and moved to the next item, or reset
            }
            primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
            finishRequest();
        }

        @Override
        public void onRejection(Exception e) {
            // ... omitted ...
        }

        private void finishRequest() {
            // ... omitted ...
        }
    }.run();
}
```
1. Operations on the index shards located on this node

```java
static boolean executeBulkItemRequest(
    BulkPrimaryExecutionContext context,
    UpdateHelper updateHelper,
    LongSupplier nowInMillisSupplier,
    MappingUpdatePerformer mappingUpdater,
    Consumer<ActionListener<Void>> waitForMappingUpdate,
    ActionListener<Void> itemDoneListener
) throws Exception {
    // get the IndexRequest, build a SourceToParse from it, and hand the parameters
    // to primary.applyIndexOperationOnPrimary for the actual index operation
    final IndexRequest request = context.getRequestToExecute();
    final SourceToParse sourceToParse = new SourceToParse(
        request.id(),
        request.source(),
        request.getContentType(),
        request.routing(),
        request.getDynamicTemplates()
    );
    // store the document in the shard and keep the result
    result = primary.applyIndexOperationOnPrimary(
        version,
        request.versionType(),
        sourceToParse,
        request.ifSeqNo(),
        request.ifPrimaryTerm(),
        request.getAutoGeneratedTimestamp(),
        request.isRetry()
    );
    // if the result says the index mapping must be updated first, update it
    if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
        // ... omitted ...
    }
    return true;
}
```

(1) Assemble the Engine.Index

```java
public Engine.IndexResult applyIndexOperationOnPrimary(
    long version,
    VersionType versionType,
    SourceToParse sourceToParse,
    long ifSeqNo,
    long ifPrimaryTerm,
    long autoGeneratedTimestamp,
    boolean isRetry
) throws IOException {
    assert versionType.validateVersionForWrites(version);
    // the index operation proper, including the translog update
    return applyIndexOperation(
        getEngine(),
        UNASSIGNED_SEQ_NO,
        getOperationPrimaryTerm(),
        version,
        versionType,
        ifSeqNo,
        ifPrimaryTerm,
        autoGeneratedTimestamp,
        isRetry,
        Engine.Operation.Origin.PRIMARY,
        sourceToParse
    );
}

private Engine.IndexResult applyIndexOperation(
    Engine engine,
    long seqNo,
    long opPrimaryTerm,
    long version,
    @Nullable VersionType versionType,
    long ifSeqNo,
    long ifPrimaryTerm,
    long autoGeneratedTimeStamp,
    boolean isRetry,
    Engine.Operation.Origin origin,
    SourceToParse sourceToParse
) throws IOException {
    assert opPrimaryTerm <= getOperationPrimaryTerm()
        : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]";
    ensureWriteAllowed(origin);
    Engine.Index operation;
    try {
        // assemble the Engine.Index
        operation = prepareIndex(
            mapperService,
            sourceToParse,
            seqNo,
            opPrimaryTerm,
            version,
            versionType,
            origin,
            autoGeneratedTimeStamp,
            isRetry,
            ifSeqNo,
            ifPrimaryTerm,
            getRelativeTimeInNanos()
        );
        Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
        if (update != null) {
            return new Engine.IndexResult(update, operation.parsedDoc().id());
        }
    } catch (Exception e) {
        // ... omitted ...
    }
    return index(engine, operation);
}

private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
    active.set(true);
    final Engine.IndexResult result;
    final Engine.Index preIndex = indexingOperationListeners.preIndex(shardId, index);
    try {
        // ... omitted ...
        // InternalEngine.index writes the doc one at a time; the Engine wraps
        // Lucene and the translog and exposes read/write interfaces to the shard
        result = engine.index(preIndex);
        // ... omitted ...
    } catch (Exception e) {
        // ... omitted ...
        indexingOperationListeners.postIndex(shardId, preIndex, e);
        throw e;
    }
    indexingOperationListeners.postIndex(shardId, preIndex, result);
    return result;
}
```

The `engine.index` implementation reached here is `InternalEngine.index`.
(2) Write to Lucene first; only on success append to the translog

```java
@Override
public IndexResult index(Index index) throws IOException {
    assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
    final boolean doThrottle = index.origin().isRecovery() == false;
    try (ReleasableLock releasableLock = readLock.acquire()) {
        // ... omitted (condensed excerpt: `plan` and `indexResult` are defined in the omitted parts) ...
        // on the primary shard, build a new Index operation with a freshly generated seqNo
        if (index.origin() == Operation.Origin.PRIMARY) {
            index = new Index(
                index.uid(),
                index.parsedDoc(),
                generateSeqNoForOperationOnPrimary(index),
                index.primaryTerm(),
                index.version(),
                index.versionType(),
                index.origin(),
                index.startTime(),
                index.getAutoGeneratedIdTimestamp(),
                index.isRetry(),
                index.getIfSeqNo(),
                index.getIfPrimaryTerm()
            );
            final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
            if (toAppend == false) {
                advanceMaxSeqNoOfUpdatesOnPrimary(index.seqNo());
            }
        } else {
            // on other shards, just mark the seqNo as seen
            markSeqNoAsSeen(index.seqNo());
        }
        if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
            // write the document into Lucene
            indexResult = indexIntoLucene(index, plan);
        } else {
            indexResult = new IndexResult(
                plan.versionForIndexing,
                index.primaryTerm(),
                index.seqNo(),
                plan.currentNotFoundOrDeleted,
                index.id()
            );
        }
    }
    if (index.origin().isFromTranslog() == false) {
        final Translog.Location location;
        // if the Lucene write succeeded, append the operation to the translog
        if (indexResult.getResultType() == Result.Type.SUCCESS) {
            location = translog.add(new Translog.Index(index, indexResult));
        }
        // ... omitted ...
        indexResult.setTranslogLocation(location);
    }
    // ... omitted ...
    indexResult.setTook(relativeTimeInNanosSupplier.getAsLong() - index.startTime());
    indexResult.freeze();
    return indexResult;
}
```

Adding the document to Lucene:

```java
import org.apache.lucene.index.IndexWriter;

private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws IOException {
    try {
        if (plan.addStaleOpToLucene) { // add a stale operation
            addStaleDocs(index.docs(), indexWriter);
        } else if (plan.useLuceneUpdateDocument) { // update
            assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), index.seqNo(), true, true);
            updateDocs(index.uid(), index.docs(), indexWriter);
        } else {
            // document does not exist, so we can optimize for create,
            // but double check if assertions are running
            assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
            addDocs(index.docs(), indexWriter);
        }
        return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted, index.id());
    } catch (Exception ex) {
        // ... omitted ...
    }
}

private void addStaleDocs(final List<LuceneDocument> docs, final IndexWriter indexWriter) throws IOException {
    for (LuceneDocument doc : docs) {
        doc.add(softDeletesField); // soft-delete every document before adding it to Lucene
    }
    if (docs.size() > 1) {
        indexWriter.addDocuments(docs);
    } else {
        indexWriter.addDocument(docs.get(0));
    }
}
```

Writing to the translog: the operation is first wrapped in a `new Translog.Index(...)`, then appended to the translog:

```java
public Location add(final Operation operation) throws IOException {
    final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
    try {
        writeOperationWithSize(out, operation);
        final BytesReference bytes = out.bytes();
        try (ReleasableLock ignored = readLock.acquire()) {
            ensureOpen();
            // ... omitted ...
            return current.add(bytes, operation.seqNo());
        }
    }
}

private ReleasableBytesStreamOutput buffer;

/**
 * Add the given bytes to the translog with the specified sequence number;
 * returns the location the bytes were written to.
 *
 * @param data  the bytes to write
 * @param seqNo the sequence number associated with the operation
 * @return the location the bytes were written to
 * @throws IOException if writing to the translog resulted in an I/O exception
 */
public Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
    // first check whether the buffered bytes exceed the forceWriteThreshold;
    // if so, write the buffered operations out
    long bufferedBytesBeforeAdd = this.bufferedBytes;
    if (bufferedBytesBeforeAdd >= forceWriteThreshold) {
        writeBufferedOps(Long.MAX_VALUE, bufferedBytesBeforeAdd >= forceWriteThreshold * 4);
    }
    final Translog.Location location;
    synchronized (this) {
        ensureOpen();
        // make sure the buffer exists
        if (buffer == null) {
            buffer = new ReleasableBytesStreamOutput(bigArrays);
        }
        // write the data into the buffer, then update minSeqNo and maxSeqNo
        assert bufferedBytes == buffer.size();
        final long offset = totalOffset;
        totalOffset += data.length();
        data.writeTo(buffer);
        assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
        assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
        minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
        maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);
        // record the seqNo as not-yet-fsynced and bump the operation counter
        nonFsyncedSequenceNumbers.add(seqNo);
        operationCounter++;
        assert assertNoSeqNumberConflict(seqNo, data);
        // build a Translog.Location from the generation, the offset, and the data length
        location = new Translog.Location(generation, offset, data.length());
        // notify the operation listener that an operation was added, and update bufferedBytes
        operationListener.operationAdded(data, seqNo, location);
        bufferedBytes = buffer.size();
    }
    return location;
}
```

A note on the Translog class, paraphrasing its Javadoc: the translog is a per-index-shard component that records all uncommitted index operations durably. In Elasticsearch, every `org.elasticsearch.index.engine.InternalEngine` has one Translog instance. Since Elasticsearch 2.0 the engine also records a TRANSLOG_UUID_KEY with every commit, to guarantee a strong association between the Lucene index and its transaction log files; the UUID prevents accidentally recovering from a translog that belongs to a different engine. Each Translog has only one translog file open for writes at any time, referenced by a translog generation ID. That ID is written to a translog.ckp file, which is designed to fit in a single disk block so that writing it is atomic. The checkpoint file is written on every fsync operation of the translog and records the number of operations written, the current translog file generation, its fsync offset in bytes, and other important statistics. When the current translog file reaches a certain size (INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING), or when a clear separation between old and new operations is needed (on a primary-term change), the current file is reopened read-only and a new write-only file is created. Any translog file that is no longer the most recent one always has a translog-{gen}.ckp associated with it, an fsynced copy of its last translog.ckp, so that after a disaster the last fsynced offset, the number of operations, and so on are still preserved.

```java
public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
}
```
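To make `Location(generation, offset, length)` concrete, here is a toy in-memory translog writer. It is a minimal sketch of the semantics described above (append bytes at the current offset, track min/max seqNo, return where the bytes landed); all names are invented and none of Elasticsearch's buffering, checkpointing, or fsync machinery is present:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class ToyTranslogWriter {
    record Location(long generation, long offset, int size) {}

    private final long generation;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private long totalOffset = 0;
    private long minSeqNo = Long.MAX_VALUE;
    private long maxSeqNo = Long.MIN_VALUE;
    private int operationCounter = 0;

    ToyTranslogWriter(long generation) {
        this.generation = generation;
    }

    // Append one serialized operation and return where it was written,
    // mirroring the shape of TranslogWriter.add(BytesReference, long).
    synchronized Location add(byte[] data, long seqNo) throws IOException {
        long offset = totalOffset;
        totalOffset += data.length;
        buffer.write(data);
        minSeqNo = Math.min(minSeqNo, seqNo);
        maxSeqNo = Math.max(maxSeqNo, seqNo);
        operationCounter++;
        return new Location(generation, offset, data.length);
    }

    public static void main(String[] args) throws IOException {
        ToyTranslogWriter writer = new ToyTranslogWriter(1);
        System.out.println(writer.add("op-1".getBytes(), 0)); // Location[generation=1, offset=0, size=4]
        System.out.println(writer.add("op-2".getBytes(), 1)); // Location[generation=1, offset=4, size=4]
    }
}
```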