侧边栏壁纸
博主头像
qingtian博主等级

喜欢是一件细水流长的事,是永不疲惫的双向奔赴~!

  • 累计撰写 104 篇文章
  • 累计创建 48 个标签
  • 累计收到 1 条评论

SpringCloud Hystrix微服务容错

qingtian
2022-04-25 / 0 评论 / 0 点赞 / 534 阅读 / 22,141 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-04-25,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

SpringCloud Hystrix

Hystrix的设计目标

  • Hystrix是一个库,通过添加延迟容忍和容错逻辑,帮助你控制这些分布式服务之间的交互
  • Hystrix通过隔离服务之间的访问点、停止级联失败和提供回退选项来实现服务之间的容错
  • Hystrix有四大设计目标
    1. 对客户端访问的延迟和故障进行保护和控制
    2. 在复杂分布式系统中阻止级联故障
    3. 快速失败,快速恢复
    4. 兜底回退,尽可能的优雅降级

Hystrix解决了什么问题

  • 复杂分布式系统中服务之间存在许多依赖项,依赖项可能会存在故障,如果不做故障隔离,整个服务可能会被拖垮

Hystrix是如何实现它的目标的

  • 对依赖项(服务)进行包装代理,不直接与依赖项交互
  • 调用超时时间允许自行设定,超时后立即熔断报错
  • 每一个依赖项都在自己的空间内(线程池或信号量隔离),依赖项之间不存在干扰
  • 请求依赖项失败后,可以选择出错或是兜底回退

使用注解的方式实现服务的容错、降级

Hystrix的断路器模式和后备策略模式

Hystrix的舱壁模式

  • Hystrix 是通过线程池管理调用外部资源的,默认情况下所有服务调用的都是一个公用线程池。

  • 一个性能低下的服务会耗尽Hystrix的线程池资源。进而牵连到其他的远程调用,最后也会耗尽Java容器的资源。

  • 舱壁模式的功能:我们可以为各个服务分别指定线程池

@EnableDiscoveryClient  
@SpringBootApplication  
@EnableFeignClients  
@RefreshScope  
@EnableCircuitBreaker //启用 Hysrix
public class NacosClientApplication {  
  
    public static void main(String[] args) {  
  
        SpringApplication.run(NacosClientApplication.class,args);  
 }  
}

使用@HystrixCommand来配置

/**
 * @author qingtian
 * @version 1.0
 * @description: 使用 HystrixCommand 注解
 * @date 2022/3/23 23:10
 */
@Slf4j
@Service
public class UseHystrixCommandAnnotation {

    @Autowired
    private NacosClientService nacosClientService;

    @HystrixCommand(
            // 用于对 Hystrix 命令进行分组, 分组之后便于统计展示于仪表盘、上传报告和预警等等
            // 内部进行度量统计时候的分组标识, 数据上报和统计的最小维度就是 groupKey
            groupKey = "NacosClientService",
            // HystrixCommand 的名字, 默认是当前类的名字, 主要方便 Hystrix 进行监控、报警等
            commandKey = "NacosClientService",
            // 舱壁模式
            threadPoolKey = "NacosClientService",
            // 后备模式
            fallbackMethod = "getNacosClientInfoFallback",
            // 断路器模式
            commandProperties = {
                    // 超时时间, 单位毫秒, 超时进 fallback
                    @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1500"),
                    // 判断熔断的最少请求数, 默认是10; 只有在一定时间内请求数量达到该值, 才会进行成功率的计算
                    @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "10"),
                    // 熔断的阈值默认值 50, 表示在一定时间内有50%的请求处理失败, 会触发熔断
                    @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10"),
            },
            // 舱壁模式
            threadPoolProperties = {
                    @HystrixProperty(name = "coreSize", value = "30"),
                    @HystrixProperty(name = "maxQueueSize", value = "101"),
                    @HystrixProperty(name = "keepAliveTimeMinutes", value = "2"),
                    @HystrixProperty(name = "queueSizeRejectionThreshold", value = "15"),
                    // 在时间窗口中, 收集统计信息的次数; 在 1440ms 的窗口中一共统计 12 次
                    @HystrixProperty(name = "metrics.rollingStats.numBuckets", value = "12"),
                    // 时间窗口, 从监听到第一次失败开始计时
                    @HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds", value = "1440")
            }
    )
    public List<ServiceInstance> getNacosClientInfo(String serviceId) {
        log.info("use hystrux command annotation to get nacos client info : [{}],[{}]",
                serviceId,Thread.currentThread().getName());
        return nacosClientService.getNacosClientInfo(serviceId);
    }

    /**
     * getNacosClientInfo 方法的兜底策略
     * @param serviceId
     * @return
     */
    public List<ServiceInstance> getNacosClientInfoFallback(String serviceId) {
        log.warn("can not get nacos client, trigger hystrix fallback : [{}],[{}]",
                serviceId, Thread.currentThread().getName());
        return Collections.emptyList();
    }
}

使用编程的方式实现服务的容错,降级

代码实例如下:

/**
 * @author qingtian
 * @version 1.0
 * @description: 给 nacosClientService 实现包装
 * Hystrix 舱壁模式
 * 1. 线程池
 * 2. 信号量
 * @date 2022/4/6 22:30
 */
@Slf4j
public class NacosClientHystrixCommand extends HystrixCommand<List<ServiceInstance>> {

    /**
     * 需要保护的服务
     */
    private final NacosClientService nacosClientService;

    /**
     * 方法需要传递的参数
     */
    private final String serviceId;

    public NacosClientHystrixCommand(NacosClientService nacosClientService, String serviceId) {

        super(
                Setter.withGroupKey(
                        HystrixCommandGroupKey.Factory.asKey("NacosClientService"))
                        .andCommandKey(HystrixCommandKey.Factory.asKey("NacosClientServiceCommand"))
                        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("NacosClientPool"))
                        //线程池 key 配置
                        .andCommandPropertiesDefaults(
                                HystrixCommandProperties.Setter()
                                        //线程池隔离策略
                                        .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                                        //开启降级
                                        .withFallbackEnabled(true)
                                        //开启熔断器
                                        .withCircuitBreakerEnabled(true)
                        )
        );

        //可以配置信号量隔离策略
//        Setter setter = Setter.withGroupKey(
//                        HystrixCommandGroupKey.Factory.asKey("NacosClientService"))
//                .andCommandKey(HystrixCommandKey.Factory.asKey("NacosClientServiceCommand"))
//                .andCommandPropertiesDefaults(
//                        HystrixCommandProperties.Setter()
//                                //至少有10个请求熔断器才开始计算成功率
//                                .withCircuitBreakerRequestVolumeThreshold(10)
//                                //熔断器在启用5秒后会进入半开启模式 尝试进行请求
//                                .withCircuitBreakerSleepWindowInMilliseconds(5000)
//                                //当请求错误率达到 50% 时开启熔断器
//                                .withCircuitBreakerErrorThresholdPercentage(50)
//                                //使用信号量熔断配置
//                                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
//                );

        this.nacosClientService = nacosClientService;
        this.serviceId = serviceId;
    }

    /**
     * 要保护的方法调用写在 run 方法中
     * @return
     * @throws Exception
     */
    @Override
    protected List<ServiceInstance> run() throws Exception {
        log.info("NacosClientService In Hystrix Command to Get Service Instance : [{}]. [{}]",
                this.serviceId,Thread.currentThread().getName());
        return this.nacosClientService.getNacosClientInfo(this.serviceId);
    }

    /**
     * 降级处理策略
     * @return
     */
    @Override
    protected List<ServiceInstance> getFallback() {
        log.warn("NacoClientService run error : [{}], [{}]", this.serviceId, Thread.currentThread().getName());
        return Collections.emptyList();
    }
}

--------------------------------------------------------------------------
	//使用方式
    @GetMapping("/simple-hystrix-command")
    public List<ServiceInstance> getServiceInstanceByServiceId(@RequestParam String serviceId) throws ExecutionException, InterruptedException {

        //第一种方式  同步阻塞的方式
        List<ServiceInstance> serviceInstances01 = new NacosClientHystrixCommand(nacosClientService, serviceId)
                .execute();
        log.info("use execute to get service instance : [{}], [{}]",
                JSON.toJSONString(serviceInstances01),Thread.currentThread().getName());

        //第二种方式  异步非阻塞
        List<ServiceInstance> serviceInstances02;
        Future<List<ServiceInstance>> future = new NacosClientHystrixCommand(nacosClientService, serviceId).queue();
        //做一些别的事情
        serviceInstances02 = future.get();
        log.info("use execute to get service instance : [{}], [{}]",
                JSON.toJSONString(serviceInstances02),Thread.currentThread().getName());

        //第三种方式  热响应调用
        Observable<List<ServiceInstance>> observable = new NacosClientHystrixCommand(nacosClientService, serviceId).observe();
        //热响应调用,在开始只会创建线程而不会触发方法的调用,等到使用 observable.toBlocking().single() 方法的时候才会真正去调用方法
        List<ServiceInstance> serviceInstances03 = observable.toBlocking().single();
        log.info("use execute to get service instance : [{}], [{}]",
                JSON.toJSONString(serviceInstances03),Thread.currentThread().getName());

        //第四种方式,异步冷响应调用
        Observable<List<ServiceInstance>> toObservable = new NacosClientHystrixCommand(nacosClientService, serviceId).toObservable();
        //异步冷响应调用,主要与第三种热响应调用的区别,冷响应调用不会创建新的线程 而是在 toObservable.toBlocking().single() 的时候才会创建新的线程调用方法
        List<ServiceInstance> serviceInstances04 = toObservable.toBlocking().single();
        log.info("use execute to get service instance : [{}], [{}]",
                JSON.toJSONString(serviceInstances04),Thread.currentThread().getName());
        return serviceInstances01;
    }

基于信号量的机制实现

@Slf4j
public class NacosClientHystrixObservableCommand extends HystrixObservableCommand<List<ServiceInstance>> {

    /**
     * 需要保护的服务
     */
    private final NacosClientService nacosClientService;

    /**
     * 方法需要传递的参数
     */
    private final List<String> serviceIds;

    public NacosClientHystrixObservableCommand(NacosClientService nacosClientService, List<String> serviceIds) {
        super(
                HystrixObservableCommand.Setter
                        .withGroupKey(HystrixCommandGroupKey
                                .Factory.asKey("NacosClientService"))
                        .andCommandKey(HystrixCommandKey
                                .Factory.asKey("NacosClientHystrixObservableCommand"))
                        .andCommandPropertiesDefaults(
                                HystrixCommandProperties.Setter()
                                        //开启降级
                                        .withFallbackEnabled(true)
                                        //开启熔断器
                                        .withCircuitBreakerEnabled(true)
                        )

        );
        this.nacosClientService = nacosClientService;
        this.serviceIds = serviceIds;
    }

    /**
     * 需要保护的方法
     * @return
     */
    @Override
    protected Observable<List<ServiceInstance>> construct() {
        return Observable.create(new Observable.OnSubscribe<List<ServiceInstance>>() {
            //Observable 有三个关键的事件方法,分别是 onNext:方法调用, onCompleted:方法执行完成, onError:方法执行出错
            @Override
            public void call(Subscriber<? super List<ServiceInstance>> subscriber) {
                try {
                    //第一次完成时进入,打印出信息
                    if (!subscriber.isUnsubscribed()) {
                        log.info("subscriber command task : [{}],[{}]",
                                JSON.toJSONString(serviceIds),Thread.currentThread().getName());
                        serviceIds.forEach(
                                s -> subscriber.onNext(nacosClientService.getNacosClientInfo(s))
                        );
                        subscriber.onCompleted();
                        log.info("command task completed: [{}], [{}]",
                                JSON.toJSONString(serviceIds), Thread.currentThread().getName());
                    }
                }catch (Exception ex) {
                    subscriber.onError(ex);
                }
            }
        });
    }

    /**
     * 降级策略
     * @return
     */
    @Override
    protected Observable<List<ServiceInstance>> resumeWithFallback() {
        return Observable.create(new Observable.OnSubscribe<List<ServiceInstance>>() {
            @Override
            public void call(Subscriber<? super List<ServiceInstance>> subscriber) {
                try {
                    //第一次完成时进入,打印出信息
                    if (!subscriber.isUnsubscribed()) {
                        log.info("fallback ---- subscriber command task : [{}],[{}]",
                                JSON.toJSONString(serviceIds),Thread.currentThread().getName());
                        subscriber.onNext(Collections.emptyList());
                        subscriber.onCompleted();
                        log.info("command task completed: [{}], [{}]",
                                JSON.toJSONString(serviceIds), Thread.currentThread().getName());
                    }
                }catch (Exception ex) {
                    subscriber.onError(ex);
                }
            }
        });
    }
}

-----------------------------------------------------------------------
//使用实例
@GetMapping("/hystrix-observable-command")
    public List<ServiceInstance> getServiceInstanceByServiceIdObservable(@RequestParam String serviceId) {

        List<String> serviceIds = Arrays.asList(serviceId, serviceId, serviceId);
        List<List<ServiceInstance>> result = new ArrayList<>(serviceIds.size());

        NacosClientHystrixObservableCommand observableCommand = new NacosClientHystrixObservableCommand(nacosClientService, serviceIds);

        //异步执行命令
        Observable<List<ServiceInstance>> observe = observableCommand.observe();

        //注册获取结果
        observe.subscribe(new Observer<List<ServiceInstance>>() {
            @Override
            public void onCompleted() {
                log.info("all tasks is complete : [{}], [{}]", serviceId, Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(List<ServiceInstance> serviceInstances) {
                result.add(serviceInstances);
            }
        });
        log.info("observable command result is : [{}], [{}]", JSON.toJSONString(result), Thread.currentThread().getName());
        return result.get(0);
    }

Hystrix请求缓存

  • Hystrix的结果缓存是指在一次Hystrix的请求上下文中: image-20220421231526984

Hystrix请求缓存的两种实现方式

  • 编程(继承HystrixCommand)的方式,重写getCacheKey()方法即可。
  • 注解方式

使用编程的方式

/**
 * @author qingtian
 * @version 1.0
 * @description: 初始化 Hystrix 请求上下文环境
 * @date 2022/4/21 23:18
 */
@Slf4j
@Component
@WebFilter(
        filterName = "HystrixRequestContextServletFilter",
        urlPatterns = "/*",
        asyncSupported = true
)
public class HystrixRequestContextServletFilter implements Filter {
    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
            throws IOException, ServletException {
        //初始化 Hystrix 请求上下文
        //在不同的 context 中缓存是不共享的
        //这个初始化是必须的
        HystrixRequestContext context = HystrixRequestContext.initializeContext();

        try {
            //配置 hystrix 缓存策略
            hystrixConcurrencyStrategyConfig();
            //请求正常通过
            filterChain.doFilter(servletRequest,servletResponse);
        }finally {
            //关闭 Hystrix 请求上下文
            context.shutdown();
        }
    }

    /**
     * 配置 Hystrix 的并发策略
     * 为什么要配置?
     * 因为在项目中引入了sleuth, sleuth会有一个默认的 Hystrix
     * 并发策略: SleuthHystrixConcurrencyStrategy 我们不需要
     * */
    public void hystrixConcurrencyStrategyConfig() {
        try {
            HystrixConcurrencyStrategy target = HystrixConcurrencyStrategyDefault.getInstance();
            //得到当前的 Hystrix 缓存策略
            HystrixConcurrencyStrategy strategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            if (strategy instanceof HystrixConcurrencyStrategyDefault) {
                //如果是我们想要的 直接返回
                return;
            }

            //将原来的其他配置保存下来
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();

            //先重置 再把我自定义的配置与原来的配置写回去
            HystrixPlugins.reset();
            HystrixPlugins.getInstance().registerConcurrencyStrategy(target);
            HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
            HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
            HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);

            log.info("config hystrix concurrency strategy success");
        }catch (Exception e) {
            log.error("failed to register Hystrix Concurrency Strategy : [{}]",e.getMessage(),e);
        }
    }
}

--------------------------------------------------------------------------------

/**
 * @author qingtian
 * @version 1.0
 * @description: 带有缓存功能的 Hystrix
 * @date 2022/4/21 23:50
 */
@Slf4j
public class CacheHystrixCommand extends HystrixCommand<List<ServiceInstance>> {

    /**
     * 需要保护的服务
     */
    private final NacosClientService nacosClientService;

    /**
     * 方法需要的参数
     */
    private final String serviceId;

    private static final HystrixCommandKey CACHED_KEY = HystrixCommandKey.Factory.asKey("CacheHystrixCommand");

    public CacheHystrixCommand(NacosClientService nacosClientService, String serviceId) {
        super(
                HystrixCommand.Setter
                        .withGroupKey(HystrixCommandGroupKey
                                .Factory.asKey("CacheHystrixCommandGroup"))
                        .andCommandKey(CACHED_KEY)
        );
        this.nacosClientService = nacosClientService;
        this.serviceId = serviceId;
    }

    @Override
    protected List<ServiceInstance> run() throws Exception {
        log.info("CacheHystrixCommand In Hystrix Command to get service instance:" +
                "[{}], [{}]", this.serviceId, Thread.currentThread().getName());
        return this.nacosClientService.getNacosClientInfo(this.serviceId);
    }


    /**
     * 在一次请求上下文中 缓存是 k:v 形式的,定义命中缓存的key
     * @return
     */
    @Override
    protected String getCacheKey() {
        return serviceId;
    }

    @Override
    protected List<ServiceInstance> getFallback() {
        return Collections.emptyList();
    }

    /**
     * 根据缓存的 key 清理在一次 hystrix 请求上下文中的缓存
     * @param serviceId
     */
    private static void flushRequestCache(String serviceId) {
        //需要传递 commandKey
        HystrixRequestCache.getInstance(
                CACHED_KEY,
                HystrixConcurrencyStrategyDefault.getInstance()
        ).clear(serviceId);
        log.info("flush request cache in hystrix command : [{}], [{}]",serviceId, Thread.currentThread().getName());
    }
}

------------------------------------------------------------------------------
如何使用 hystrix 请求缓存
	@GetMapping("/cache-hystrix-command")
    public void cacheHystrixCommand(@RequestParam String serviceId) {
        //使用缓存 command ,发起两次请求
        CacheHystrixCommand command1 = new CacheHystrixCommand(nacosClientService, serviceId);
        CacheHystrixCommand command2 = new CacheHystrixCommand(nacosClientService, serviceId);

        List<ServiceInstance> result01 = command1.execute();
        List<ServiceInstance> result02 = command2.execute();
        log.info("result01, result02 : [{}], [{}]",JSON.toJSONString(result01),JSON.toJSONString(result02));

        //清除缓存
        CacheHystrixCommand command3 = new CacheHystrixCommand(nacosClientService, serviceId);
        CacheHystrixCommand command4 = new CacheHystrixCommand(nacosClientService, serviceId);

        List<ServiceInstance> result03 = command3.execute();
        List<ServiceInstance> result04 = command4.execute();
        log.info("result03, result04 : [{}], [{}]",JSON.toJSONString(result01),JSON.toJSONString(result02));
    }

使用注解的方式

注解描述属性
@CacheResult该注解用来标记请求命令返回的结果应该被缓存,它必须和@HystrixCommand注解结合使用cacheKeyMethod
@CacheRemove该注解用来让请求命令的缓存失效,失效的缓存根据commandKey查找commandKey,cacheKeyMethod
@CacheKey该注解用来在请求命令的参数上标记,使其作为cacheKey,如果没有使用此注解则会使用所有参数列表中的参数作为cacheKeyvalue
/**
 * @author qingtian
 * @version 1.0
 * @description: 使用注解方式开启 Hystrix 请求缓存
 * @date 2022/4/23 10:20
 */
@Service
@Slf4j
public class CacheHystrixCommandAnnotation {

    private final NacosClientService nacosClientService;

    public CacheHystrixCommandAnnotation(NacosClientService nacosClientService) {
        this.nacosClientService = nacosClientService;
    }



    /**
     *  第一种使用方法
     * @return
     */
    @CacheResult(cacheKeyMethod = "getCacheKey")
    @HystrixCommand(commandKey = "CacheHystrixCommandAnnotation")
    public List<ServiceInstance> useCacheByAnnotation01(String serviceId) {
        log.info("use cache01 to get nacos client info : [{}]", serviceId);
        return nacosClientService.getNacosClientInfo(serviceId);
    }

    //必须和上面的 commandKey 一致
    @CacheRemove(commandKey = "CacheHystrixCommandAnnotation",
                cacheKeyMethod = "getCacheKey")
    @HystrixCommand
    public void flushCacheByAnnotation01(String cacheId) {
        //实际上这个方法里不需要做任何操作
        log.info("flush hystrix cache key : [{}]",cacheId);
    }

    public String getCacheKey(String cacheId) {
        return cacheId;
    }

    /**
     * 第二种使用方式
     * @param serviceId
     * @return
     */
    @CacheResult
    @HystrixCommand(commandKey = "CacheHystrixCommandAnnotation")
    public List<ServiceInstance> useCacheByAnnotation02(@CacheKey String serviceId) {
        log.info("use cache02 to get nacos client info : [{}]", serviceId);
        return nacosClientService.getNacosClientInfo(serviceId);
    }

    /**
     * 清除缓存
     * 必须和上面的 commandKey 一致
     * @param cacheId
     */
    @CacheRemove(commandKey = "CacheHystrixCommandAnnotation")
    @HystrixCommand
    public void flushCacheByAnnotation02(@CacheKey String cacheId) {
        //实际上这个方法里不需要做任何操作
        log.info("flush hystrix cache key : [{}]",cacheId);
    }

    /**
     * 第三种使用方式
     * @param serviceId
     * @return
     */
    @CacheResult
    @HystrixCommand(commandKey = "CacheHystrixCommandAnnotation")
    public List<ServiceInstance> useCacheByAnnotation03(String serviceId) {
        log.info("use cache02 to get nacos client info : [{}]", serviceId);
        return nacosClientService.getNacosClientInfo(serviceId);
    }

    /**
     * 清除缓存
     * 必须和上面的 commandKey 一致
     * @param cacheId
     */
    @CacheRemove(commandKey = "CacheHystrixCommandAnnotation")
    @HystrixCommand
    public void flushCacheByAnnotation03(String cacheId) {
        //实际上这个方法里不需要做任何操作
        log.info("flush hystrix cache key : [{}]",cacheId);
    }

}

Hystrix请求合并

请求合并的思想

  • 默认情况下,每个请求都会占用一个线程和一次网络请求,高并发场景下效率不高

    image-20220424231248100

  • 使用Hystrix请求合并,将多个请求merge为一个,提高服务的并发能力

使用编程方式

/**
 * @author qingtian
 * @version 1.0
 * @description: 批量请求 hystrix Command
 * @date 2022/4/24 23:20
 */
@Slf4j
public class NacosClientBatchCommand extends HystrixCommand<List<List<ServiceInstance>>> {

    private final NacosClientService nacosClientService;
    private final List<String> serviceIds;

    public NacosClientBatchCommand(NacosClientService nacosClientService, List<String> serviceIds) {
        super(
                HystrixCommand.Setter.withGroupKey(
                        HystrixCommandGroupKey.Factory.asKey("NacosClientBatchCommand")
                )
        );
        this.nacosClientService = nacosClientService;
        this.serviceIds = serviceIds;
    }

    @Override
    protected List<List<ServiceInstance>> run() throws Exception {
        log.info("use nacos client batch command to get result : [{}]", JSON.toJSONString(serviceIds));
        return nacosClientService.getNacosInfos(serviceIds);
    }

    @Override
    protected List<List<ServiceInstance>> getFallback() {
        log.warn("nacos client batch command failure , use fallback");
        return Collections.emptyList();
    }
}

-----------------------------------------------------------------------------
/**
 * @author qingtian
 * @version 1.0
 * @description: 请求合并器
 * @date 2022/4/25 21:11
 */
@Slf4j
public class NacosClientCollapseCommand extends HystrixCollapser<List<List<ServiceInstance>>, List<ServiceInstance>, String> {

    private final NacosClientService nacosClientService;
    private final String serviceId;

    public NacosClientCollapseCommand(NacosClientService nacosClientService, String serviceId) {
        //初始化请求合并器
        super(
                HystrixCollapser.Setter.withCollapserKey(
                        HystrixCollapserKey.Factory.asKey("NacosClientCollapseCommand")
                ).andCollapserPropertiesDefaults(
                        //300毫秒内合并请求,300毫秒后的请求需要重新生成请求合并器
                        HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(300)
                )
        );

        this.nacosClientService = nacosClientService;
        this.serviceId = serviceId;
    }

    /**
     * 获取请求参数
     * @return
     */
    @Override
    public String getRequestArgument() {
        return this.serviceId;
    }

    /**
     * 创建批量请求的 Hystrix Command
     * @param collection
     * @return
     */
    @Override
    protected HystrixCommand<List<List<ServiceInstance>>> createCommand(
            Collection<CollapsedRequest<List<ServiceInstance>, String>> collection) {

        List<String> serviceIds = new ArrayList<>(collection.size());
        serviceIds.addAll(
                collection.stream()
                        .map(CollapsedRequest::getArgument)
                        .collect(Collectors.toList())
        );
        return new NacosClientBatchCommand(nacosClientService,serviceIds);
    }

    /**
     * 响应分发给单独的请求, 将list中的结果分给每一个单独的请求
     * @param lists
     * @param collection
     */
    @Override
    protected void mapResponseToRequests(List<List<ServiceInstance>> lists,
                                         Collection<CollapsedRequest<List<ServiceInstance>,
                                                 String>> collection) {
        //返回的响应都是根据请求顺序包装的,只要按顺序返回给响应的请求就行
        int count = 0;
        for (CollapsedRequest<List<ServiceInstance>, String> collapsedRequest : collection) {
            //从批量响应中按顺序取出结果
            List<ServiceInstance> instances = lists.get(count++);
            //将结果放回原来的响应中
            collapsedRequest.setResponse(instances);
        }
    }
}

--------------------------------------------------------------------------------
//使用方式
/**
     * 编程方式实现请求合并
     * @throws Exception
     */
    @GetMapping("/request-merge")
    public void requestMerge() throws Exception {
        NacosClientCollapseCommand collapseCommand01 = new NacosClientCollapseCommand(
                nacosClientService, "e-commerce-nacos-client1");
        NacosClientCollapseCommand collapseCommand02 = new NacosClientCollapseCommand(
                nacosClientService, "e-commerce-nacos-client2");
        NacosClientCollapseCommand collapseCommand03 = new NacosClientCollapseCommand(
                nacosClientService, "e-commerce-nacos-client3");

        Future<List<ServiceInstance>> future01 = collapseCommand01.queue();
        Future<List<ServiceInstance>> future02 = collapseCommand02.queue();
        Future<List<ServiceInstance>> future03 = collapseCommand03.queue();

        future01.get();
        future02.get();
        future03.get();

        Thread.sleep(2000);

        //过了合并请求的时间
        NacosClientCollapseCommand collapseCommand04 = new NacosClientCollapseCommand(
                nacosClientService, "e-commerce-nacos-client4");
        Future<List<ServiceInstance>> future04 = collapseCommand04.queue();
        future04.get();
    }

请求合并的适用场景和注意事项

  • 适用场景:单个对象的查询并发数很高,服务提供方负载较高,就可以考虑使用请求合并
  • 注意事项:
    1. 请求在代码中人为的设置了延迟时间,会降低响应速度
    2. 可能会提高服务提供方的负载,因为请求合并的响应是List结果数据量会大
    3. 实现请求合并比较复杂

使用注解方式

 /**
     * 使用注解的方式实现 Hystrix 请求合并
     * @param serviceId
     * @return
     */
    @HystrixCollapser(
            batchMethod = "findNacosClientInfos",
            //全局性的请求合并策略
            scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
            //合并时间区间属性
            collapserProperties = {
                    @HystrixProperty(name = "timerDelayInMilliseconds", value = "300")
            }
    )
    public Future<List<ServiceInstance>> findNacosClientInfo(String serviceId) {
        //系统运行正常,不会走这个方法,一般直接抛出异常
        throw new RuntimeException("this method body should not be executed!");
    }

    @HystrixCommand
    public List<List<ServiceInstance>> findNacosClientInfos(List<String> serviceIds) {
        log.info("coming in find nacos client infos : [{}]",JSON.toJSONString(serviceIds));
        return getNacosInfos(serviceIds);
    }
0

评论区