云南做网站多少钱,一呼百应推广平台,做的网站在百度找不到了,成都网站建设有名的Java在SpringCloud中自定义Gateway负载均衡策略
一、前言
spring-cloud-starter-netflix-ribbon已经不再更新了，最新版本是2.2.10.RELEASE，最后更新时间是2021年11月18日，详细信息可以看maven官方仓库：org.springframework.clou…Java在SpringCloud中自定义Gateway负载均衡策略
一、前言
spring-cloud-starter-netflix-ribbon已经不再更新了，最新版本是2.2.10.RELEASE，最后更新时间是2021年11月18日，详细信息可以看maven官方仓库：org.springframework.cloud/spring-cloud-starter-netflix-ribbon。SpringCloud官方推荐使用spring-cloud-starter-loadbalancer进行负载均衡。
背景：大文件上传，做切片文件上传。
流程：将切片文件上传到服务器，然后进行合并任务，合并完成之后上传到对象存储。现在服务搞成多节点以后，网关默认走轮询，但是相同的服务在不同的机器上，这样就会导致切片文件散落在不同的服务器上，从而导致文件合并失败。所以根据一个标识去自定义gateway对应服务的负载均衡策略，可以解决这个问题。
我的版本如下
<spring-boot.version>2.7.3</spring-boot.version>
<spring-cloud.version>2021.0.4</spring-cloud.version>
<spring-cloud-alibaba.version>2021.0.4.0</spring-cloud-alibaba.version>
二、参考默认实现
springCloud原生默认的负载均衡策略是这个类：org.springframework.cloud.loadbalancer.core.RoundRobinLoadBalancer。我们参考这个类实现自己的负载均衡策略即可。RoundRobinLoadBalancer实现了ReactorServiceInstanceLoadBalancer这个接口，实现了choose这个方法，如下图：在choose方法中调用了processInstanceResponse方法，processInstanceResponse方法中调用了getInstanceResponse方法，所以我们可以复制RoundRobinLoadBalancer整个类，只修改getInstanceResponse这个方法里的内容，就可以实现自定义负载均衡策略。
三、实现代码
原理：根据请求头当中设备的唯一标识传递到下游，对唯一标识做哈希取余，可以指定对应的服务器节点。需要的服务设置自定义负载策略，不需要的服务设置默认的轮询机制即可。
package com.wondertek.gateway.loadBalancer;import cn.hutool.core.util.ObjectUtil;
import com.wondertek.web.exception.enums.HttpRequestHeaderEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
Slf4j
Component
public class RequestFilter implements GlobalFilter, Ordered {Overridepublic int getOrder() {// 应该小于LoadBalancerClientFilter的顺序值return Ordered.HIGHEST_PRECEDENCE;}Overridepublic MonoVoid filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request exchange.getRequest();String clientDeviceUniqueCode request.getHeaders().getFirst(HttpRequestHeaderEnum.CLIENT_DEVICE_UNIQUE_CODE.getCode());// 存入Reactor上下文String resultCode clientDeviceUniqueCode;return chain.filter(exchange).contextWrite(context - {if (ObjectUtil.isNotEmpty(resultCode)) {log.info(开始将request中的唯一标识封装到上下游中:{}, resultCode);return context.put(identification, resultCode);} else {// 或者根据需求进行其他处理return context;}});}
}
package com.wondertek.gateway.loadBalancer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;import java.util.*;
import java.util.concurrent.ThreadLocalRandom;Slf4j
public class ClientDeviceUniqueCodeInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {private final String serviceId;private ObjectProviderServiceInstanceListSupplier serviceInstanceListSupplierProvider;public ClientDeviceUniqueCodeInstanceLoadBalancer(ObjectProviderServiceInstanceListSupplier serviceInstanceListSupplierProvider, String serviceId) {this.serviceId serviceId;this.serviceInstanceListSupplierProvider serviceInstanceListSupplierProvider;}Overridepublic MonoResponseServiceInstance choose(Request request) {//在 choose 方法中使用 deferContextual 方法来访问上下文并提取客户端标识。这里的 getOrDefault 方法尝试从上下文中获取一个键为 identification 的值如果不存在则返回 default-identificationreturn Mono.deferContextual(contextView - {String identification contextView.getOrDefault(identification, 14d58a1ba286f087d9736249ec785314);log.info(上下游获取到的identification的值为:{}, identification);ServiceInstanceListSupplier supplier serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);return supplier.get(request).next().map(serviceInstances - processInstanceResponse(supplier, serviceInstances, identification));});}private ResponseServiceInstance processInstanceResponse(ServiceInstanceListSupplier supplier, ListServiceInstance serviceInstances, String identification) {ResponseServiceInstance serviceInstanceResponse;if (Objects.isNull(identification)) {serviceInstanceResponse this.getInstanceResponse(serviceInstances, null);} else {serviceInstanceResponse this.getIpInstanceResponse(serviceInstances, identification);}if (supplier instanceof SelectedInstanceCallback serviceInstanceResponse.hasServer()) {((SelectedInstanceCallback) supplier).selectedServiceInstance((ServiceInstance) serviceInstanceResponse.getServer());}return serviceInstanceResponse;}private ResponseServiceInstance getInstanceResponse(ListServiceInstance instances, String identification) {if (instances.isEmpty()) {if (log.isWarnEnabled()) {log.warn(No servers available for service: this.serviceId);}return new 
EmptyResponse();} else {int index ThreadLocalRandom.current().nextInt(instances.size());ServiceInstance instance (ServiceInstance) instances.get(index);return new DefaultResponse(instance);}}private ResponseServiceInstance getIpInstanceResponse(ListServiceInstance instances, String identification) {if (instances.isEmpty()) {log.warn(No servers available for service: this.serviceId);return new EmptyResponse();} else if (instances.size() 1) {log.info(只有一个服务实例直接返回这个实例);return new DefaultResponse(instances.get(0));} else {//创建一个新的列表以避免在原始列表上排序,避免了修改共享状态可能带来的线程安全问题ListServiceInstance sortedInstances new ArrayList(instances);// 现在对新列表进行排序保持原始列表的顺序不变Collections.sort(sortedInstances, Comparator.comparing(ServiceInstance::getHost));//log.info(获取到的实例个数的值为:{}, sortedInstances.size());sortedInstances.forEach(instance - log.info(排序后的实例: {},{}, instance.getHost(), instance.getPort()));//log.info(多个服务实例使用客户端 identification 地址的哈希值来选择服务实例);// 使用排序后的列表来找到实例int ipHashCode Math.abs(identification.hashCode());//log.info(identificationHashCode的值为:{}, ipHashCode);int instanceIndex ipHashCode % sortedInstances.size();//log.info(instanceIndex的值为:{}, instanceIndex);ServiceInstance instanceToReturn sortedInstances.get(instanceIndex);//log.info(instanceToReturn.getUri()的值为:{}, instanceToReturn.getUri());log.info(自定义identification负载机制,Client identification: {} is routed to instance: {}:{}, identification, instanceToReturn.getHost(), instanceToReturn.getPort());return new DefaultResponse(instanceToReturn);}}}
package com.wondertek.gateway.loadBalancer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;Slf4j
public class DefaultInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {private final String serviceId;private ObjectProviderServiceInstanceListSupplier serviceInstanceListSupplierProvider;final AtomicInteger position;public DefaultInstanceLoadBalancer(ObjectProviderServiceInstanceListSupplier serviceInstanceListSupplierProvider, String serviceId, AtomicInteger position) {this.serviceId serviceId;this.serviceInstanceListSupplierProvider serviceInstanceListSupplierProvider;this.position position;}Overridepublic MonoResponseServiceInstance choose(Request request) {ServiceInstanceListSupplier supplier serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);return supplier.get(request).next().map(serviceInstances - processInstanceResponse(supplier, serviceInstances));}private ResponseServiceInstance processInstanceResponse(ServiceInstanceListSupplier supplier,ListServiceInstance serviceInstances) {ResponseServiceInstance serviceInstanceResponse getInstanceResponse(serviceInstances);if (supplier instanceof SelectedInstanceCallback serviceInstanceResponse.hasServer()) {((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());}return serviceInstanceResponse;}private ResponseServiceInstance getInstanceResponse(ListServiceInstance instances) {if (instances.isEmpty()) {if (log.isWarnEnabled()) {log.warn(No servers available for service: serviceId);}return new EmptyResponse();}//创建一个新的列表以避免在原始列表上排序,避免了修改共享状态可能带来的线程安全问题ListServiceInstance sortedInstances new ArrayList(instances);// 现在对新列表进行排序保持原始列表的顺序不变Collections.sort(sortedInstances, Comparator.comparing(ServiceInstance::getHost));//log.info(获取到的实例个数的值为:{}, sortedInstances.size());sortedInstances.forEach(instance - log.info(排序后的实例: {},{}, instance.getHost(), instance.getPort()));int pos Math.abs(this.position.incrementAndGet());//log.info(默认轮循机制,pos递加后的值为:{}, pos);int positionIndex pos % 
instances.size();//log.info(取余后的positionIndex的值为:{}, positionIndex);ServiceInstance instance instances.get(positionIndex);//log.info(instance.getUri()的值为:{}, instance.getUri());log.info(默认轮循机制,routed to instance: {}:{},instance.getHost(), instance.getPort());return new DefaultResponse(instance);}}
package com.wondertek.gateway.loadBalancer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClient;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;import java.util.concurrent.atomic.AtomicInteger;Configuration
//单台服务
//LoadBalancerClient(name oms-api, configuration CustomLoadBalancerConfig.class)
//多台服务
LoadBalancerClients({LoadBalancerClient(name oms-api, configuration CustomLoadBalancerConfig.class),LoadBalancerClient(name unity-api, configuration CustomLoadBalancerConfig.class),LoadBalancerClient(name cloud-api, configuration CustomLoadBalancerConfig.class),LoadBalancerClient(name open-api, configuration CustomLoadBalancerConfig.class),LoadBalancerClient(name server-api, configuration CustomLoadBalancerConfig.class),LoadBalancerClient(name center-service, configuration CustomLoadBalancerConfig.class),
})
Slf4j
public class CustomLoadBalancerConfig {// 定义一个Bean来提供AtomicInteger的实例Beanpublic AtomicInteger positionTracker() {// 这将在应用上下文中只初始化一次return new AtomicInteger(0);}//自定义优先级负载均衡器Beanpublic ReactorServiceInstanceLoadBalancer customPriorityLoadBalancer(ObjectProviderServiceInstanceListSupplier serviceInstanceListSupplierProvider,Environment environment,AtomicInteger positionTracker) {String serviceId environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);//目的为解决文件上传切片文件分散上传的问题if (oms-api.equals(serviceId)||unity-api.equals(serviceId)||cloud-api.equals(serviceId)){//log.info(服务名称:serviceId:{},走自定义clientDeviceUniqueCode负载模式, serviceId);return new ClientDeviceUniqueCodeInstanceLoadBalancer(serviceInstanceListSupplierProvider, serviceId);}//log.info(服务名称:serviceId:{},走默认负载模式, serviceId);return new DefaultInstanceLoadBalancer(serviceInstanceListSupplierProvider, serviceId,positionTracker);}
}
【SpringCloud系列】开发环境下重写Loadbalancer实现自定义负载均衡