当前位置: 首页 > news >正文

盐城网站建设百度2020新版下载

盐城网站建设,百度2020新版下载,晋城手机网站建设,用eclipse编程做网站张海东#xff0c; ‍多点生活#xff08;成都#xff09;云原生开发工程师。本篇主要介绍 Pilot 源码中的 ServiceEntryStore 及其推送 xDS 的流程。本文为 Istio Pilot 源码分析系列的第二篇文章。Istio Pilot 源码分析#xff08;一#xff09;了解了 Pilot 源码的基本… 张海东 ‍多点生活成都云原生开发工程师。本篇主要介绍 Pilot 源码中的 ServiceEntryStore 及其推送 xDS 的流程。本文为 Istio Pilot 源码分析系列的第二篇文章。Istio Pilot 源码分析一了解了 Pilot 源码的基本结构和启动流程之后我们可以深入探索 Pilot 究竟是怎么下发 xDS 协议的以及协议的生成逻辑。相信大家都会有这些疑问控制面与数据面详细的交互过程是什么到底什么时候才会增量推送增量推送判断的逻辑是什么非 Kubernetes 原生的服务如存在于虚拟机的服务、 Dubbo 服务等到底是怎么注册并且经过一系列转化下发至数据面的带着这些问题开始我们今天对 Pilot 的探索。注本文基于 istio release-1.7 分支分析其他版本的代码结构会有所不同。ServiceEntryStore在多点落地 ServiceMesh 的过程中大量的用到了 ServiceEntry 每一个 Dubbo 服务都会映射一个 ServiceEntry 创建在 Kubernetes 里。 ServiceEntry 的作用就是将集群外部的服务注册到 Pilot 中再统一由 ServiceController 进行管理。相应的管理外部服务实例的对象为 WorkloadEntry  ServiceEntry 可以通过 LabelSelector 筛选出自身对应的实例。ServiceEntry 是作为 CR (Custome Resource) 保存在 Kubernetes 集群里的也可以通过 MCP 服务直接发送给 Pilot 暂时只讨论在集群中创建 CR 的情况。在上一篇源码分析中我们介绍到 Pilot 是通过 ConfigController 来监听创建在集群中的 CR 的 ServiceEntry 也不例外保存这些 CR 的 ConfigStore 会被转化为 ServiceEntryStore 中的 store 转化的详情见上一篇源码分析这就是最终 Pilot 存储 ServiceEntry 的地方。当监听的资源推送更改的事件时会触发 ServiceEntryStore 对应的 handler 处理后续的流程。我们先来看一下 ServiceEntryStore 的结构和它提供的方法// istio/pilot/pkg/serviceregistry/serviceentry/servicediscovery.go:61 // ServiceEntryStore communicates with ServiceEntry CRDs and monitors for changes type ServiceEntryStore struct {XdsUpdater model.XDSUpdater // 用来接收 EnvouXdsServer 的接口主要用来 Push 相应的 xDS 更新请求store model.IstioConfigStore // 保存 ServiceEntry 实例的地方storeMutex sync.RWMutex // 读写 store 时需要的锁// 以 hostname/namespace 以及类型是服务还是实例等作为索引的服务实例表instances map[instancesKey]map[configKey][]*model.ServiceInstance// seWithSelectorByNamespace 保存了每个 namespace 里所有的 ServiceEntry也是作为一个索引供 handler 使用seWithSelectorByNamespace map[string][]servicesWithEntryrefreshIndexes bool... } 可以看到除了 XdsUpdater 和 store 两个必须的结构外其余大部分都是些资源的缓存和索引索引键不同为后续 handler 处理事件时提供便利。除了结构还需要关注两个比较重要的 handler :// WorkloadEntry 变化时的处理逻辑 func (s *ServiceEntryStore) workloadEntryHandler(old, curr model.Config, event model.Event) {} // ServiceEntry 变化时的处理逻辑 func (s *ServiceEntryStore) serviceEntryHandler(old, curr model.Config, event model.Event) {} 这两个 handler 的业务逻辑后文中再详细讨论先来回忆下 ServiceEntryStore 的初始化流程img在 Server 初始化 ServiceController 的时候通过调用 NewServiceDiscovery() 方法初始化 ServiceEntryStore 这里除了将 EnvoyXdsServer 和 IstioConfigStore 与 ServiceEntryStore 关联起来外最重要的就是向 ConfigController 注册了 ServiceEntry 和 WorkloadEntry 的事件 Handler:func NewServiceDiscovery(configController model.ConfigStoreCache, store model.IstioConfigStore, xdsUpdater model.XDSUpdater) *ServiceEntryStore {s : ServiceEntryStore{XdsUpdater: xdsUpdater,store: store,ip2instance: map[string][]*model.ServiceInstance{},instances: map[instancesKey]map[configKey][]*model.ServiceInstance{},workloadInstancesByIP: map[string]*model.WorkloadInstance{},refreshIndexes: true,}if configController ! nil {configController.RegisterEventHandler(gvk.ServiceEntry, s.serviceEntryHandler)configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler)}return s } 这样在 ConfigController 监听到资源变化的时候就会调用 serviceEntryHandler 和 workloadEntryHandler 来处理事件了。这两个 handler 的目的都是向 EnvoyXdsServer 推送相应的 xDS 资源变化。workloadEntryHandler首先来分析服务实例 WorkloadEntry 的更新是如何下发 xDS 的imgseWithSelectorByNamespace 和 instances 如上述 ServiceEntryStore 结构介绍中的注释前者缓存了各个 namespace 中所有的 ServiceEntry 后者则是所有服务节点 WorkloadEntry 的缓存。当有新的 WorkloadEntry 变化时先从 seWithSelectorByNamespace 中读取同一 namespace 中的 ServiceEntry 遍历它们并与 WorkloadEntry 的 Label 进行比对确定是关联的服务后依据获取的服务创建 ServiceInstance 。 ServiceInstance 是 Pilot 抽象出的描述具体服务对应实例的结构:type ServiceInstance struct {Service *Service json:service,omitemptyServicePort *Port json:servicePort,omitemptyEndpoint *IstioEndpoint json:endpoint,omitempty } 创建了新的 ServiceInstance 后需要及时更新实例的索引表 s.instances :if event ! model.EventDelete {s.updateExistingInstances(key, instances) } else {s.deleteExistingInstances(key, instances) } 之后将新创建的 ServiceInstance 传入 ServiceEntryStore 专门处理 EDS 的函数 s.edsUpdate() 。在做进一步处理时需要再刷新一遍索引表调用 maybeRefreshIndexes() 避免其他协程的工作导致索引表更新不及时完成后开启读锁从服务实例索引表 s.Instances 中查找我们要处理的实例。如果是删除事件先前更新索引表的时候已经删除了所以这里是查不到 allInstances 的直接向 EnvouXdsServer 发送删除 EDS 的请求。// edsUpdate triggers an EDS update for the given instances func (s *ServiceEntryStore) edsUpdate(instances []*model.ServiceInstance) {allInstances : []*model.ServiceInstance{}// Find all keys we need to lookupkeys : map[instancesKey]struct{}{}for _, i : range instances {keys[makeInstanceKey(i)] struct{}{}}s.maybeRefreshIndexes()s.storeMutex.RLock()for key : range keys {for _, i : range s.instances[key] {allInstances append(allInstances, i...)}}s.storeMutex.RUnlock()// This was a deleteif len(allInstances) 0 {for k : range keys {_ s.XdsUpdater.EDSUpdate(s.Cluster(), string(k.hostname), k.namespace, nil)}return}... } 如果实例有更新则直接发送更新 EDS 的请求// edsUpdate triggers an EDS update for the given instances func (s *ServiceEntryStore) edsUpdate(instances []*model.ServiceInstance) {...endpoints : make(map[instancesKey][]*model.IstioEndpoint)for _, instance : range allInstances {port : instance.ServicePortkey : makeInstanceKey(instance)endpoints[key] append(endpoints[key],model.IstioEndpoint{Address: instance.Endpoint.Address,EndpointPort: instance.Endpoint.EndpointPort,ServicePortName: port.Name,Labels: instance.Endpoint.Labels,UID: instance.Endpoint.UID,ServiceAccount: instance.Endpoint.ServiceAccount,Network: instance.Endpoint.Network,Locality: instance.Endpoint.Locality,LbWeight: instance.Endpoint.LbWeight,TLSMode: instance.Endpoint.TLSMode,})}for k, eps : range endpoints {_ s.XdsUpdater.EDSUpdate(s.Cluster(), string(k.hostname), k.namespace, eps)} } 完整的 workloadEntryHandler() 代码如下func (s *ServiceEntryStore) workloadEntryHandler(old, curr model.Config, event model.Event) {wle : curr.Spec.(*networking.WorkloadEntry)key : configKey{kind: workloadEntryConfigType,name: curr.Name,namespace: curr.Namespace,}...s.storeMutex.RLock()// We will only select entries in the same namespaceentries : s.seWithSelectorByNamespace[curr.Namespace]s.storeMutex.RUnlock()// if there are no service entries, return now to avoid taking unnecessary locksif len(entries) 0 {return}log.Debugf(Handle event %s for workload entry %s in namespace %s, event, curr.Name, curr.Namespace)instances : []*model.ServiceInstance{}for _, se : range entries {workloadLabels : labels.Collection{wle.Labels}if !workloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) {// Not a match, skip this onecontinue}instance : convertWorkloadEntryToServiceInstances(wle, se.services, se.entry)instances append(instances, instance...)}if event ! model.EventDelete {s.updateExistingInstances(key, instances)} else {s.deleteExistingInstances(key, instances)}s.edsUpdate(instances) } 接下来就是 EnvoyXdsServer 来处理这次 EDS 的更新请求了。首先 EnvoyXdsServer 会判断此次 EDS 更新是全量下发还是增量下发然后创建 PushRequest 发送至 EnvoyXdsServer 统一用来接收推送请求的 pushChannel 。func (s *DiscoveryServer) EDSUpdate(clusterID, serviceName string, namespace string,istioEndpoints []*model.IstioEndpoint) error {inboundEDSUpdates.Increment()// 判断是否是全量下发fp : s.edsUpdate(clusterID, serviceName, namespace, istioEndpoints)s.ConfigUpdate(model.PushRequest{Full: fp,ConfigsUpdated: map[model.ConfigKey]struct{}{{Kind: gvk.ServiceEntry,Name: serviceName,Namespace: namespace,}: {}},Reason: []model.TriggerReason{model.EndpointUpdate},})return nil } pushChannel 后续的处理流程和 EDS 是否增量更新将在下文讨论 EnvoyXdsServer 的时候再分析这里不再赘述。serviceEntryHandler了解了 WorkloadEntry 的更新是如何处理之后我们再来看下 serviceEntryHandler 是如何处理 ServiceEntry 的imgserviceEntryHandler 会将 ServiceEntry 转化为一组 Pilot 内部抽象的服务每个不同的 Hosts 、 Address 都会对应一个 Service 并且初始化一个名为 configsUpdated 的 map 来保存是否有 ServiceEntry 需要更新以及创建了多个 slice 分别保存该新增、删除、更新和没有变化的服务func (s *ServiceEntryStore) serviceEntryHandler(old, curr model.Config, event model.Event) {cs : convertServices(curr)configsUpdated : map[model.ConfigKey]struct{}{}var addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs []*model.Service... } 根据不同的事件类型更新不同的 slice :switch event { case model.EventUpdate:os : convertServices(old)if selectorChanged(old, curr) {// Consider all services are updated.mark : make(map[host.Name]*model.Service, len(cs))for _, svc : range cs {mark[svc.Hostname] svcupdatedSvcs append(updatedSvcs, svc)}for _, svc : range os {if _, f : mark[svc.Hostname]; !f {updatedSvcs append(updatedSvcs, svc)}}} else {addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs servicesDiff(os, cs)} case model.EventDelete:deletedSvcs cs case model.EventAdd:addedSvcs cs default:// this should not happenunchangedSvcs cs } 比较特别的是当事件为更新事件时会和老的 Service 列表进行比对。先看是否有某个服务的 Selector 发生了变化如果发生了变化需要将新老服务列表里的所有服务都加入到更新列表中。如果 Selector 没有发生变化通过 serviceDiff() 挨个比对新老服务列表中的服务对应保存至新增、删除、更新和未变化的 slice 中。将服务归类后把需要变化的服务都写入 configsUpdated 中for _, svcs : range [][]*model.Service{addedSvcs, deletedSvcs, updatedSvcs} {for _, svc : range svcs {configsUpdated[model.ConfigKey{Kind: gvk.ServiceEntry,Name: string(svc.Hostname),Namespace: svc.Attributes.Namespace}] struct{}{}} } 由于 serviceDiff() 只会比对 Service 结构并不会对比 Endpoints 是否变化所以当有 unchangedSvcs 时可能需要对这些服务的 xDS 做增量更新只更新 EDS 也可能是全量更新。什么时候会全量更新呢当服务的 Resolution 为 DNS 时可以阅读文档了解 Resolution[1]  Endpoint 的 address 都是全域名需要更新 CDS 才行。if len(unchangedSvcs) 0 {// If this service entry had endpoints with IPs (i.e. resolution STATIC), then we do EDS update.// If the service entry had endpoints with FQDNs (i.e. resolution DNS), then we need to do// full push (as fqdn endpoints go via strict_dns clusters in cds).currentServiceEntry : curr.Spec.(*networking.ServiceEntry)oldServiceEntry : old.Spec.(*networking.ServiceEntry)if currentServiceEntry.Resolution networking.ServiceEntry_DNS {if !reflect.DeepEqual(currentServiceEntry.Endpoints, oldServiceEntry.Endpoints) {// fqdn endpoints have changed. Need full pushfor _, svc : range unchangedSvcs {configsUpdated[model.ConfigKey{Kind: gvk.ServiceEntry,Name: string(svc.Hostname),Namespace: svc.Attributes.Namespace}] struct{}{}}}} } 当 unchangedSvcs 的 Resolution 为 STATIC 时只需要增量的更新 EDS 即可if len(unchangedSvcs) 0 !fullPush {// IP endpoints in a STATIC service entry has changed. We need EDS update// If will do full-push, leave the edsUpdate to that.// XXX We should do edsUpdate for all unchangedSvcs since we begin to calculate service// data according to this configsUpdated and thus remove the !willFullPush condition.instances : convertInstances(curr, unchangedSvcs)key : configKey{kind: serviceEntryConfigType,name: curr.Name,namespace: curr.Namespace,}// If only instances have changed, just update the indexes for the changed instances.s.updateExistingInstances(key, instances)s.edsUpdate(instances)return } 如果 configsUpdated 中有值则需要做 fullPush 先更新这些服务的 EDS 再向 pushChannel 发送 fullPush 的 PushRequest :if fullPush {// When doing a full push, for added and updated services trigger an eds update// so that endpoint shards are updated.var instances []*model.ServiceInstanceif len(addedSvcs) 0 {instances append(instances, convertInstances(curr, addedSvcs)...)}if len(updatedSvcs) 0 {instances append(instances, convertInstances(curr, updatedSvcs)...)}if len(unchangedSvcs) 0 {currentServiceEntry : curr.Spec.(*networking.ServiceEntry)oldServiceEntry : old.Spec.(*networking.ServiceEntry)// Non DNS service entries are sent via EDS. So we should compare and update if such endpoints change.if currentServiceEntry.Resolution ! networking.ServiceEntry_DNS {if !reflect.DeepEqual(currentServiceEntry.Endpoints, oldServiceEntry.Endpoints) {instances append(instances, convertInstances(curr, unchangedSvcs)...)}}}s.edsUpdate(instances)// If service entry is deleted, cleanup endpoint shards for services.for _, svc : range deletedSvcs {s.XdsUpdater.SvcUpdate(s.Cluster(), string(svc.Hostname), svc.Attributes.Namespace, model.EventDelete)}pushReq : model.PushRequest{Full: true,ConfigsUpdated: configsUpdated,Reason: []model.TriggerReason{model.ServiceUpdate},}s.XdsUpdater.ConfigUpdate(pushReq) } 至此 ServiceEntryStore 是如何处理 ServiceEntry 和 WorkloadEntry 的逻辑就介绍完了。其余像 ServiceEntry 选择集群内的 Pods 、 Kubernetes 原生 Service 选择 WorkloadEntry 的用法读者感兴趣可以自行研究相关源码。其余注册中心的处理逻辑如 kube 、 mcp 等可继续关注本系列的其他文章。读者也可以自行尝试走读分析// 相关源码目录 kube: pilot/pkg/serviceregistry/kube mcp: pilot/pkg/serviceregistry/mcp 接下来我们介绍 Pilot Server 中的核心 EnvoyXdsServer 。引用链接[1] Resolution: https://istio.io/latest/docs/reference/config/networking/service-entry/#ServiceEntry-Resolution云原生社区 Istio SIG云原生社区是 ServiceMesher 的姊妹社区扫码下面二维码回复 0924 即可加入云原生社区 Istio SIG和包括本文作者一起交流 Istio 经验。
http://www.pierceye.com/news/405774/

相关文章:

  • 网站设计的工具盱眙在仕德伟做网站的有几家
  • 建设一个网站要花多少时间临沂网站网站建设
  • 南宁网站推广经理做动漫网站如何应用数据绑定
  • 眼镜东莞网站建设兰州公司做网站
  • 改成 响应式 网站重庆微信企业网站
  • 用微信怎么做商城网站微信官网下载安装
  • 汽车网站建设方案预算md风格的wordpress主题
  • 免费外贸网站模板dede 网站栏目管理
  • 做网站有包括哪些东西站长素材网
  • 淘宝做促销的网站网站开发报价清单
  • 备案查询网站网站建设中可能遇到的问题
  • 怎么注册网站的步骤快速建站官网
  • 网站怎么做口碑wordpress淘宝客知乎
  • 响应式网站建设信息网站建设宽带
  • ps如何做网站超级链接微信公众平台运营中心电话
  • 网站建设怎么估算费用和报价h5特效网站欣赏
  • 东软集团建设网站娱乐网站排行榜
  • 石家庄网站建站米拓建站官网怎么用不了
  • 推广seo网站的公司金华网站建设公司排名
  • 阿里巴巴网站工作流程网站建设 教学设计
  • 电子商务网站建设的方法怎样用织梦做音乐网站
  • 临夏州住房和城乡建设局网站出词
  • 企业网站的综合要求最新领导班子7人名单
  • 通过阿里云建设企业网站联想企业网站建设的思路
  • 网站建设服务器的选择方案建设报名系统是正规网站吗
  • 揭阳高端模板建站WordPress背景音乐6
  • 如何使用云服务建设网站cpa之家 app推广平台
  • 网站设计策划书案例漳浦建设局网站
  • ps做分享类网站效果图设计公司工作室创业规划
  • 个人虚拟机做网站设计实例网站