广州网站开发服务,成都周边旅游景点大全,wordpress点文章标题怎么出现错误,数字营销专业学什么概述
都说“实践是检验真理的唯一标准”#xff0c;光说不练假把式#xff0c;那么本文就通过实际的测试来感受一下Doris和clickhouse在读写方面的性能差距#xff0c;看看Doris盛名之下#xff0c;是否真有屠龙之技#xff1b;clickhouse长锋出鞘#xff0c;是否敢缚苍…概述
都说“实践是检验真理的唯一标准”光说不练假把式那么本文就通过实际的测试来感受一下Doris和clickhouse在读写方面的性能差距看看Doris盛名之下是否真有屠龙之技clickhouse长锋出鞘是否敢缚苍龙
废话不多说上货。
硬件配置
在这里我使用多台物理机搭建了clickhouse和Doris集群。
clickhouse集群
节点IP分片编号副本编号物理配置ck93192.168.101.931148core 256G 27T HDDck94192.168.101.941248core 256G 27T HDDck96192.168.101.962148core 256G 27T HDDck97192.168.101.972248core 256G 27T HDD Doris集群
角色节点IP物理配置FEck94192.168.101.9448core 256G 27T HDDBEck93192.168.101.9348core 256G 27T HDDBEck94192.168.101.9448core 256G 27T HDDBEck96192.168.101.9648core 256G 27T HDD clickhouse集群和Doris集群共用一套物理机。 数据准备
由于clickhouse与Doris共用物理机资源为了避免互相干扰在Doris测试时clickhouse集群停止一切读写操作同理当clickhouse集群测试时Doris集群也停止一切读写操作。
本次测试主要针对全文检索的场景。测试数据为clickhouse-server的日志文件三个节点上的日志共计2亿条数据采集写入kafka后数据量为30GB。 我们将数据通过采集器组织成json格式后采集到kafka数据结构如下所示
{message: 2023.12.14 05:25:20.983533 [ 243360 ] {} Information aimeter.apm_span_index_trace_id (ReplicatedMergeTreePartCheckThread): Checking if anyone has a part 20231104_7476_7590_23 or covering part.,id: cd8946be124a4079f4f372782ee6da1f,filehashkey: 484d4e40bf93db91e25fbdfb47f084fe,collectiontime: 2023-12-18T10:56:08.12508:00,hostname: ck93,path: /data01/chenyc/logs/clickhouse-server/clickhouse-server.log.4,rownumber: 6,seq: 6,ip: 192.168.101.93,topic: log_test
}
主要测试数据写入和数据查询两个方面。
预设查询场景有如下几个
根据ip和path维度统计每个ip下path的个数统计每个ip下的Error日志的数量统计日志中出现Debug 和 cdb56920-2d39-4e6d-be99-dd6ef24cc66a 的条数统计出现Trace和gauge.apm_service_span出现的次数查询Error中出现READ_ONLY的日志明细查询日志中出现上海关键字的日志么明细
主要测试的性能指标包括
写入性能 写入速度写入过程的资源占用CPU负载写入后数据的压缩占比 查询性能 查询耗时查询的资源占用查询命中索引的情况 另增加测试一边写入一遍查询时对读写的影响。
Doris建表语句
建表语句如下
CREATE database demo;
use demo;
CREATE TABLE demo.log_test (id CHAR(34) NOT NULL COMMENT 每行的唯一hash标识id,message STRING NOT NULL COMMENT 日志内容,filehashkey CHAR(34) NOT NULL COMMENT 每个文件的hash值用于标识文件唯一性,collectiontime DATETIME(3) COMMENT 采集时间,hostname VARCHAR(20) NOT NULL COMMENT 主机名,path VARCHAR(256) NOT NULL COMMENT 文件路径,rownumber BIGINT NOT NULL COMMENT 行号,seq BIGINT NOT NULL COMMENT 在同一个文件内连续的序列号,ip CHAR(16) NOT NULL COMMENT 节点IP,topic CHAR(16) NOT NULL COMMENT 所属kafka的topic,INDEX idx_message_inv(message) USING INVERTED PROPERTIES(parser unicode,parser_mode fine_grained,support_phrase true) COMMENT 倒排索引,INDEX idx_message_ngram(message) USING NGRAM_BF PROPERTIES(gram_size5, bf_size4096) COMMENT ngram_bf 索引
)
DUPLICATE KEY(id)
PARTITION BY RANGE(collectiontime) ()
DISTRIBUTED BY HASH(id) BUCKETS AUTO
ROLLUP (r1 (message),r2 (ip, path)
)
PROPERTIES (dynamic_partition.enable true,dynamic_partition.time_unit DAY,dynamic_partition.start -7,dynamic_partition.end 3,dynamic_partition.prefix p,dynamic_partition.buckets 32,compressionzstd
);
说明如下
按collectiontime动态分区默认三个副本在message上创建一个unicode倒排索引一个NGram BloomFilter索引创建两个ROLLUP 用来重建前缀索引压缩方式采用ZSTD
clickhouse建表语句
--- 本地表
create table log_test on cluster abc (id String NOT NULL CODEC(ZSTD(1)),message String NOT NULL CODEC(ZSTD(1)) ,filehashkey String NOT NULL CODEC(ZSTD(1)) ,collectiontime DateTime64(3) CODEC(DoubleDelta, LZ4),hostname LowCardinality(String) NOT NULL CODEC(ZSTD(1)) ,path String NOT NULL CODEC(ZSTD(1)) ,rownumber Int64 NOT NULL ,seq Int64 NOT NULL ,ip LowCardinality(String) NOT NULL CODEC(ZSTD(1)) ,topic LowCardinality(String) NOT NULL CODEC(ZSTD(1)) ,
INDEX message_idx message TYPE ngrambf_v1(5, 65535, 1, 0) GRANULARITY 1,PROJECTION p_cnt (SELECT ip, path, count() GROUP BY ip, path)
)ENGINE ReplicatedMergeTree
PARTITION BY toYYYYMMDD(collectiontime)
ORDER BY (collectiontime, ip, path);--- 分布式表
create table dist_log_test on cluster abc as log_test engine Distributed(abc, default, log_test)
说明如下
string字段使用ZSTD压缩时间字段使用DoubleDelta压缩在message字段是创建一个ngrambf_v1的二级索引创建一个projection用于根据ip和path维度的预聚合根据collectiontime字段按天做分区
写入性能
clickhouse
使用clickhouse_sinker向ck集群写入数据为公平起见clickhouse_sinker为单实例。
配置文件如下
{clickhouse: {cluster: abc,db: default,hosts: [[192.168.101.93, 192.168.101.94],[192.168.101.96, 192.168.101.97]],port: 19000,username: default,password: 123456,maxOpenConns: 5,retryTimes: 0},kafka: {brokers: 192.168.101.94:29092,192.168.101.96:29092,192.168.101.98:29092},tasks: [{name: cktest,topic: log_test,earliest: true,consumerGroup: abc,parser: fastjson,tableName: log_test,autoSchema: true,dynamicSchema:{enable: false},prometheusSchema: false,bufferSize: 1000000,flushInterval: 10}],logLevel: info
}
测试结果
数据总量CPUsinker内存sinkerCPUclickhouse内存clickhouse写入速度条/s写入速度M/s总耗时压缩前大小压缩后大小压缩比2亿15 core22G5core4G280k/s125MB/s12min88.10GB10.39GB81
clickhouse中数据情况 Doris
Doris使用Routine load向Doris集群写入数据由于有3个backend开启3个并发度。
Routine Load配置如下
CREATE ROUTINE LOAD demo.doris_test ON log_test
COLUMNS(message,id,filehashkey,collectiontime,hostname,path,rownumber,seq,ip,topic)
PROPERTIES
(desired_concurrent_number3,max_error_number 500,max_batch_interval 20,max_batch_rows 300000,max_batch_size 209715200,strict_mode false,format json
)
FROM KAFKA
(kafka_broker_list 192.168.101.94:29092,192.168.101.96:29092,192.168.101.98:29092,kafka_topic log_test,kafka_partitions 0,1,2,3,4,5,kafka_offsets 0,0,0,0,0,0
);
这里 max_error_number 设置了500 意思是容忍task失败500次。之所以设置这个容忍度是因为Doris采用RapidJSON库解析JSON串这个库解析部分乱码数据时会报错
Reason: Parse json data for JsonDoc failed. code: 10, error info: The input is not valid UTF-8. src line [{message:2023.12.13 18:41:17.762463 [ 143536 ] {bcf2d3d6-a68a-46a3-a008-55399f6a596f} Error void DB::ParallelParsingInputFormat::onBackgroundException(size_t): Code: 27. DB::ParsingException: Cannot parse input: expected \\t before: d8caa60-f8f4-45a0-9e45-ee1a9344f774\\t200.188.12.25\\t\\\\N\\tssh\\t\\\\N\\t22\\tzx2\\t1\\t1\\t2\\t180\\t0\\tIT运维管理平台e海智维\\t010336\\t季杨\\t6339\\t运维支持部\\t1\\t广东: \nRow 1:\nColumn 0, name: id, type: Int64, parsed text: \4\\nColumn 1, name: deviceId, type: Nullable(Int64), parsed text: \4\\nERROR: garbage after Nullable(Int64): \d8caa60-f8\\n\n: (at row 1)\n. (CANNOT_PARSE_INPUT_ASSERTION_FAILED), Stack trace (when copying this message, always include the lines below):\n\n0. DB::Exception::Exception(DB::Exception::MessageMasked, int, bool) 0xe18d895 in /usr/bin/clickhouse\n1. ? 0xe1ec044 in /usr/bin/clickhouse\n2. DB::throwAtAssertionFailed(char const*, DB::ReadBuffer) 0xe1ebf41 in /usr/bin/clickhouse\n3. DB::TabSeparatedFormatReader::skipFieldDelimiter() 0x14adec49 in /usr/bin/clickhouse\n4. DB::RowInputFormatWithNamesAndTypes::readRow(std::vectorCOWDB::IColumn::mutable_ptrDB::IColumn, std::allocatorCOWDB::IColumn::mutable_ptrDB::IColumn, DB::RowReadExtension) 0x149d531f in /usr/bin/clickhouse\n5. DB::IRowInputFormat::generate() 0x149b08ae in /usr/bin/clickhouse\n6. DB::ISource::tryGenerate() 0x14933695 in /usr/bin/clickhouse\n7. DB::ISource::work() 0x14933206 in /usr/bin/clickhouse\n8. DB::ParallelParsingInputFormat::parserThreadFunction(std::shared_ptrDB::ThreadGroupStatus, unsigned long) 0x14a5c341 in /usr/bin/clickhouse\n9. ThreadPoolImplThreadFromGlobalPoolImplfalse::worker(std::__list_iteratorThreadFromGlobalPoolImplfalse, void*) 0xe260ea5 in /usr/bin/clickhouse\n10. void std::__function::__policy_invokervoid ()::__call_implstd::__function::__default_alloc_funcThreadFromGlobalPoolImplfalse::ThreadFromGlobalPoolImplvoid ThreadPoolImplThreadFromGlobalPoolImplfalse::scheduleImplvoid(std::functionvoid (), long, std::optionalunsigned long, bool)::lambda0()(void)::lambda(), void ()(std::__function::__policy_storage const*) 0xe263a15 in /usr/bin/clickhouse\n11. ThreadPoolImplstd::thread::worker(std::__list_iteratorstd::thread, void*) 0xe25cc73 in /usr/bin/clickhouse\n12. ? 0xe2628e1 in /usr/bin/clickhouse\n13. start_thread 0x7ea5 in /usr/lib64/libpthread-2.17.so\n14. clone 0xfeb0d in /usr/lib64/libc-2.17.so\n (version 23.3.1.2823 (official build)),id:b63049ee2121095314365cc38aa23472,filehashkey:778261a8412adda6f7e3f7297ea2d5d1,collectiontime:2023-12-18T11:07:21.68008:00,hostname:master94,path:/data01/chenyc/logs/clickhouse-server/clickhouse-server.err.log.0,rownumber:2883976,seq:2842107,ip:192.168.101.94,topic:log_test}];
为了屏蔽掉这个报错导致任务异常PAUSED所以将出错容忍度设置为500。
测试结果如下
数据总量CPUFE内存FECPUBE内存BE写入速度条/s写入速度M/s总耗时压缩前大小压缩后大小压缩比2亿1 core2G19 core15G126K/s81MB/s27min120.34GB22GB51
任务情况
mysql show routine load\G;
*************************** 1. row ***************************Id: 22719Name: doris_testCreateTime: 2023-12-18 16:08:49PauseTime: NULLEndTime: NULLDbName: default_cluster:demoTableName: log_testIsMultiTable: falseState: RUNNINGDataSourceType: KAFKACurrentTaskNum: 3JobProperties: {max_batch_rows:300000,timezone:Asia/Shanghai,send_batch_parallelism:1,load_to_single_tablet:false,current_concurrent_number:3,delete:*,partial_columns:false,merge_type:APPEND,exec_mem_limit:2147483648,strict_mode:false,jsonpaths:,max_batch_interval:20,max_batch_size:209715200,fuzzy_parse:false,partitions:*,columnToColumnExpr:message,id,filehashkey,collectiontime,hostname,path,rownumber,seq,ip,topic,whereExpr:*,desired_concurrent_number:3,precedingFilter:*,format:json,max_error_number:500,max_filter_ratio:1.0,json_root:,strip_outer_array:false,num_as_string:false}
DataSourceProperties: {topic:log_test,currentKafkaPartitions:0,1,2,3,4,5,brokerList:192.168.101.94:29092,192.168.101.96:29092,192.168.101.98:29092}CustomProperties: {group.id:doris_test_4fba1257-9291-44cc-b4ef-61dd64f58c5e}Statistic: {receivedBytes:129215158726,runningTxns:[],errorRows:57,committedTaskNum:720,loadedRows:201594590,loadRowsRate:41912,abortedTaskNum:0,errorRowsAfterResumed:0,totalRows:201594647,unselectedRows:0,receivedBytesRate:26864584,taskExecuteTimeMs:4809870}Progress: {0:33599107,1:33599106,2:33599107,3:33599107,4:33599107,5:33599107}Lag: {0:0,1:0,2:0,3:0,4:0,5:0}
ReasonOfStateChanged: ErrorLogUrls: http://192.168.101.93:58040/api/_load_error_log?file__shard_15/error_log_insert_stmt_6c68b52cc0f849f7-ac9a0ce97dabf7e9_6c68b52cc0f849f7_ac9a0ce97dabf7e9, http://192.168.101.96:58040/api/_load_error_log?file__shard_16/error_log_insert_stmt_7e6449b5c1b6469f-901d55766835b101_7e6449b5c1b6469f_901d55766835b101, http://192.168.101.94:58040/api/_load_error_log?file__shard_18/error_log_insert_stmt_44ca6b30a5394689-b34d2e33de930a0c_44ca6b30a5394689_b34d2e33de930a0cOtherMsg: User: rootComment:
这个地方的指标数据是有BUG的 Statistic: {receivedBytes:129215158726,runningTxns:[],errorRows:57,committedTaskNum:720,loadedRows:201594590,loadRowsRate:41912,abortedTaskNum:0,errorRowsAfterResumed:0,totalRows:201594647,unselectedRows:0,receivedBytesRate:26864584,taskExecuteTimeMs:4809870}
我们可以看一下它的计算逻辑
public MapString, Object summary() {MapString, Object summary Maps.newHashMap();summary.put(totalRows, Long.valueOf(totalRows));summary.put(loadedRows, Long.valueOf(totalRows - this.errorRows - this.unselectedRows));summary.put(errorRows, Long.valueOf(this.errorRows));summary.put(errorRowsAfterResumed, Long.valueOf(this.errorRowsAfterResumed));summary.put(unselectedRows, Long.valueOf(this.unselectedRows));summary.put(receivedBytes, Long.valueOf(this.receivedBytes));summary.put(taskExecuteTimeMs, Long.valueOf(this.totalTaskExcutionTimeMs));summary.put(receivedBytesRate, Long.valueOf(this.receivedBytes * 1000 / this.totalTaskExcutionTimeMs));summary.put(loadRowsRate, Long.valueOf((this.totalRows - this.errorRows - this.unselectedRows) * 1000/ this.totalTaskExcutionTimeMs));summary.put(committedTaskNum, Long.valueOf(this.committedTaskNum));summary.put(abortedTaskNum, Long.valueOf(this.abortedTaskNum));summary.put(runningTxns, runningTxnIds);return summary;}
receivedBytesRate是拿总条数/总耗时loadRowsRate是拿总条数-出错条数-未选中条数 再除以总时间看起来是没有问题的。但问题出在这个总时间taskExecuteTimeMs上
private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes,long taskExecutionTime, boolean isReplay) throws UserException {this.jobStatistic.totalRows numOfTotalRows;this.jobStatistic.errorRows numOfErrorRows;this.jobStatistic.unselectedRows unselectedRows;this.jobStatistic.receivedBytes receivedBytes;this.jobStatistic.totalTaskExcutionTimeMs taskExecutionTime;...
}
此处计算总行数去统计各个并发的总数是没问题的但是总耗时也这样计算的话实际上是多算了也就是说有几个并发度这个时间就多算了多少倍。因此这个导入任务的真实时间应该是 4809870/ 3 1603290也就是27分钟。
插入后表中数据
mysql select count(*) from log_test;
-----------
| count(*) |
-----------
| 201594590 |
-----------
1 row in set (0.05 sec) Doris接收到的数据总量为120G远大于clickhouse压缩前的数据量88G猜测原因可能是写放大导致。因为Routine Load实际上是FE分配任务后在BE上执行stream load而stream load则是先将数据拉取到一个BE节点然后广播发送给其他节点。 写入性能小结
从2亿条数据的写入性能来看clickhouse写入可以达到28w条每秒Doris大约12w条/s clickhouse的写入性能是Doris的2倍以上。
但是从资源消耗来看Doris的写入由于是由Routine Load完成占用的是BE节点的资源而clickhouse使用第三方的clickhouse_sinker完成完全可以和节点部署在不同机器上从而避免对clickhouse集群资源的侵占。
Doris在写入性能落后的情况下CPU的消耗与clickhouse_sinker相当内存稍微占用少一点但是和clickhouse节点来比就完全不是一个数量级了。clickhouse在数据写入时CPU和内存的波动都比较小处于正常水平。不会吃太多的资源从而影响到查询。更为重要的是Doris写入的资源占用是每个节点都要占这么多N个节点这个资源消耗就是N倍这和clickhouse_sinker之间的差距就进一步拉大了。
clickhouse_sinker是擎创科技开源的一个将kafka的数据写入clickhouse的开源项目。拥有着低资源消耗高性能写入高容错、稳定的运行能力。它通过写本地表的方式可以达到数据均衡写入到各节点自动按照shardingkey做hash路由多个sinker进程实例之间自动根据kafka的消费lag来合理分配写入任务等非常适合作为clickhouse数据写入的方案。
从压缩性能上来看由于clickhouse和Doris对压缩前的数据统计口径不一致所以光看压缩比意义不大。但kafka的数据是固定的kafka里的数据是30Gkafka也有自己的压缩算法使用的是zstd写入到clickhouse后数据为10.17GB这个大小没有算入副本如果算上副本应该乘以2也就是20.34GB。
Doris的数据大小通过SHOW TABLE STATUS FROM demo LIKE %log_test%;查询得到Data_length为67.71GB除以三个副本得到22.57GB。
不算副本只看单一数据的大小clickhouse的压缩率达到了Doris的2.2倍这个压缩差距还是非常大的。 查询性能
我们分上面六个预设的场景进行查询测试。
场景说明场景1根据ip和path维度统计每个ip下path的个数场景2统计每个ip下的Error日志的数量场景3统计日志中出现Debug 和 query_id 为 cdb56920-2d39-4e6d-be99-dd6ef24cc66a 的条数场景4统计出现Trace和gauge.apm_service_span出现的次数场景5查询Error中出现READ_ONLY的日志明细场景6查询日志中出现“上海”关键字的明细 查询SQL如下
场景数据库SQL语句场景1clickhouseSELECT ip, path, count() FROM dist_log_test GROUP BY ip,path场景1DorisSELECT ip, path, count() FROM log_test GROUP BY ip,path场景2clickhouseSELECT ip, count() FROM dist_log_test WHERE message LIKE %Error% GROUP BY ip场景2DorisSELECT ip, count() FROM log_test WHERE message MATCH_ANY Error GROUP BY ip场景3clickhouseSELECT count() FROM dist_log_test WHERE message LIKE %Debug% AND message LIKE %cdb56920-2d39-4e6d-be99-dd6ef24cc66a%场景3DorisSELECT count() FROM log_test WHERE message MATCH_ALL Debug cdb56920-2d39-4e6d-be99-dd6ef24cc66a场景4clickhouseSELECT count() FROM dist_log_test WHERE message LIKE %Trace% AND message LIKE %gauge.apm_service_span%场景4DorisSELECT count() FROM log_test WHERE message MATCH_ALL Trace gauge.apm_service_span场景5clickhouseSELECT * FROM dist_log_test WHERE message LIKE %Error% AND message LIKE %READ_ONLY%场景5DorisSELECT * FROM log_test WHERE message MATCH_ALL Error READ_ONLY场景6clickhouseSELECT * FROM dist_log_test WHERE message LIKE %上海%场景6DorisSELECT * FROM log_test WHERE message MATCH_ANY 上海 查询结果
备注查询结果取连续查询10次的中位数。 数据库场景1场景2场景3场景4场景5场景6clickhouse0.078 sec7.948 sec0.917 sec3.362 sec4.584 sec3.784 secDoris0.84 sec5.91 sec0.19 sec0.84 sec5.07 sec0.75 sec
初步分析
从上述结果来看clickhouse胜2负4。其中场景1是碾压性优势查询性能是Doris的10倍多这是因为Doris本身不善于count类的查询而clickhouse依靠projection的预聚合查询达到了极致性能。
场景5之所以clickhouse能领先有必要说明一下按照原计划Doris使用MATCH ALL语法去查询但是没有查询到结果不明白为什么改用LIKE查询后性能比较差达到了5秒左右甚至比clickhouse更慢这也是我没有想到的。
Doris在PK中胜4负2比分大幅领先。除了场景2查询耗时相差不大之外其余场景3、4、6都是降维打击性能遥遥领先达到了clickhouse的5倍左右。这自然是得益于Doris的全文检索功能立了大功。
场景2之所以相差不大还是因为SQL中涉及到了count的计算前面说过Doris不擅长count类的查询因此性能比较拉胯也就情有可原了。但即便如此依然依然能达到clickhouse的1.3倍。 clickhouse 日志存储优化方案-构建隐式列
构建隐式列或map列是目前业界各大企业使用clickhouse存储日志的通用落地方案。下面摘取了一些成熟的日志存储的实践方案无不例外都用到了构建隐式列或Map列的思想
使用 ClickHouse 构建通用日志系统Uber 如何使用 ClickHouse 建立快速可靠且与模式无关的日志分析平台还在用 ES 查日志吗快看看石墨文档 Clickhouse 日志架构玩法Building an Observability Solution with ClickHouse - Part 1 - LogsB站基于Clickhouse的下一代日志体系建设实践
所谓隐式列 Implicit columns 我们可以将message中常用的有规律的一些字段通过正则表达式提取出来作为一个隐式列构建一个大宽表然后查询的时候匹配该隐式列从而达到避免走或少走全文检索的效果。你clickhouse不是不擅长模糊查询么那么我就尽量不走模糊查询不就行了吗
比如本案例中我们可以将日志中的query_idthread_idloglevel timestamp等内容提取出来。
下例为通过我们自研采集器提取字段的例子
if $raw_event ~ /(^\d\.\d\.\d\s\d:\d:\d\.\d)\s\[\s(\d)\s\]\s{(.*)}\s(\w).*/ {$timestampreplace($1, ., -, 2);$threadid$2;$queryid$3;$loglevel$4;
}
采集到的数据样例如下
{message: 2023.12.07 03:41:18.775976 [ 154026 ] {} Error aimeter.metric_agg (ReplicatedMergeTreePartCheckThread): No replica has part covering 202312_11890_20601_1725 and a merge is impossible: we didnt find a smaller part with the same min block.,id: f7efeef0501a4f13f8561d2dfa18461d,filehashkey: 12d1faf9499acb0664d5dfe4af9d761c,collectiontime: 2023-12-18T17:56:26.58608:00,hostname: master94,path: /data01/chenyc/logs/clickhouse-server/clickhouse-server.err.log.1,rownumber: 3,seq: 3,timestamp: 2023-12-07 03:41:18.775976,threadid: 154026,queryid: ,loglevel: Error,ip: 192.168.101.94,topic: log_test2
}
建表语句如下
CREATE TABLE default.log_test2 on cluster abc (id String CODEC(ZSTD(1)),message String CODEC(ZSTD(1)),filehashkey String CODEC(ZSTD(1)),collectiontime DateTime64(3),hostname String,path String CODEC(ZSTD(1)),rownumber Int64,seq Int64,timestamp DateTime64(3) CODEC(DoubleDelta, LZ4),threadid Int32,queryid String CODEC(ZSTD(1)),loglevel LowCardinality(String),ip String CODEC(ZSTD(1)),topic LowCardinality(String),INDEX level_idx loglevel TYPE tokenbf_v1(4096, 1, 0) GRANULARITY 1,INDEX ip_idx ip TYPE tokenbf_v1(4096, 1, 0) GRANULARITY 1,INDEX query_idx queryid TYPE ngrambf_v1(10, 30720, 1, 0) GRANULARITY 1,INDEX message_idx message TYPE ngrambf_v1(5, 65535, 1, 0) GRANULARITY 1,PROJECTION p_cnt (SELECT ip, path, count() GROUP BY ip, path)
) ENGINE ReplicatedMergeTree
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY(timestamp, ip, path, loglevel)--- 分布式表
create table dist_log_test2 on cluster abc as log_test2 engine Distributed(abc, default, log_test2) 由于多了4个字段kafka中的数据膨胀了4G。 通过clickhouse_sinker将数据导入到clickhouse集群。
写入clickhouse后如下所示 针对上面6种场景改写SQL如下
场景查询SQL语句场景1SELECT ip, path, count() FROM dist_log_test2 GROUP BY ip,path场景2SELECT ip, count() FROM dist_log_test2 WHERE hasToken(loglevel, Error) GROUP BY ip场景3SELECT count() FROM dist_log_test2 WHERE hasToken(loglevel, Debug) AND queryid cdb56920-2d39-4e6d-be99-dd6ef24cc66a场景4SELECT count() FROM dist_log_test2 WHERE hasToken(loglevel, Trace) AND message LIKE %gauge.apm_service_span%场景5SELECT * FROM dist_log_test2 WHERE hasToken(loglevel, Error) AND message LIKE %READ_ONLY%场景6SELECT * FROM dist_log_test2 WHERE message LIKE %上海% 为了方便对比我们将上两次的查询结果也贴到一块。
数据库场景1场景2场景3场景4场景5场景6clickhouse0.078 sec7.948 sec0.917 sec3.362 sec4.584 sec3.784 secDoris0.84 sec5.91 sec0.19 sec0.84 sec5.07 sec0.75 secclickhouse with Implicit columns0.064 sec0.390 sec0.317 sec1.117 sec4.288 sec3.437 sec
先刨除掉场景1场景6不看因为查询SQL与不加隐式列是一样的所以性能也差不多。
关键看场景2场景3场景4。场景2比未加隐式列之前性能提升了20倍比Doris提升了15倍提升非常大。而场景3和场景4也在不加隐式列的基础上提升了3倍左右的性能虽然还比不上Doris但差距已经追小了许多Doris仅领先1.5倍。
场景5由于仍然有根据message字段做模糊搜索所以性能提升不大。
总而言之在绝大多数场景通过隐式列的方式改写查询语句可以将原有的查询性能提升3倍左右。 通过构建隐式列的方式存储日志可有效解决查询性能的问题。但在交互上就显得不那么友好。因为对于使用者来说是不知道有这些列的存在的或者说如果使用者没有很强的业务感知能力都是随性搜索短语的话同样会导致查询的内容只能通过模糊匹配那么就起不到任何加速作用。
因此要想用好隐式列首先需要在交互上引导用户去使用隐式列进行条件搜索而不是随意选择关键字
其次是要做好日志规范否则不仅提取有效关键字比较困难而且不同的业务日志有不同的提取方法有不同的关键字导致提取出来的维度关键字五花八门这对搜索来说也带来了一定的困难。
大查询对写入的影响
我们知道在实际生产环境除非有一套相对成熟的存算分离方案否则写入对查询的互相影响是不可避免的。由于数据量有限为了尽可能模拟大查询对写入的影响我们采用场景2的SQL语句通过5个SQL并发查询观察写入数据的速度变化。
为了尽可能模拟真实并发情况在这里使用golang分别实现了5并发查询数据库的功能。
clickhouse
package main
import (fmtsync
github.com/ClickHouse/clickhouse-go/v2
)
func main() {conn : clickhouse.OpenDB(clickhouse.Options{Addr: []string{192.168.101.93:19000,192.168.101.94:19000,192.168.101.96:19000,192.168.101.97:19000},Auth: clickhouse.Auth{Database: default,Username: default,Password: 123456,},})
err : conn.Ping()if err ! nil {panic(err)}
query : SELECT ip, count() FROM dist_log_test WHERE message LIKE %Error% GROUP BY ipfor {var wg sync.WaitGroupvar lastErr errorfor i : 0; i 5; i {wg.Add(1)go func() {defer wg.Done()rows, err : conn.Query(query)if err ! nil {lastErr errreturn}defer rows.Close()for rows.Next() {var (ip stringcnt uint64)if err : rows.Scan(ip, cnt); err ! nil {lastErr errreturn}fmt.Printf(ip: %s, count: %d\n, ip, cnt)}}()}wg.Wait()if lastErr ! nil {panic(lastErr)}}
}查询Doris
package main
import (database/sqlfmtsync
_ github.com/go-sql-driver/mysql
)
func main() {conn, err : sql.Open(mysql, root:(192.168.101.94:59030)/demo)if err ! nil {panic(err)}
if err conn.Ping(); err ! nil {panic(err)}
query : SELECT ip, count() FROM log_test WHERE message MATCH_ANY Error GROUP BY ipfor {var wg sync.WaitGroupvar lastErr errorfor i : 0; i 5; i {wg.Add(1)go func() {defer wg.Done()rows, err : conn.Query(query)if err ! nil {lastErr errreturn}defer rows.Close()for rows.Next() {var (ip stringcnt uint64)if err : rows.Scan(ip, cnt); err ! nil {lastErr errreturn}fmt.Printf(ip: %s, count: %d\n, ip, cnt)}}()}wg.Wait()if lastErr ! nil {panic(lastErr)}}
}clickhouse clickhouse5个并发同时查询CPU飙到了接近40 core几乎占满了物理机的80%的资源。
此时我们启动clickhouse_sinker任务sinker进程部署在clickhouse集群以外的节点观察写入性能如下
写入数据条数写入数据总量写入性能条/s写入性能M/s总耗时2亿条数据88GB196k/s88M/s17min
上面这个测试案例比较特殊因为所有的查询请求都打到了同一个clickhouse节点导致这个节点的CPU占用特别高其他节点的CPU比较正常整体写入性能有所下降约为原来的70%。但总体性能还是不错的单sinker进程可以达到20w行每秒。
如果我们在clickhouse的查询端加一层proxy使得查询请求比较均衡地分不到各个clickhouse节点相信查询性能还能进一步提高。 Doris
5个并发查询Doris可以看到Doris的FE几乎无资源损耗但是BE的CPU吃满。而且与clickhouse不同的是clickhouse是仅请求打到这个节点上这个节点的CPU才会占得比较高但是Doris的各个节点的CPU都占得比较高。 我们同样启动一个Routine Load向Doris写入数据性能如下
写入数据条数写入数据总量写入性能条/s写入性能M/s总耗时2亿条数据120GB71k/s43MB/s47min
当我们在并发执行很多耗时的大查询时由于CPU占用比较满导致写入性能下降了50%左右这使得原本就不富裕的生活更加雪上加霜。
但是我们注意到一个有意思的现象原本场景2的查询5秒能返回结果但是在高速写入时查询速度降到了15秒事实上所有的查询都慢了2-3倍左右clickhouse更夸张查询会慢5倍以上。所以Doris在同时有读写请求的时候是优先保证写请求的资源的。
大查询写入优化方案-用户资源限制
clickhouse和Doris都可以通过设置用户权限来限制某个查询用户所能使用的资源。从上面的测试结果来看我们发现即使clickhouse写入时大查询占据了80%资源clickhouse的写入速度190k/s还是高于Doris无干扰写入的速度126k/s 因此我们主要来看clickhouse在专门设置了一个查询用户后的插入性能情况。
我们新增一个query的用户 通过profile限制其查询最大线程数为8 由于可用线程数减少为了避免查询超时将SQL超时时间修改到1个小时。
同时通过quota设置当1分钟内连续5次报错就禁止该用户查询 我们通过这个query用户起5个并发轮询查询副本节点ck94, ck97clickhouse写ck93, ck96。clickhouse_sinker在集群以外的节点上启动。
测试结果如下
写入数据条数写入数据总量写入性能条/s写入性能M/s总耗时2亿条数据88GB250~270k/s114MB/s13min 对比无干扰时写入写入性能仅下降5-10%左右这主要是因为写入本身消耗的节点资源就比较少当查询的资源被限制clickhouse的节点就有足够多的资源去保障写入并且我们通过配置简单的读写分离的方式让查询请求尽量分配到不同的副本节点可以进一步减小查询对写入的影响。
不过需要注意的是用户的最大线程数限制的是单个查询所使用的最大线程资源如果多个查询语句同时请求到同一个节点仍然能将该节点的CPU负载占满。测试过程中尝试5个并发全部请求到一个节点该节点CPU能达到2500%。
总结
本文重点比较了clickhouse和Doris在日志存储场景下的写入能力和查询能力。
写入性能上clickhouse完胜。在写入性能领先Doris 2倍的情况下可以做到使用更少的系统资源且压缩率也达到了Doris的2倍以上。
至于查询性能由于Doris支持倒排索引在模糊查询场景Doris对比clickhouse有5倍左右的提升虽然clickhouse可以通过构建隐式列的方式提升查询效率但Doris仍然能够做到1.5倍左右的性能领先。
而在需要聚合计算count的查询场景Doris明显不如clickhouse高效。
在读写同时进行的场景在大查询比较多时clickhouse和Doris的写入性能都有所下降clickhouse写入性能下降到70%Doris则直接腰斩。而且都对查询影响比较大。
通过配置专门的查询用户限制查询查询资源可有效缓解大查询对写入带来的性能影响。 本专栏知识点是通过零声教育的系统学习进行梳理总结写下文章对C/C课程感兴趣的读者可以点击链接查看详细的服务C/CLinux服务器开发/高级架构师