帮做网站一般多少钱,wordpress 暂无评论,wordpress 工作室模板,一起做网站17ES数据预处理
Ingest Node
Elasticsearch 5.0后#xff0c;引入的一种新的节点类型。默认配置下#xff0c;每个节点都是Ingest Node#xff1a;
具有预处理数据的能力#xff0c;可拦截lndex或Bulk API的请求对数据进行转换#xff0c;并重新返回给Index或Bulk APl
无…ES数据预处理
Ingest Node
Elasticsearch 5.0后引入的一种新的节点类型。默认配置下每个节点都是Ingest Node
具有预处理数据的能力可拦截lndex或Bulk API的请求对数据进行转换并重新返回给Index或Bulk APl
无需Logstash就可以进行数据的预处理例如
为某个字段设置默认值重命名某个字段的字段名对字段值进行Split操作支持设置Painless脚本对数据进行更加复杂的加工 Ingest Node VS Logstash
LogstashIngest Node数据输入与输出支持从不同的数据源读取并写入不同的数据源支持从ES REST API获取数据并且写入Elasticsearch数据缓冲实现了简单的数据队列支持重写不支持缓冲数据处理支持大量的插件也支持定制开发内置的插件可以开发Plugin进行扩展(Plugin更新需要重启)配置和使用增加了一定的架构复杂度无需额外部署
Ingest Pipeline
应用场景修复与增强写入数据。案例需求后期需要对Tags进行Aggregation统计。Tags字段中逗号分隔的文本应该是数组而不是一个字符串。
#Blog数据包含3个字段tags用逗号间隔
PUT tech_blogs/_doc/1
{title:Introducing big data......,tags:hadoop,elasticsearch,spark,content:You konw, for big data
}Pipeline Processor
Pipeline管道会对通过的数据(文档)按照顺序进行加工ProcessorElasticsearch 对一些加工的行为进行了抽象包装。
Elasticsearch 有很多内置的Processors也支持通过插件的方式实现自己的Processor。 https://www.elastic.co/guide/en/elasticsearch/reference/7.17/ingest-processors.html 一些内置的Processors
Split Processor将给定字段值分成一个数组Remove / Rename Processor移除一个重命名字段Append为商品增加一个新的标签Convert将商品价格从字符串转换成float类型Date / JSON日期格式转换字符串转JSON对象Date lndex Name Processor将通过该处理器的文档分配到指定时间格式的索引中Fail Processor一旦出现异常该Pipeline 指定的错误信息能返回给用户Foreach Process数组字段数组的每个元素都会使用到一个相同的处理器Grok Processor日志的日期格式切割Gsub / Join / Split字符串替换/数组转字符串/字符串转数组Lowercase / upcase大小写转换 # 测试split tags
POST _ingest/pipeline/_simulate
{pipeline: {description: to split blog tags,processors: [{split: {field: tags,separator: ,}}]},docs: [{_index: index,_id: 1,_source: {title: Introducing big data......,tags: hadoop,elasticsearch,spark,content: You konw, for big data}},{_index: index,_id: 2,_source: {title: Introducing cloud computering,tags: openstack,k8s,content: You konw, for cloud}}]
}#同时为文档增加一个字段。blog查看量
POST _ingest/pipeline/_simulate
{pipeline: {description: to split blog tags,processors: [{split: {field: tags,separator: ,}},{set:{field: views,value: 0}}]},docs: [{_index:index,_id:1,_source:{title:Introducing big data......,tags:hadoop,elasticsearch,spark,content:You konw, for big data}},{_index:index,_id:2,_source:{title:Introducing cloud computering,tags:openstack,k8s,content:You konw, for cloud}}]
}创建pipeline
# 为ES添加一个 Pipeline
PUT _ingest/pipeline/blog_pipeline
{description: a blog pipeline,processors: [{split: {field: tags,separator: ,}},{set:{field: views,value: 0}}]
}#查看Pipleline
GET _ingest/pipeline/blog_pipeline使用pipeline更新数据
#不使用pipeline更新数据
PUT tech_blogs/_doc/1
{title:Introducing big data......,tags:hadoop,elasticsearch,spark,content:You konw, for big data
}#使用pipeline更新数据
PUT tech_blogs/_doc/2?pipelineblog_pipeline
{title: Introducing cloud computering,tags: openstack,k8s,content: You konw, for cloud
}借助update_by_query更新已存在的文档
#update_by_query 会导致错误
POST tech_blogs/_update_by_query?pipelineblog_pipeline
{
}#增加update_by_query的条件
POST tech_blogs/_update_by_query?pipelineblog_pipeline
{query: {bool: {must_not: {exists: {field: views}}}}
}GET tech_blogs/_searchPainless Script
自Elasticsearch 5.x后引入专门为Elasticsearch 设计扩展了Java的语法。Painless支持所有Java 的数据类型及Java API子集。Painless Script具备以下特性
高性能/安全支持显示类型或者动态定义类型
Painless的用途
可以对文档字段进行加工处理更新或删除字段处理数据聚合操作Script Field对返回的字段提前进行计算Function Score对文档的算分进行处理在lngest Pipeline中执行脚本在Reindex APlUpdate By Query时对数据进行处理
通过Painless脚本访问字段
上下文语法Ingestionctx.field_nameUpdatectx._source.field_nameSearch Aggregationdoc[“field_name”]
测试 # 增加一个 Script Prcessor
POST _ingest/pipeline/_simulate
{pipeline: {description: to split blog tags,processors: [{split: {field: tags,separator: ,}},{script: {source: if(ctx.containsKey(content)){ctx.content_length ctx.content.length();}else{ctx.content_length0;}}},{set:{field: views,value: 0}}]},docs: [{_index:index,_id:1,_source:{title:Introducing big data......,tags:hadoop,elasticsearch,spark,content:You konw, for big data}},{_index:index,_id:2,_source:{title:Introducing cloud computering,tags:openstack,k8s,content:You konw, for cloud}}]
}DELETE tech_blogs
PUT tech_blogs/_doc/1
{title:Introducing big data......,tags:hadoop,elasticsearch,spark,content:You konw, for big data,views:0
}POST tech_blogs/_update/1
{script: {source: ctx._source.views params.new_views,params: {new_views:100}}
}# 查看views计数
POST tech_blogs/_search#保存脚本在 Cluster State
POST _scripts/update_views
{script:{lang: painless,source: ctx._source.views params.new_views}
}POST tech_blogs/_update/1
{script: {id: update_views,params: {new_views:1000}}
}GET tech_blogs/_search
{script_fields: {rnd_views: {script: {lang: painless,source: java.util.Random rnd new Random();doc[views].valuernd.nextInt(1000);}}},query: {match_all: {}}
}ES文档建模
Elasticsearch中如何处理关联关系
关系型数据库范式化Normalize设计的主要目标是减少不必要的更新往往会带来一些副作用
一个完全范式化设计的数据库会经常面临“查询缓慢”的问题。数据库越范式化就需要Join越多的表范式化节省了存储空间但是存储空间已经变得越来越便宜范式化简化了更新但是数据读取操作可能更多。
反范式化(Denormalize)的设计不使用关联关系而是在文档中保存冗余的数据拷贝。
优点无需处理Join操作数据读取性能好。Elasticsearch可以通过压缩_source字段减少磁盘空间的开销缺点不适合在数据频繁修改的场景。一条数据的改动可能会引起很多数据的更新
关系型数据库一般会考虑Normalize 数据在Elasticsearch往往考虑Denormalize 数据。Elasticsearch并不擅长处理关联关系一般会采用以下四种方法处理关联
对象类型嵌套对象(Nested Object)父子关联关系(Parent / Child )应用端关联
对象类型
案例1博客作者信息变更
对象类型
在每一博客的文档中都保留作者的信息如果作者信息发生变化需要修改相关的博客文档
DELETE blog
# 设置blog的 Mapping
PUT /blog
{mappings: {properties: {content: {type: text},time: {type: date},user: {properties: {city: {type: text},userid: {type: long},username: {type: keyword}}}}}
}# 插入一条 blog信息
PUT /blog/_doc/1
{content:I like Elasticsearch,time:2022-01-01T00:00:00,user:{userid:1,username:Fox,city:Changsha}
}# 查询 blog信息
POST /blog/_search
{query: {bool: {must: [{match: {content: Elasticsearch}},{match: {user.username: Fox}}]}}
}案例2包含对象数组的文档
DELETE /my_movies# 电影的Mapping信息
PUT /my_movies
{mappings : {properties : {actors : {properties : {first_name : {type : keyword},last_name : {type : keyword}}},title : {type : text,fields : {keyword : {type : keyword,ignore_above : 256}}}}}
}# 写入一条电影信息
POST /my_movies/_doc/1
{title:Speed,actors:[{first_name:Keanu,last_name:Reeves},{first_name:Dennis,last_name:Hopper}]
}# 查询电影信息
POST /my_movies/_search
{query: {bool: {must: [{match: {actors.first_name: Keanu}},{match: {actors.last_name: Hopper}}]}}}思考为什么会搜到不需要的结果存储时内部对象的边界并没有考虑在内JSON格式被处理成扁平式键值对的结构。当对多个字段进行查询时导致了意外的搜索结果。可以用Nested Data Type解决这个问题。
title:Speed
actor.first_name: [Keanu,Dennis]
actor.last_name: [Reeves,Hopper]嵌套对象(Nested Object)
什么是Nested Data Type
Nested数据类型允许对象数组中的对象被独立索引使用nested 和properties 关键字将所有actors索引到多个分隔的文档在内部Nested文档会被保存在两个Lucene文档中在查询时做Join处理
DELETE /my_movies
# 创建 Nested 对象 Mapping
PUT /my_movies
{mappings : {properties : {actors : {type: nested,properties : {first_name : {type : keyword},last_name : {type : keyword}}},title : {type : text,fields : {keyword:{type:keyword,ignore_above:256}}}}}
}POST /my_movies/_doc/1
{title:Speed,actors:[{first_name:Keanu,last_name:Reeves},{first_name:Dennis,last_name:Hopper}]
}# Nested 查询
POST /my_movies/_search
{query: {bool: {must: [{match: {title: Speed}},{nested: {path: actors,query: {bool: {must: [{match: {actors.first_name: Keanu}},{match: {actors.last_name: Hopper}}]}}}}]}}
}# Nested Aggregation
POST /my_movies/_search
{size: 0,aggs: {actors_agg: {nested: {path: actors},aggs: {actor_name: {terms: {field: actors.first_name,size: 10}}}}}
}# 普通 aggregation不工作
POST /my_movies/_search
{size: 0,aggs: {actors_agg: {terms: {field: actors.first_name,size: 10}}}
}父子关联关系(Parent / Child )
对象和Nested对象的局限性每次更新可能需要重新索引整个对象(包括根对象和嵌套对象)ES提供了类似关系型数据库中Join的实现。使用Join数据类型实现可以通过维护Parent/ Child的关系从而分离两个对象父文档和子文档是两个独立的文档更新父文档无需重新索引子文档。子文档被添加更新或者删除也不会影响到父文档和其他的子文档
设定 Parent/Child Mapping
DELETE /my_blogs# 设定 Parent/Child Mapping
PUT /my_blogs
{settings: {number_of_shards: 2},mappings: {properties: {blog_comments_relation: {type: join,relations: {blog: comment}},content: {type: text},title: {type: keyword}}}
}索引父文档
#索引父文档
PUT /my_blogs/_doc/blog1
{title:Learning Elasticsearch,content:learning ELK ,blog_comments_relation:{name:blog}
}#索引父文档
PUT /my_blogs/_doc/blog2
{title:Learning Hadoop,content:learning Hadoop,blog_comments_relation:{name:blog}
}索引子文档
#索引子文档
PUT /my_blogs/_doc/comment1?routingblog1
{comment:I am learning ELK,username:Jack,blog_comments_relation:{name:comment,parent:blog1}
}#索引子文档
PUT /my_blogs/_doc/comment2?routingblog2
{comment:I like Hadoop!!!!!,username:Jack,blog_comments_relation:{name:comment,parent:blog2}
}#索引子文档
PUT /my_blogs/_doc/comment3?routingblog2
{comment:Hello Hadoop,username:Bob,blog_comments_relation:{name:comment,parent:blog2}
}注意
父文档和子文档必须存在相同的分片上能够确保查询join的性能当指定子文档时候必须指定它的父文档ld。使用routing参数来保证分配到相同的分片
查询
# 查询所有文档
POST /my_blogs/_search#根据父文档ID查看
GET /my_blogs/_doc/blog2# Parent Id 查询
POST /my_blogs/_search
{query: {parent_id: {type: comment,id: blog2}}
}# Has Child 查询,返回父文档
POST /my_blogs/_search
{query: {has_child: {type: comment,query : {match: {username : Jack}}}}
}# Has Parent 查询返回相关的子文档
POST /my_blogs/_search
{query: {has_parent: {parent_type: blog,query : {match: {title : Learning Hadoop}}}}
}#通过ID 访问子文档
GET /my_blogs/_doc/comment3
#通过ID和routing 访问子文档
GET /my_blogs/_doc/comment3?routingblog2#更新子文档
PUT /my_blogs/_doc/comment3?routingblog2
{comment: Hello Hadoop??,blog_comments_relation: {name: comment,parent: blog2}
}嵌套文档 VS 父子文档
Nested ObjectParent / Child优点文档存储在一起读取性能高父子文档可以独立更新缺点更新嵌套的子文档时需要更新整个文档需要额外的内存维护关系。读取性能相对差适用场景子文档偶尔更新以查询为主子文档更新频繁
ElasticSearch数据建模最佳实践
如何处理关联关系
Object优先考虑反范式DenormalizationNested当数据包含多数值对象同时有查询需求Child/Parent关联文档更新非常频繁时
避免过多字段
一个文档中最好避免大量的字段过多的字段数不容易维护Mapping 信息保存在Cluster State 中数据量过大对集群性能会有影响删除或者修改数据需要reindex默认最大字段数是1000可以设置index.mapping.total_fields.limit限定最大字段数。
思考什么原因会导致文档中有成百上千的字段生产环境中尽量不要打开 Dynamic可以使用Strict控制新增字段的加入
true未知字段会被自动加入false新字段不会被索引但是会保存在_sourcestrict新增字段不会被索引文档写入失败
对于多属性的字段比如cookie商品属性可以考虑使用Nested
避免正则通配符前缀查询
正则通配符查询前缀查询属于Term查询但是性能不够好。特别是将通配符放在开头会导致性能的灾难案例针对版本号的搜索
# 将字符串转对象
PUT softwares/
{mappings: {properties: {version: {properties: {display_name: {type: keyword},hot_fix: {type: byte},marjor: {type: byte},minor: {type: byte}}}}}
}#通过 Inner Object 写入多个文档
PUT softwares/_doc/1
{version:{display_name:7.1.0,marjor:7,minor:1,hot_fix:0 }}PUT softwares/_doc/2
{version:{display_name:7.2.0,marjor:7,minor:2,hot_fix:0 }
}PUT softwares/_doc/3
{version:{display_name:7.2.1,marjor:7,minor:2,hot_fix:1 }
}# 通过 bool 查询
POST softwares/_search
{query: {bool: {filter: [{match:{version.marjor:7}},{match:{version.minor:2}}]}}
}
避免空值引起的聚合不准
# Not Null 解决聚合的问题
DELETE /scores
PUT /scores
{mappings: {properties: {score: {type: float,null_value: 0}}}
}PUT /scores/_doc/1
{score: 100
}
PUT /scores/_doc/2
{score: null
}POST /scores/_search
{size: 0,aggs: {avg: {avg: {field: score}}}
}为索引的Mapping加入Meta 信息
Mappings设置非常重要需要从两个维度进行考虑功能搜索、聚合、排序性能存储的开销、内存的开销、搜索的性能Mappings设置是一个迭代的过程加入新的字段很容易必要时需要update_by_query)更新删除字段不允许需要Reindex重建数据最好能对Mappings 加入Meta 信息更好的进行版本管理可以考虑将Mapping文件上传git进行管理
PUT /my_index
{mappings: {_meta: {index_version_mapping: 1.1}}
}ES读写性能调优
ES底层读写工作原理分析
写请求是写入 primary shard然后同步给所有的 replica shard读请求可以从 primary shard 或 replica shard 读取采用的是随机轮询算法。
ES写入数据的过程
客户端选择一个node发送请求过去这个node就是coordinating node (协调节点)coordinating node对document进行路由将请求转发给对应的nodenode上的primary shard处理请求然后将数据同步到replica nodecoordinating node如果发现primary node和所有的replica node都搞定之后就会返回请求到客户端 ES读取数据的过程
根据id查询数据的过程根据 doc id 进行 hash判断出来当时把 doc id 分配到了哪个 shard 上面去从那个 shard 去查询。
客户端发送请求到任意一个 node成为 coordinate node 。coordinate node 对 doc id 进行哈希路由(hash(_id)%shards_size)将请求转发到对应的 node此时会使用 round-robin 随机轮询算法在 primary shard 以及其所有 replica 中随机选择一个让读请求负载均衡。接收请求的 node 返回 document 给 coordinate node 。coordinate node 返回 document 给客户端。
根据关键词查询数据的过程:
客户端发送请求到一个 coordinate node 。协调节点将搜索请求转发到所有的 shard 对应的 primary shard 或 replica shard 都可以。query phase每个 shard 将自己的搜索结果返回给协调节点由协调节点进行数据的合并、排序、分页等操作产出最终结果。fetch phase接着由协调节点根据 doc id 去各个节点上拉取实际的 document 数据最终返回给客户端。
写数据底层原理
核心概念segment file存储倒排索引的文件每个segment本质上就是一个倒排索引每秒都会生成一个segment文件当文件过多时es会自动进行segment merge合并文件合并时会同时将已经标注删除的文档物理删除。commit point记录当前所有可用的segment每个commit point都会维护一个.del文件即每个.del文件都有一个commit point文件es删除数据本质是不属于物理删除当es做删改操作时首先会在.del文件中声明某个document已经被删除文件内记录了在某个segment内某个文档已经被删除当查询请求过来时在segment中被删除的文件是能够查出来的但是当返回结果时会根据commit point维护的那个.del文件把已经删除的文档过滤掉。translog日志文件为了防止elasticsearch宕机造成数据丢失保证可靠存储es会将每次写入数据同时写到translog日志中。os cache操作系统里面磁盘文件其实都有一个东西叫做os cache操作系统缓存就是说数据写入磁盘文件之前会先进入os cache先进入操作系统级别的一个内存缓存中去。Refresh
将文档先保存在Index buffer中以refresh_interval为间隔时间定期清空buffer生成 segment,借助文件系统缓存的特性先将segment放在文件系统缓存中并开放查询以提升搜索的实时性
Translog
Segment没有写入磁盘即便发生了宕机重启后数据也能恢复从ES6.0开始默认配置是每次请求都会落盘
Flush
删除旧的translog 文件生成Segment并写入磁盘│更新commit point并写入磁盘。ES自动完成可优化点不多
如何提升集群的读写性能
提升集群读取性能的方法
数据建模
尽量将数据先行计算然后保存到Elasticsearch 中。尽量避免查询时的 Script计算
#避免查询时脚本
GET blogs/_search
{query: {bool: {must: [{match: {title: elasticsearch}}],filter: {script: {script: {source: doc[title.keyword].value.length()5}}}}}
}尽量使用Filter Context利用缓存机制减少不必要的算分结合profileexplain API分析慢查询的问题持续优化数据模型避免使用*开头的通配符查询
GET /es_db/_search
{query: {wildcard: {address: {value: *白云*}}}
}优化分片
避免Over Sharing一个查询需要访问每一个分片分片过多会导致不必要的查询开销结合应用场景控制单个分片的大小Search20GBLogging50GBForce-merge Read-only索引使用基于时间序列的索引将只读的索引进行force merge减少segment数量
#手动force merge
POST /my_index/_forcemerge提升写入性能的方法
写性能优化的目标: 增大写吞吐量、越高越好客户端多线程、批量写可以通过性能测试确定最佳文档数量多线程需要观察是否有HTTP 429Too Many Requests返回实现 Retry以及线程数量的自动调节服务器端单个性能问题往往是多个因素造成的。需要先分解问题在单个节点上进行调整并且结合测试尽可能压榨硬件资源,以达到最高吞吐量使用更好的硬件。观察CPU / IO Block线程切/堆栈状况
服务器端优化写入性能的一些手段
降低IO操作使用ES自动生成的文档ld一些相关的ES配置如Refresh Interval降低 CPU 和存储开销减少不必要分词避免不需要旳doc_values文档的字段尽量保证相同的顺予可以提高文档的压缩率尽可能做到写入和分片的均衡负载实现水平扩展Shard Filtering / Write Load Balancer调整Bulk线程池和队列
注意ES 的默认设置已经综合考虑了数据可靠性搜索的实时性写入速度一般不要盲目修改。一切优化都要基于高质量的数据建模。
建模时的优化
只需要聚合不需要搜索index设置成false不要对字符串使用默认的dynamic mapping。字段数量过多会对性能产生比较大的影响Index_options控制在创建倒排索引时哪些内容会被添加到倒排索引中。
如果需要追求极致的写入速度可以牺牲数据可靠性及搜索实时性以换取性能
牺牲可靠性将副本分片设置为0写入完毕再调整回去牺牲搜索实时性增加Refresh Interval的时间牺牲可靠性修改Translog的配置
降低 Refresh的频率
增加refresh_interval 的数值。默认为1s 如果设置成-1会禁止自动refresh
避免过于频繁的refresh而生成过多的segment 文件但是会降低搜索的实时性
PUT /my_index/_settings
{index : {refresh_interval : 10s}
}增大静态配置参数indices.memory.index_buffer_size默认是10%会导致自动触发refresh
降低Translog写磁盘的频率但是会降低容灾能力
Index.translog.durability默认是request每个请求都落盘。设置成async异步写入lndex.translog.sync_interval设置为60s每分钟执行一次Index.translog.flush_threshod_size默认512 m可以适当调大。当translog 超过该值会触发flush
分片设定
副本在写入时设为0完成后再增加合理设置主分片数确保均匀分配在所有数据节点上Index.routing.allocation.total_share_per_node限定每个索引在每个节点上可分配的主分片数
调整Bulk 线程池和队列
客户端
单个bulk请求体的数据量不要太大官方建议大约5-15m写入端的 bulk请求超时需要足够长建议60s 以上写入端尽量将数据轮询打到不同节点。
服务器端
索引创建属于计算密集型任务应该使用固定大小的线程池来配置。来不及处理的放入队列线程数应该配置成CPU核心数1避免过多的上下文切换队列大小可以适当增加不要过大否则占用的内存会成为GC的负担ES线程池设置 https://blog.csdn.net/justlpf/article/details/103233215 DELETE myindex
PUT myindex
{settings: {index: {refresh_interval: 30s, #30s一次refreshnumber_of_shards: 2},routing: {allocation: {total_shards_per_node: 3 #控制分片避免数据热点}},translog: {sync_interval: 30s,durability: async #降低translog落盘频率},number_of_replicas: 0},mappings: {dynamic: false, #避免不必要的字段索引必要时可以通过update by query索引必要的字段properties: {}}
}