1 Overall Flow

The figure above illustrates how a single INSERT statement ends up being synchronized to the other replica (the ClickHouse cluster in the figure has one shard with two replicas):
(1) The client issues the INSERT and the data is written to ClickHouse.
(2) ClickHouse commits a LogEntry to Keeper.
(3) The other replica pulls the LogEntry from Keeper and executes it locally.

2 Parameter Description

This part introduces some of the parameters involved along the whole path.

MergeTree settings:
1. zookeeper_session_expiration_check_period: how often to check whether the Keeper session has expired; the check runs once per this interval, 60 s by default. Every MergeTree table with a Replicated engine runs the following task at startup to check whether its session with Keeper has expired. When a replicated table is created, the server starts the table engine and then, in ReplicatedMergeTreeRestartingThread::runImpl(), schedules various background tasks:
(1) background_operations_assignee: executes merge and fetch operations
(2) merge_selecting_task: mainly selects the parts to merge
(3) cleanup_thread: cleans up expired parts, etc.
The scheduling of these tasks is somewhat recursive: at the end of each run the task reschedules itself, only the work inside differs.
2. max_insert_delayed_streams_for_parallel_write: defaults to 100 when the storage holding the part supports parallel writes, otherwise 0.
3. distributed_ddl_task_timeout: the timeout for responses to a DDL query from all hosts in the cluster. If a DDL request has not finished on all hosts, the response will contain a timeout error and the request will continue in asynchronous mode. A negative value means an infinite timeout.

3 Example Table Structure

db
CREATE DATABASE replicated_db
ENGINE = Replicated('/clickhouse/databases/replicated_db', '{shard}', '{replica}')
table
CREATE TABLE replicated_db.replicated_table
(
    id UInt64,
    event_time DateTime,
    event_type String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/replicated_table', '{replica}')
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, id)
SETTINGS index_granularity = 8192
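Before walking through the write path, it can help to sanity-check the setup. A minimal sketch (nothing here is new; it only inspects the macros used by the DDL above and the settings discussed in section 2):

SELECT macro, substitution FROM system.macros;

-- MergeTree-level settings
SELECT name, value
FROM system.merge_tree_settings
WHERE name IN ('zookeeper_session_expiration_check_period', 'max_insert_delayed_streams_for_parallel_write');

-- query/session-level settings
SELECT name, value
FROM system.settings
WHERE name IN ('distributed_ddl_task_timeout', 'insert_keeper_max_retries',
               'insert_keeper_retry_initial_backoff_ms', 'insert_keeper_retry_max_backoff_ms');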
1 A Single Node Generates the LogEntry

Here we skip lexing/parsing, the optimizer, plan generation and most execution-layer operators, and go straight to the operator that writes data to disk and commits the LogEntry: ReplicatedMergeTreeSinkImpl. Its input parameter chunk is the in-memory representation of the inserted data. ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk) performs the following main steps:
1. Split the inserted data into parts by partition key; this is done by MergeTreeDataWriter::splitBlockIntoParts.
2. For each resulting part:
(1) Write the part to a temporary directory via writeNewTempPart.
(2) In this branch, commit the written part to Keeper. If parallel writes are enabled, parts are accumulated and committed to Keeper as a batch once enough of them have been collected; the default batch size is 100.

2 Committing the LogEntry to Keeper

2.1 Parameters Controlling Commit Retries

1. insert_keeper_max_retries: the maximum number of retries of ClickHouse Keeper (or ZooKeeper) requests when inserting into a Replicated MergeTree table. The default is 20. Only the following three kinds of errors trigger a retry:
(1) network error
(2) Keeper session timeout
(3) request timeout
2. insert_keeper_retry_initial_backoff_ms: the initial backoff (in milliseconds) before retrying a failed Keeper (ZooKeeper) request during INSERT execution. The default is 100 ms.
3. insert_keeper_retry_max_backoff_ms: the upper bound (in milliseconds) on the backoff before retrying a failed Keeper/ZooKeeper request during INSERT execution. The default is 10 s.

2.2 Commit Flow

Note that what gets committed here is not the inserted data itself but the metadata of the written part. The commit is mainly done by ReplicatedMergeTreeSinkImpl<async_insert>::commitPart.
block_id_path: /clickhouse/tables/s1/replicated_table/blocks/202507_12141418273484448060_16681511374997932159
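The retry settings from 2.1 govern this commit and can also be overridden per query. A minimal usage sketch against the example table (the values shown simply restate the defaults):

INSERT INTO replicated_db.replicated_table
SETTINGS
    insert_keeper_max_retries = 20,
    insert_keeper_retry_initial_backoff_ms = 100,
    insert_keeper_retry_max_backoff_ms = 10000
VALUES (1, now(), 'example');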
retries_ctl.retryLoop drives the commit; the state transitions of the commit are handled by the stage_switcher lambda. Initially retry_context.stage is LOCK_AND_COMMIT, so we enter commit_new_part_stage. commit_new_part_stage mainly does the following:
(1) Fill in the basic information of the part being committed, e.g. block_number and the part name. For a new part, block_number is globally increasing within one replicated table.
(2) Build the requests to be executed on Keeper, e.g. the request that creates the LogEntry on Keeper, built by get_logs_ops. For a new part this LogEntry has type GET_PART and also carries other information such as create_time (creation time), source_replica (which replica created the part), new_part_name (the part name), and so on. The LogEntry is finally wrapped into a CreateRequest. A single part commit carries many other requests as well: some RemoveRequests and other CreateRequests (get_quorum_ops only adds requests when there are more than two replicas; also the CreateRequest built in getCommitPartOps).
(3) Open a transaction so that state can be rolled back and restored if committing the LogEntry to Keeper fails.
(4) Send the LogEntry to Keeper. Since there are multiple requests, ZooKeeper::multiImpl is called. The flow can be illustrated as follows: if a write request reaches a follower, the follower forwards it to the leader; the client then waits (non-blocking) for the asynchronous result for at most args.operation_timeout_ms milliseconds. The operation timeout setting lives in Coordination/CoordinationSettings.cpp with a default of 10 s (see also Common/ZooKeeper/ZooKeeperConstants.h).

3 The Replica Pulls the LogEntry

3.1 Issues Encountered

Issue 1: CREATE TABLE fails with "Session was killed". (You can skip this; I temporarily worked around it another way and keep it here to dig into later.)

When creating the table, the error Coordination::Exception: Session was killed was reported. Cause: after a long period of inactivity, the session between clickhouse-client and Keeper had been closed. The problem is that even though the CREATE TABLE failed, the table's metadata may still have been committed to Keeper. You then end up in a state where the database does not contain the table, yet the table cannot be created again (re-creating it keeps failing). In that case the metadata in Keeper can be removed with the following statement:
SYSTEM DROP REPLICA 'r1' FROM ZKPATH '/clickhouse/tables/s1/replicated_table';
After removing the metadata from Keeper and creating the table again, the creation now hangs. Checking replica 2's log afterwards shows that replica 2 had already pulled the replicated_table table earlier and created a data directory for it. Fix: delete the corresponding table directory on replica 2; after that, replicated_table is created successfully. Dropping the table runs into the same kind of problem.
Final fix: adjust the session timeout. (The root cause turned out not to be this parameter.) Continuing the analysis: the code value in the error is the next thing to debug inside Keeper, to see why this error code appears. The places where this error code is set are:
(1) KeeperStateMachine / Storage::processReconfiguration
(2) the various preprocess paths that pre-process the different request types.
TODO: I suspect my two ClickHouse nodes run different versions; I did not chase this issue further, so for now I only know roughly where the error is raised.

Issue 2: a record of a replica failing to synchronize a part.

At this point replicated_table on replica r1 has a part 202507_0_9_3. While replica 2 was synchronizing this part, it did fetch the LogEntry from Keeper but kept failing, and num_tries shows the task had already been retried 22 times. The error in the log says the parameter interserver_http_host is not configured. The place in Keeper that stores replica 1's parts for replicated_table is /clickhouse/tables/s1/replicated_table/replicas/r1/parts. After fixing the configuration, the contents of the parts directories on the two replicas are identical.

3.2 Starting the Task That Pulls LogEntries

A key comment from Storages/StorageReplicatedMergeTree.h:
/** The replicated tables have a common log (/log/log-...).
  * Log - a sequence of entries (LogEntry) about what to do.
  * Each entry is one of:
  * - normal data insertion (GET),
  * - data insertion with a possible attach from local data (ATTACH),
  * - merge (MERGE),
  * - delete the partition (DROP).
  *
  * Each replica copies (queueUpdatingTask, pullLogsToQueue) entries from the log to its queue (/replicas/replica_name/queue/queue-...)
  * and then executes them (queueTask).
  * Despite the name of the queue, execution can be reordered, if necessary (shouldExecuteLogEntry, executeLogEntry).
  * In addition, the records in the queue can be generated independently (not from the log), in the following cases:
  * - when creating a new replica, actions are put on GET from other replicas (createReplica);
  * - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check
  *   (at start - checkParts, while running - searchForMissingPart), actions are put on GET from other replicas;
  *
  * The replica to which INSERT was made in the queue will also have an entry of the GET of this data.
  * Such an entry is considered to be executed as soon as the queue handler sees it.
  *
  * The log entry has a creation time. This time is generated by the clock of server that created entry
  * - the one on which the corresponding INSERT or ALTER query came.
  *
  * For the entries in the queue that the replica made for itself,
  * as the time will take the time of creation the appropriate part on any of the replicas.
  */
Explained:
All replicas share one log directory, /log/log-..., and each log entry (LogEntry) describes one operation. The "log" here refers to the ZooKeeper nodes /log/log-0000000000, /log/log-0000000001, and so on. All replicas pull operations (inserts, merges, drops, ...) from this shared log. The entry types (defined in Storages/MergeTree/ReplicatedMergeTreeLogEntry.h) include:
GET: a normal data insertion
ATTACH: an insertion that may reuse locally existing data
MERGE: a background merge of several parts
DROP: dropping the data of a partition.
Each replica copies these log entries into its own execution queue /replicas/replica_name/queue/queue-00000..., via the periodic task queueUpdatingTask, whose pullLogsToQueue() pulls entries from /log/ into /queue/.
The replica then runs background threads to execute the tasks in the queue (queueTask()).
Although it is called a "queue", execution can be reordered according to dependencies; shouldExecuteLogEntry() decides whether a given entry can be executed yet.
For example, if the parts a MERGE depends on have not been fetched yet, the merge is postponed. Some queue entries are not taken from the log but generated locally by the replica: when a new replica is created, GET tasks for all parts existing on other replicas are put into its queue (createReplica); if a part is found corrupted (removePartAndEnqueueFetch) or missing (checkParts() at startup, searchForMissingPart() at runtime),
a GET entry is also generated to fetch the missing part from another replica. Even the replica that the INSERT was sent to gets a GET entry for that insertion;
such an entry is considered executed as soon as the queue handler sees it, because the data is already present locally and nothing needs to be fetched. A log entry has a creation timestamp, generated by the server that initiated the write, i.e. the replica on which the INSERT or ALTER was executed. For queue entries a replica generates for itself (e.g. GET of a missing part), the creation time of that part on one of the existing replicas is used as the timestamp.
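The queue described above can be observed from SQL through the system.replication_queue table; a minimal sketch against the example table (column list trimmed for readability):

SELECT replica_name, node_name, type, source_replica, new_part_name, num_tries, last_exception
FROM system.replication_queue
WHERE database = 'replicated_db' AND table = 'replicated_table';

num_tries and last_exception are the fields that, in Issue 2 above, showed the GET_PART task had already been retried 22 times.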
As mentioned earlier, when a table with a Replicated* engine is created, the server starts the replicated table engine and then schedules the various background tasks in ReplicatedMergeTreeRestartingThread::runImpl(). The task that pulls LogEntries is scheduled there as well; its main body is shown below (the core is queue.pullLogsToQueue):
void StorageReplicatedMergeTree::queueUpdatingTask()
{
    if (!queue_update_in_progress)
    {
        last_queue_update_start_time.store(time(nullptr));
        queue_update_in_progress = true;
    }
    try
    {
        auto zookeeper = getZooKeeperAndAssertNotStaticStorage();

        if (is_readonly)
        {
            /// Note that we need to check shutdown_prepared_called, not shutdown_called, since the table will be marked as readonly
            /// after calling StorageReplicatedMergeTree::flushAndPrepareForShutdown().
            if (shutdown_prepared_called)
                return;
            throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode (replica path: {}), cannot update queue", replica_path);
        }

        queue.pullLogsToQueue(zookeeper, queue_updating_task->getWatchCallback(), ReplicatedMergeTreeQueue::UPDATE);
        last_queue_update_finish_time.store(time(nullptr));
        queue_update_in_progress = false;
    }
    ......
}
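The is_readonly branch above is easy to observe from SQL: a replica whose Keeper session has expired goes read-only and stops updating its queue. A hedged check (system.replicas only, no internal state assumed):

SELECT database, table, replica_name, is_readonly, is_session_expired, last_queue_update
FROM system.replicas
WHERE is_readonly;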
3.3 The Log Synchronization Position: log_pointer

First, what metadata does a replicated table have in Keeper once it is created? For example:
CREATE TABLE my_db.my_table ( ... ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/my_table', '{replica}') ORDER BY ...
where
{shard} = s1
{replica} = r1
so the table's ZooKeeper path resolves to /clickhouse/tables/s1/my_table and the replica path to /clickhouse/tables/s1/my_table/replicas/r1. The ZooKeeper path structure looks like this:
/clickhouse/
└── tables/
    └── s1/                          ← shard s1
        └── my_table/                ← table name
            ├── log/                 ← main log directory, shared by all replicas
            │   ├── log-0000000000
            │   ├── log-0000000001
            │   └── ...
            ├── replicas/
            │   ├── r1/              ← current replica (replica r1)
            │   │   ├── queue/       ← queue of log operations waiting to be processed
            │   │   │   ├── queue-0000000000
            │   │   │   └── ...
            │   │   ├── log_pointer  ← cursor of the log already synchronized by this replica
            │   │   ├── host         ← host address of this replica
            │   │   ├── is_active    ← whether this replica is alive
            │   │   └── ...
            │   ├── r2/              ← other replicas (if any)
            │   └── ...
            ├── mutations/           ← all mutation operations
            ├── block_numbers/       ← max block number per partition
            ├── temp/                ← temporary nodes
            └── ...
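This layout can be browsed directly from SQL through the system.zookeeper table (it requires a path predicate). A minimal sketch using the paths above:

-- children of the replica node (log_pointer, host, is_active, ...)
SELECT name, value
FROM system.zookeeper
WHERE path = '/clickhouse/tables/s1/my_table/replicas/r1';

-- names of the shared log entries
SELECT name
FROM system.zookeeper
WHERE path = '/clickhouse/tables/s1/my_table/log';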
In ClickHouse Keeper, log_pointer is a cursor maintained by each replica. For a replicated table (e.g. ReplicatedMergeTree) it records which log entry that replica has processed up to.

3.4 The Flow of Pulling LogEntries

With the log synchronization position (log_pointer) understood, let's look at how LogEntries are actually pulled from Keeper. The flow is:
1. The shared path /clickhouse/tables/{shard}/{table}/log/ stores the main log, common to all replicas.
2. Each replica's path /clickhouse/tables/{shard}/{table}/replicas/{replica}/log_pointer stores that replica's processing progress.
3. When the replica runs the pull task, it reads its own log_pointer, lists all log nodes under /log, and filters the list, dropping every entry whose index is smaller than the one log_pointer points at. If the filtered log_entries is not empty, it is sorted first, and then a for loop processes it in batches: each batch takes a contiguous slice of log_entries of size current_multi_batch_size (initially small), with last pointing at the last entry of the batch; then the loop index and the batch size are updated: entry_idx moves to the start of the next batch and the batch size grows exponentially, up to MAX_MULTI_OPS, to speed up synchronization. Then the ZooKeeper paths for the batch are built and the log data is read in bulk:
for (auto it = begin; it != end; ++it)
    get_paths.emplace_back(fs::path(zookeeper_path) / "log" / *it);

auto get_results = zookeeper->get(get_paths);
Next, build the list of ZooKeeper operations, preparing to write to the queue in bulk and to advance the pointer:
for (size_t i = 0; i < get_num; ++i)
{
    // parse the log data and construct a LogEntry object
    copied_entries.emplace_back(LogEntry::parse(res.data, res.stat, format_version));

    // request to create the queue node (a persistent sequential node)
    ops.emplace_back(zkutil::makeCreateRequest(
        fs::path(replica_path) / "queue/queue-", res.data, zkutil::CreateMode::PersistentSequential));

    // handle create_time and update min_unprocessed_insert_time (used later for prioritization etc.)
}
Then the requests that update log_pointer and min_unprocessed_insert_time are added: the replica's log processing pointer is advanced to one past the last processed log entry, and, if there is an earliest insert time, it is updated and written out as well. All of these operations are submitted with one ZooKeeper multi():
auto responses = zookeeper->multi(ops, /* check_session_valid */ true);

The LogEntries are then added to the replicated table's in-memory queue:
insertUnlocked(copied_entries[copied_entry_idx], unused, state_lock);

and the table's background task executor is woken up to execute the LogEntry tasks:
storage.background_operations_assignee.trigger();

A point to note:
// this only reads the names of all the log entries, not their contents
Strings log_entries = zookeeper->getChildrenWatch(fs::path(zookeeper_path) / "log", nullptr, watch_callback);
// this reads the actual contents of the log entries
auto get_results = zookeeper->get(get_paths);
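Whether a replica has caught up with the shared log can be checked by comparing its log_pointer against the log's maximum index; a minimal sketch using system.replicas and the example table:

SELECT replica_name, log_max_index, log_pointer, queue_size, absolute_delay
FROM system.replicas
WHERE database = 'replicated_db' AND table = 'replicated_table';

A log_pointer smaller than log_max_index means some entries have not been pulled yet, while a non-zero queue_size means pulled entries are still waiting to be executed.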
4 The Replica Executes the LogEntry

Once LogEntries have been pulled into the queue, storage.background_operations_assignee.trigger() schedules the thread that executes them. The scheduled job looks like this:
bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assignee)
{
    cleanup_thread.wakeupEarlierIfNeeded();

    /// If replication queue is stopped exit immediately as we successfully executed the task
    if (queue.actions_blocker.isCancelled())
        return false;

    /// This object will mark the element of the queue as running.
    ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry = selectQueueEntry();

    if (!selected_entry)
        return false;

    auto job_type = selected_entry->log_entry->type;

    /// Depending on entry type execute in fetches (small) pool or big merge_mutate pool
    if (job_type == LogEntry::GET_PART || job_type == LogEntry::ATTACH_PART)
    {
        assignee.scheduleFetchTask(std::make_shared<ExecutableLambdaAdapter>(
            [this, selected_entry] () mutable
            {
                return processQueueEntry(selected_entry);
            }, common_assignee_trigger, getStorageID()));
        return true;
    }

    if (job_type == LogEntry::MERGE_PARTS)
    {
        auto task = std::make_shared<MergeFromLogEntryTask>(selected_entry, *this, common_assignee_trigger);
        assignee.scheduleMergeMutateTask(task);
        return true;
    }

    if (job_type == LogEntry::MUTATE_PART)
    {
        auto task = std::make_shared<MutateFromLogEntryTask>(selected_entry, *this, common_assignee_trigger);
        assignee.scheduleMergeMutateTask(task);
        return true;
    }

    assignee.scheduleCommonTask(std::make_shared<ExecutableLambdaAdapter>(
        [this, selected_entry]() mutable { return processQueueEntry(selected_entry); }, common_assignee_trigger, getStorageID()),
        /* need_trigger */ true);
    return true;
}

The key points here are how a task is selected and executed:
1. Select a pending entry from the queue:
ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry = selectQueueEntry();
if (!selected_entry)
    return false;
2. Choose the thread pool according to the entry type. Type GET_PART / ATTACH_PART:
if (job_type == LogEntry::GET_PART || job_type == LogEntry::ATTACH_PART)
{
    assignee.scheduleFetchTask(...);
    return true;
}
Type MERGE_PARTS:
if (job_type == LogEntry::MERGE_PARTS)
{
    auto task = std::make_shared<MergeFromLogEntryTask>(...);
    assignee.scheduleMergeMutateTask(task);
    return true;
}
And so on. Below we focus on the execution of a GET_PART task.
The core flow of processQueueEntry -> executeLogEntry -> executeFetch is:
1. Find another replica that owns the part entry.new_part_name, or a part covering it:

/// Looking for covering part. After that entry.actual_new_part_name may be filled.
String replica = findReplicaHavingCoveringPart(entry, true);

Get all replica names and shuffle them randomly, so that no single replica is preferred:
Strings replicas = zookeeper->getChildren(...);
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
Iterate over all replicas, skipping this replica itself and inactive ones:
if (replica == replica_name)
    continue;
if (active && !zookeeper->exists(.../replica/is_active))
    continue;
Get all parts on that replica and check whether it has the required part or a part covering it. If an exactly matching part is found, accept it directly; if covering parts are found, pick the one with the widest coverage (e.g. all_0_0_10 is preferred over all_0_0_3). MergeTreePartInfo::contains decides whether one part logically contains another:
Strings parts = zookeeper->getChildren(.../replica/parts);
for (const String & part_on_replica : parts)
{
    if (part_on_replica == part_name || MergeTreePartInfo::contains(part_on_replica, part_name, format_version))
    {
        if (largest_part_found.empty() || MergeTreePartInfo::contains(part_on_replica, largest_part_found, format_version))
        {
            largest_part_found = part_on_replica;
        }
    }
}
If a covering part was found, one extra check is performed via queue.addFuturePartIfNotCoveredByThem (I have not looked into this function in detail yet).
2. Determine the name of the part to fetch:
String part_name = entry.actual_new_part_name.empty() ? entry.new_part_name : entry.actual_new_part_name;

if (!entry.actual_new_part_name.empty())
    LOG_DEBUG(log, "Will fetch part {} instead of {}", entry.actual_new_part_name, entry.new_part_name);
If the replica chosen by findReplicaHavingCoveringPart has a larger covering part (say you need part_0_1_1 and it has part_0_3_1), then entry.actual_new_part_name is set to that covering part, which becomes the fetch target.
3. Build the source replica's ZooKeeper path:
String source_replica_path = fs::path(zookeeper_path) / "replicas" / replica;
This builds that replica's path in ZooKeeper, for example:
/clickhouse/tables/s1/my_table/replicas/r2
4. Execute fetchPart. This function tries to fetch the part locally and performs the following steps.

1. Preliminary checks and preparation. If this is a static, read-only table, fetching is forbidden:

if (isStaticStorage())
    throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode due to static storage");

If the fetch is not going into the detached directory, first check whether an old part with the same name already exists (possibly left over from a previously failed fetch); if so, wake up the background cleanup thread:

if (!to_detached)
{
    if (auto part = getPartIfExists(...))
    {
        cleanup_thread.wakeup();
        return false;
    }
}
Check whether another thread is already fetching this part:
std::lock_guard lock(currently_fetching_parts_mutex);
if (!currently_fetching_parts.insert(part_name).second)
{
    return false; // already being fetched, avoid duplicate work
}
2. Logging. Note that a part obtained by a replica through fetch is logged with type DOWNLOAD_PART:

/// Logging
Stopwatch stopwatch;
MutableDataPartPtr part;
DataPartsVector replaced_parts;
ProfileEventsScope profile_events_scope;

auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
    writePartLog(
        PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(),
        part_name, part, replaced_parts, nullptr,
        profile_events_scope.getSnapshot());
};

3. Decide how to obtain the part: clone or fetch. If the target part is the result of a part mutation, try to find its source part and compare its checksums with those of the target part. If they match, the local part can simply be cloned:

DataPartPtr part_to_clone;
{
    /// If the desired part is a result of a part mutation, try to find the source part and compare
    /// its checksums to the checksums of the desired part. If they match, we can just clone the local part.

    /// If we have the source part, its part_info will contain covered_part_info.
    auto covered_part_info = part_info;
    covered_part_info.mutation = 0;
    auto source_part = getActiveContainingPart(covered_part_info);

    /// Fetch for zero-copy replication is cheap and straightforward, so we don't use local clone here
    if (source_part && !is_zero_copy_part(source_part))
    {
        auto source_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
            source_part->getColumns(), source_part->checksums);

        String part_path = fs::path(source_replica_path) / "parts" / part_name;
        String part_znode = zookeeper->get(part_path);

        std::optional<ReplicatedMergeTreePartHeader> desired_part_header;
        if (!part_znode.empty())
        {
            desired_part_header = ReplicatedMergeTreePartHeader::fromString(part_znode);
        }
        else
        {
            String columns_str;
            String checksums_str;

            if (zookeeper->tryGet(fs::path(part_path) / "columns", columns_str)
                && zookeeper->tryGet(fs::path(part_path) / "checksums", checksums_str))
            {
                desired_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes(columns_str, checksums_str);
            }
            else
            {
                LOG_INFO(log, "Not checking checksums of part {} with replica {}:{} because part was removed from ZooKeeper",
                    part_name, source_zookeeper_name, source_replica_path);
            }
        }

        /// Checking both checksums and columns hash. For example we can have empty part
        /// with same checksums but different columns. And we attaching it exception will
        /// be thrown.
        if (desired_part_header
            && source_part_header.getColumnsHash() == desired_part_header->getColumnsHash()
            && source_part_header.getChecksums() == desired_part_header->getChecksums())
        {
            LOG_TRACE(log, "Found local part {} with the same checksums and columns hash as {}", source_part->name, part_name);
            part_to_clone = source_part;
        }
    }
}

Remote fetch: get the host address and port of the source replica, prepare the credentials and parameters needed for the HTTP download, build get_part(), and use fetcher.fetchSelectedPart(). Looking at the remote fetch itself: inside fetchSelectedPart the data has already been obtained while constructing the HTTP read buffer. The main flow is:

1. Prepare a temporary download directory (e.g. tmp-fetch_<part_name>) so that the data directory is not touched while writing; the part is committed only after the download succeeds.

static const String TMP_PREFIX = "tmp-fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;

2. The traditional fetch branch: downloadPartToDisk. downloadPartToDisk calls downloadBaseOrProjectionPartToDisk to download the part itself and any projection parts:

try
{
    for (size_t i = 0; i < projections; ++i)
    {
        String projection_name;
        readStringBinary(projection_name, in);
        MergeTreeData::DataPart::Checksums projection_checksum;

        auto projection_part_storage = part_storage_for_loading->getProjection(projection_name + ".proj");
        projection_part_storage->createDirectories();
        downloadBaseOrProjectionPartToDisk(
            replica_path, projection_part_storage, in, output_buffer_getter, projection_checksum, throttler, sync);

        data_checksums.addFile(
            projection_name + ".proj", projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128());
    }

    downloadBaseOrProjectionPartToDisk(
        replica_path, part_storage_for_loading, in, output_buffer_getter, data_checksums, throttler, sync);
}

downloadBaseOrProjectionPartToDisk iterates over every file of the part, e.g. the .bin and .mrk files:

for (size_t i = 0; i < files; ++i)
{
    String file_name;
    UInt64 file_size;

    readStringBinary(file_name, in);
    readBinary(file_size, in);

    /// File must be inside absolute_part_path directory.
    /// Otherwise malicious ClickHouse replica may force us to write to arbitrary path.
    String absolute_file_path = fs::weakly_canonical(fs::path(data_part_storage->getRelativePath()) / file_name);
    if (!startsWith(absolute_file_path, fs::weakly_canonical(data_part_storage->getRelativePath()).string()))
        throw Exception(ErrorCodes::INSECURE_PATH,
            "File path ({}) doesn't appear to be inside part path ({}). "
            "This may happen if we are trying to download part from malicious replica or logical error.",
            absolute_file_path, data_part_storage->getRelativePath());

    written_files.emplace_back(output_buffer_getter(*data_part_storage, file_name, file_size));
    HashingWriteBuffer hashing_out(*written_files.back());
    copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
    hashing_out.finalize();

    if (blocker.isCancelled())
    {
        /// NOTE The is_cancelled flag also makes sense to check every time you read over the network,
        /// performing a poll with a not very large timeout.
        /// And now we check it only between read chunks (in the copyData function).
        throw Exception(ErrorCodes::ABORTED, "Fetching of part was cancelled");
    }

    MergeTreeDataPartChecksum::uint128 expected_hash;
    readPODBinary(expected_hash, in);

    if (expected_hash != hashing_out.getHash())
        throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH,
            "Checksum mismatch for file {} transferred from {} (0x{} vs 0x{})",
            (fs::path(data_part_storage->getFullPath()) / file_name).string(),
            replica_path,
            getHexUIntLowercase(expected_hash),
            getHexUIntLowercase(hashing_out.getHash()));

    if (file_name != "checksums.txt"
        && file_name != "columns.txt"
        && file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME
        && file_name != IMergeTreeDataPart::METADATA_VERSION_FILE_NAME)
        checksums.addFile(file_name, file_size, expected_hash);
}

Afterwards the files of the part are flushed to disk:

/// Call fsync for all files at once in attempt to decrease the latency
for (auto & file : written_files)
{
    file->finalize();
    if (sync)
        file->sync();
}

5 Extensions

How to decide whether one part contains another? This is done by the following function:

bool contains(const MergeTreePartInfo & rhs) const
{
    /// Containing part may have equal level iff block numbers are equal (unless level is MAX_LEVEL)
    /// (e.g. all_0_5_2 does not contain all_0_4_2, but all_0_5_3 or all_0_4_2_9 do)
    bool strictly_contains_block_range = (min_block == rhs.min_block && max_block == rhs.max_block) || level > rhs.level
        || level == MAX_LEVEL || level == LEGACY_MAX_LEVEL;
    return partition_id == rhs.partition_id        /// Parts for different partitions are not merged
        && min_block <= rhs.min_block
        && max_block >= rhs.max_block
        && level >= rhs.level
        && mutation >= rhs.mutation
        && strictly_contains_block_range;
}

Drop a table/database synchronously:

DROP DATABASE IF EXISTS my_database SYNC;

Remove the metadata under a database's directory in Keeper:
SYSTEM DROP DATABASE REPLICA 'shard_name|replica_name' FROM DATABASE db_name;

Remove the information under a replica:
SYSTEM DROP REPLICA 'replica_name' FROM DATABASE db_name;

Remove a table's metadata in Keeper:

SYSTEM DROP REPLICA 'r1' FROM ZKPATH '/clickhouse/tables/s1/replicated_table5';

Create a table on the whole cluster:

CREATE TABLE replicated_db.replicated_table ON CLUSTER my_cluster
(
    id UInt64,
    event_time DateTime,
    event_type String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/replicated_table', '{replica}')
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, id)
SETTINGS index_granularity = 8192
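Finally, to confirm from SQL that a part arrived on a replica via fetch (section 4) rather than a local insert: if part_log is enabled in the server configuration, fetched parts are recorded with event type DownloadPart, which corresponds to PartLogElement::DOWNLOAD_PART above. A hedged check against the example table:

SELECT event_time, event_type, part_name, error
FROM system.part_log
WHERE table = 'replicated_table' AND event_type = 'DownloadPart'
ORDER BY event_time DESC
LIMIT 10;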