SpringCloud Hystrix
Hystrix的设计目标
Hystrix
是一个库,通过添加延迟容忍和容错逻辑,帮助你控制这些分布式服务之间的交互Hystrix
通过隔离服务之间的访问点、停止级联失败和提供回退选项来实现服务之间的容错Hystrix
有四大设计目标- 对客户端访问的延迟和故障进行保护和控制
- 在复杂分布式系统中阻止级联故障
- 快速失败,快速恢复
- 兜底回退,尽可能的优雅降级
Hystrix解决了什么问题
- 复杂分布式系统中服务之间存在许多依赖项,依赖项可能会存在故障,如果不做故障隔离,整个服务可能会被拖垮
Hystrix是如何实现它的目标的
- 对依赖项(服务)进行包装代理,不直接与依赖项交互
- 调用超时时间允许自行设定,超时后立即熔断报错
- 每一个依赖项都在自己的空间内(线程池或信号量隔离),依赖项之间不存在干扰
- 请求依赖项失败后,可以选择出错或是兜底回退
使用注解的方式实现服务的容错、降级
Hystrix的断路器模式和后备策略模式
Hystrix的舱壁模式
-
Hystrix 是通过线程池管理调用外部资源的,默认情况下所有服务调用的都是一个公用线程池。
-
一个性能低下的服务会耗尽
Hystrix
的线程池资源。进而牵连到其他的远程调用,最后也会耗尽Java容器的资源。
-
舱壁模式的功能:我们可以为各个服务分别指定线程池
@EnableDiscoveryClient
@SpringBootApplication
@EnableFeignClients
@RefreshScope
@EnableCircuitBreaker //启用 Hysrix
public class NacosClientApplication {
public static void main(String[] args) {
SpringApplication.run(NacosClientApplication.class,args);
}
}
使用@HystrixCommand
来配置
/**
* @author qingtian
* @version 1.0
* @description: 使用 HystrixCommand 注解
* @date 2022/3/23 23:10
*/
@Slf4j
@Service
public class UseHystrixCommandAnnotation {
@Autowired
private NacosClientService nacosClientService;
@HystrixCommand(
// 用于对 Hystrix 命令进行分组, 分组之后便于统计展示于仪表盘、上传报告和预警等等
// 内部进行度量统计时候的分组标识, 数据上报和统计的最小维度就是 groupKey
groupKey = "NacosClientService",
// HystrixCommand 的名字, 默认是当前类的名字, 主要方便 Hystrix 进行监控、报警等
commandKey = "NacosClientService",
// 舱壁模式
threadPoolKey = "NacosClientService",
// 后备模式
fallbackMethod = "getNacosClientInfoFallback",
// 断路器模式
commandProperties = {
// 超时时间, 单位毫秒, 超时进 fallback
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1500"),
// 判断熔断的最少请求数, 默认是10; 只有在一定时间内请求数量达到该值, 才会进行成功率的计算
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "10"),
// 熔断的阈值默认值 50, 表示在一定时间内有50%的请求处理失败, 会触发熔断
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10"),
},
// 舱壁模式
threadPoolProperties = {
@HystrixProperty(name = "coreSize", value = "30"),
@HystrixProperty(name = "maxQueueSize", value = "101"),
@HystrixProperty(name = "keepAliveTimeMinutes", value = "2"),
@HystrixProperty(name = "queueSizeRejectionThreshold", value = "15"),
// 在时间窗口中, 收集统计信息的次数; 在 1440ms 的窗口中一共统计 12 次
@HystrixProperty(name = "metrics.rollingStats.numBuckets", value = "12"),
// 时间窗口, 从监听到第一次失败开始计时
@HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds", value = "1440")
}
)
public List<ServiceInstance> getNacosClientInfo(String serviceId) {
log.info("use hystrux command annotation to get nacos client info : [{}],[{}]",
serviceId,Thread.currentThread().getName());
return nacosClientService.getNacosClientInfo(serviceId);
}
/**
* getNacosClientInfo 方法的兜底策略
* @param serviceId
* @return
*/
public List<ServiceInstance> getNacosClientInfoFallback(String serviceId) {
log.warn("can not get nacos client, trigger hystrix fallback : [{}],[{}]",
serviceId, Thread.currentThread().getName());
return Collections.emptyList();
}
}
使用编程的方式实现服务的容错,降级
代码实例如下:
/**
* @author qingtian
* @version 1.0
* @description: 给 nacosClientService 实现包装
* Hystrix 舱壁模式
* 1. 线程池
* 2. 信号量
* @date 2022/4/6 22:30
*/
@Slf4j
public class NacosClientHystrixCommand extends HystrixCommand<List<ServiceInstance>> {
/**
* 需要保护的服务
*/
private final NacosClientService nacosClientService;
/**
* 方法需要传递的参数
*/
private final String serviceId;
public NacosClientHystrixCommand(NacosClientService nacosClientService, String serviceId) {
super(
Setter.withGroupKey(
HystrixCommandGroupKey.Factory.asKey("NacosClientService"))
.andCommandKey(HystrixCommandKey.Factory.asKey("NacosClientServiceCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("NacosClientPool"))
//线程池 key 配置
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
//线程池隔离策略
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
//开启降级
.withFallbackEnabled(true)
//开启熔断器
.withCircuitBreakerEnabled(true)
)
);
//可以配置信号量隔离策略
// Setter setter = Setter.withGroupKey(
// HystrixCommandGroupKey.Factory.asKey("NacosClientService"))
// .andCommandKey(HystrixCommandKey.Factory.asKey("NacosClientServiceCommand"))
// .andCommandPropertiesDefaults(
// HystrixCommandProperties.Setter()
// //至少有10个请求熔断器才开始计算成功率
// .withCircuitBreakerRequestVolumeThreshold(10)
// //熔断器在启用5秒后会进入半开启模式 尝试进行请求
// .withCircuitBreakerSleepWindowInMilliseconds(5000)
// //当请求错误率达到 50% 时开启熔断器
// .withCircuitBreakerErrorThresholdPercentage(50)
// //使用信号量熔断配置
// .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
// );
this.nacosClientService = nacosClientService;
this.serviceId = serviceId;
}
/**
* 要保护的方法调用写在 run 方法中
* @return
* @throws Exception
*/
@Override
protected List<ServiceInstance> run() throws Exception {
log.info("NacosClientService In Hystrix Command to Get Service Instance : [{}]. [{}]",
this.serviceId,Thread.currentThread().getName());
return this.nacosClientService.getNacosClientInfo(this.serviceId);
}
/**
* 降级处理策略
* @return
*/
@Override
protected List<ServiceInstance> getFallback() {
log.warn("NacoClientService run error : [{}], [{}]", this.serviceId, Thread.currentThread().getName());
return Collections.emptyList();
}
}
--------------------------------------------------------------------------
//使用方式
@GetMapping("/simple-hystrix-command")
public List<ServiceInstance> getServiceInstanceByServiceId(@RequestParam String serviceId) throws ExecutionException, InterruptedException {
//第一种方式 同步阻塞的方式
List<ServiceInstance> serviceInstances01 = new NacosClientHystrixCommand(nacosClientService, serviceId)
.execute();
log.info("use execute to get service instance : [{}], [{}]",
JSON.toJSONString(serviceInstances01),Thread.currentThread().getName());
//第二种方式 异步非阻塞
List<ServiceInstance> serviceInstances02;
Future<List<ServiceInstance>> future = new NacosClientHystrixCommand(nacosClientService, serviceId).queue();
//做一些别的事情
serviceInstances02 = future.get();
log.info("use execute to get service instance : [{}], [{}]",
JSON.toJSONString(serviceInstances02),Thread.currentThread().getName());
//第三种方式 热响应调用
Observable<List<ServiceInstance>> observable = new NacosClientHystrixCommand(nacosClientService, serviceId).observe();
//热响应调用,在开始只会创建线程而不会触发方法的调用,等到使用 observable.toBlocking().single() 方法的时候才会真正去调用方法
List<ServiceInstance> serviceInstances03 = observable.toBlocking().single();
log.info("use execute to get service instance : [{}], [{}]",
JSON.toJSONString(serviceInstances03),Thread.currentThread().getName());
//第四种方式,异步冷响应调用
Observable<List<ServiceInstance>> toObservable = new NacosClientHystrixCommand(nacosClientService, serviceId).toObservable();
//异步冷响应调用,主要与第三种热响应调用的区别,冷响应调用不会创建新的线程 而是在 toObservable.toBlocking().single() 的时候才会创建新的线程调用方法
List<ServiceInstance> serviceInstances04 = toObservable.toBlocking().single();
log.info("use execute to get service instance : [{}], [{}]",
JSON.toJSONString(serviceInstances04),Thread.currentThread().getName());
return serviceInstances01;
}
基于信号量的机制实现
@Slf4j
public class NacosClientHystrixObservableCommand extends HystrixObservableCommand<List<ServiceInstance>> {
/**
* 需要保护的服务
*/
private final NacosClientService nacosClientService;
/**
* 方法需要传递的参数
*/
private final List<String> serviceIds;
public NacosClientHystrixObservableCommand(NacosClientService nacosClientService, List<String> serviceIds) {
super(
HystrixObservableCommand.Setter
.withGroupKey(HystrixCommandGroupKey
.Factory.asKey("NacosClientService"))
.andCommandKey(HystrixCommandKey
.Factory.asKey("NacosClientHystrixObservableCommand"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
//开启降级
.withFallbackEnabled(true)
//开启熔断器
.withCircuitBreakerEnabled(true)
)
);
this.nacosClientService = nacosClientService;
this.serviceIds = serviceIds;
}
/**
* 需要保护的方法
* @return
*/
@Override
protected Observable<List<ServiceInstance>> construct() {
return Observable.create(new Observable.OnSubscribe<List<ServiceInstance>>() {
//Observable 有三个关键的事件方法,分别是 onNext:方法调用, onCompleted:方法执行完成, onError:方法执行出错
@Override
public void call(Subscriber<? super List<ServiceInstance>> subscriber) {
try {
//第一次完成时进入,打印出信息
if (!subscriber.isUnsubscribed()) {
log.info("subscriber command task : [{}],[{}]",
JSON.toJSONString(serviceIds),Thread.currentThread().getName());
serviceIds.forEach(
s -> subscriber.onNext(nacosClientService.getNacosClientInfo(s))
);
subscriber.onCompleted();
log.info("command task completed: [{}], [{}]",
JSON.toJSONString(serviceIds), Thread.currentThread().getName());
}
}catch (Exception ex) {
subscriber.onError(ex);
}
}
});
}
/**
* 降级策略
* @return
*/
@Override
protected Observable<List<ServiceInstance>> resumeWithFallback() {
return Observable.create(new Observable.OnSubscribe<List<ServiceInstance>>() {
@Override
public void call(Subscriber<? super List<ServiceInstance>> subscriber) {
try {
//第一次完成时进入,打印出信息
if (!subscriber.isUnsubscribed()) {
log.info("fallback ---- subscriber command task : [{}],[{}]",
JSON.toJSONString(serviceIds),Thread.currentThread().getName());
subscriber.onNext(Collections.emptyList());
subscriber.onCompleted();
log.info("command task completed: [{}], [{}]",
JSON.toJSONString(serviceIds), Thread.currentThread().getName());
}
}catch (Exception ex) {
subscriber.onError(ex);
}
}
});
}
}
-----------------------------------------------------------------------
//使用实例
@GetMapping("/hystrix-observable-command")
public List<ServiceInstance> getServiceInstanceByServiceIdObservable(@RequestParam String serviceId) {
List<String> serviceIds = Arrays.asList(serviceId, serviceId, serviceId);
List<List<ServiceInstance>> result = new ArrayList<>(serviceIds.size());
NacosClientHystrixObservableCommand observableCommand = new NacosClientHystrixObservableCommand(nacosClientService, serviceIds);
//异步执行命令
Observable<List<ServiceInstance>> observe = observableCommand.observe();
//注册获取结果
observe.subscribe(new Observer<List<ServiceInstance>>() {
@Override
public void onCompleted() {
log.info("all tasks is complete : [{}], [{}]", serviceId, Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(List<ServiceInstance> serviceInstances) {
result.add(serviceInstances);
}
});
log.info("observable command result is : [{}], [{}]", JSON.toJSONString(result), Thread.currentThread().getName());
return result.get(0);
}
Hystrix请求缓存
- Hystrix的结果缓存是指在一次
Hystrix
的请求上下文中:
Hystrix请求缓存的两种实现方式
- 编程(继承
HystrixCommand
)的方式,重写getCacheKey()
方法即可。 - 注解方式
使用编程的方式
/**
* @author qingtian
* @version 1.0
* @description: 初始化 Hystrix 请求上下文环境
* @date 2022/4/21 23:18
*/
@Slf4j
@Component
@WebFilter(
filterName = "HystrixRequestContextServletFilter",
urlPatterns = "/*",
asyncSupported = true
)
public class HystrixRequestContextServletFilter implements Filter {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
throws IOException, ServletException {
//初始化 Hystrix 请求上下文
//在不同的 context 中缓存是不共享的
//这个初始化是必须的
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
//配置 hystrix 缓存策略
hystrixConcurrencyStrategyConfig();
//请求正常通过
filterChain.doFilter(servletRequest,servletResponse);
}finally {
//关闭 Hystrix 请求上下文
context.shutdown();
}
}
/**
* 配置 Hystrix 的并发策略
* 为什么要配置?
* 因为在项目中引入了sleuth, sleuth会有一个默认的 Hystrix
* 并发策略: SleuthHystrixConcurrencyStrategy 我们不需要
* */
public void hystrixConcurrencyStrategyConfig() {
try {
HystrixConcurrencyStrategy target = HystrixConcurrencyStrategyDefault.getInstance();
//得到当前的 Hystrix 缓存策略
HystrixConcurrencyStrategy strategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (strategy instanceof HystrixConcurrencyStrategyDefault) {
//如果是我们想要的 直接返回
return;
}
//将原来的其他配置保存下来
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
//先重置 再把我自定义的配置与原来的配置写回去
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(target);
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
log.info("config hystrix concurrency strategy success");
}catch (Exception e) {
log.error("failed to register Hystrix Concurrency Strategy : [{}]",e.getMessage(),e);
}
}
}
--------------------------------------------------------------------------------
/**
* @author qingtian
* @version 1.0
* @description: 带有缓存功能的 Hystrix
* @date 2022/4/21 23:50
*/
@Slf4j
public class CacheHystrixCommand extends HystrixCommand<List<ServiceInstance>> {
/**
* 需要保护的服务
*/
private final NacosClientService nacosClientService;
/**
* 方法需要的参数
*/
private final String serviceId;
private static final HystrixCommandKey CACHED_KEY = HystrixCommandKey.Factory.asKey("CacheHystrixCommand");
public CacheHystrixCommand(NacosClientService nacosClientService, String serviceId) {
super(
HystrixCommand.Setter
.withGroupKey(HystrixCommandGroupKey
.Factory.asKey("CacheHystrixCommandGroup"))
.andCommandKey(CACHED_KEY)
);
this.nacosClientService = nacosClientService;
this.serviceId = serviceId;
}
@Override
protected List<ServiceInstance> run() throws Exception {
log.info("CacheHystrixCommand In Hystrix Command to get service instance:" +
"[{}], [{}]", this.serviceId, Thread.currentThread().getName());
return this.nacosClientService.getNacosClientInfo(this.serviceId);
}
/**
* 在一次请求上下文中 缓存是 k:v 形式的,定义命中缓存的key
* @return
*/
@Override
protected String getCacheKey() {
return serviceId;
}
@Override
protected List<ServiceInstance> getFallback() {
return Collections.emptyList();
}
/**
* 根据缓存的 key 清理在一次 hystrix 请求上下文中的缓存
* @param serviceId
*/
private static void flushRequestCache(String serviceId) {
//需要传递 commandKey
HystrixRequestCache.getInstance(
CACHED_KEY,
HystrixConcurrencyStrategyDefault.getInstance()
).clear(serviceId);
log.info("flush request cache in hystrix command : [{}], [{}]",serviceId, Thread.currentThread().getName());
}
}
------------------------------------------------------------------------------
如何使用 hystrix 请求缓存
@GetMapping("/cache-hystrix-command")
public void cacheHystrixCommand(@RequestParam String serviceId) {
//使用缓存 command ,发起两次请求
CacheHystrixCommand command1 = new CacheHystrixCommand(nacosClientService, serviceId);
CacheHystrixCommand command2 = new CacheHystrixCommand(nacosClientService, serviceId);
List<ServiceInstance> result01 = command1.execute();
List<ServiceInstance> result02 = command2.execute();
log.info("result01, result02 : [{}], [{}]",JSON.toJSONString(result01),JSON.toJSONString(result02));
//清除缓存
CacheHystrixCommand command3 = new CacheHystrixCommand(nacosClientService, serviceId);
CacheHystrixCommand command4 = new CacheHystrixCommand(nacosClientService, serviceId);
List<ServiceInstance> result03 = command3.execute();
List<ServiceInstance> result04 = command4.execute();
log.info("result03, result04 : [{}], [{}]",JSON.toJSONString(result01),JSON.toJSONString(result02));
}
使用注解的方式
注解 | 描述 | 属性 |
---|---|---|
@CacheResult | 该注解用来标记请求命令返回的结果应该被缓存,它必须和@HystrixCommand注解结合使用 | cacheKeyMethod |
@CacheRemove | 该注解用来让请求命令的缓存失效,失效的缓存根据commandKey查找 | commandKey,cacheKeyMethod |
@CacheKey | 该注解用来在请求命令的参数上标记,使其作为cacheKey,如果没有使用此注解则会使用所有参数列表中的参数作为cacheKey | value |
/**
* @author qingtian
* @version 1.0
* @description: 使用注解方式开启 Hystrix 请求缓存
* @date 2022/4/23 10:20
*/
@Service
@Slf4j
public class CacheHystrixCommandAnnotation {
private final NacosClientService nacosClientService;
public CacheHystrixCommandAnnotation(NacosClientService nacosClientService) {
this.nacosClientService = nacosClientService;
}
/**
* 第一种使用方法
* @return
*/
@CacheResult(cacheKeyMethod = "getCacheKey")
@HystrixCommand(commandKey = "CacheHystrixCommandAnnotation")
public List<ServiceInstance> useCacheByAnnotation01(String serviceId) {
log.info("use cache01 to get nacos client info : [{}]", serviceId);
return nacosClientService.getNacosClientInfo(serviceId);
}
//必须和上面的 commandKey 一致
@CacheRemove(commandKey = "CacheHystrixCommandAnnotation",
cacheKeyMethod = "getCacheKey")
@HystrixCommand
public void flushCacheByAnnotation01(String cacheId) {
//实际上这个方法里不需要做任何操作
log.info("flush hystrix cache key : [{}]",cacheId);
}
public String getCacheKey(String cacheId) {
return cacheId;
}
/**
* 第二种使用方式
* @param serviceId
* @return
*/
@CacheResult
@HystrixCommand(commandKey = "CacheHystrixCommandAnnotation")
public List<ServiceInstance> useCacheByAnnotation02(@CacheKey String serviceId) {
log.info("use cache02 to get nacos client info : [{}]", serviceId);
return nacosClientService.getNacosClientInfo(serviceId);
}
/**
* 清除缓存
* 必须和上面的 commandKey 一致
* @param cacheId
*/
@CacheRemove(commandKey = "CacheHystrixCommandAnnotation")
@HystrixCommand
public void flushCacheByAnnotation02(@CacheKey String cacheId) {
//实际上这个方法里不需要做任何操作
log.info("flush hystrix cache key : [{}]",cacheId);
}
/**
* 第三种使用方式
* @param serviceId
* @return
*/
@CacheResult
@HystrixCommand(commandKey = "CacheHystrixCommandAnnotation")
public List<ServiceInstance> useCacheByAnnotation03(String serviceId) {
log.info("use cache02 to get nacos client info : [{}]", serviceId);
return nacosClientService.getNacosClientInfo(serviceId);
}
/**
* 清除缓存
* 必须和上面的 commandKey 一致
* @param cacheId
*/
@CacheRemove(commandKey = "CacheHystrixCommandAnnotation")
@HystrixCommand
public void flushCacheByAnnotation03(String cacheId) {
//实际上这个方法里不需要做任何操作
log.info("flush hystrix cache key : [{}]",cacheId);
}
}
Hystrix请求合并
请求合并的思想
-
默认情况下,每个请求都会占用一个线程和一次网络请求,高并发场景下效率不高
-
使用Hystrix请求合并,将多个请求merge为一个,提高服务的并发能力
使用编程方式
/**
* @author qingtian
* @version 1.0
* @description: 批量请求 hystrix Command
* @date 2022/4/24 23:20
*/
@Slf4j
public class NacosClientBatchCommand extends HystrixCommand<List<List<ServiceInstance>>> {
private final NacosClientService nacosClientService;
private final List<String> serviceIds;
public NacosClientBatchCommand(NacosClientService nacosClientService, List<String> serviceIds) {
super(
HystrixCommand.Setter.withGroupKey(
HystrixCommandGroupKey.Factory.asKey("NacosClientBatchCommand")
)
);
this.nacosClientService = nacosClientService;
this.serviceIds = serviceIds;
}
@Override
protected List<List<ServiceInstance>> run() throws Exception {
log.info("use nacos client batch command to get result : [{}]", JSON.toJSONString(serviceIds));
return nacosClientService.getNacosInfos(serviceIds);
}
@Override
protected List<List<ServiceInstance>> getFallback() {
log.warn("nacos client batch command failure , use fallback");
return Collections.emptyList();
}
}
-----------------------------------------------------------------------------
/**
* @author qingtian
* @version 1.0
* @description: 请求合并器
* @date 2022/4/25 21:11
*/
@Slf4j
public class NacosClientCollapseCommand extends HystrixCollapser<List<List<ServiceInstance>>, List<ServiceInstance>, String> {
private final NacosClientService nacosClientService;
private final String serviceId;
public NacosClientCollapseCommand(NacosClientService nacosClientService, String serviceId) {
//初始化请求合并器
super(
HystrixCollapser.Setter.withCollapserKey(
HystrixCollapserKey.Factory.asKey("NacosClientCollapseCommand")
).andCollapserPropertiesDefaults(
//300毫秒内合并请求,300毫秒后的请求需要重新生成请求合并器
HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(300)
)
);
this.nacosClientService = nacosClientService;
this.serviceId = serviceId;
}
/**
* 获取请求参数
* @return
*/
@Override
public String getRequestArgument() {
return this.serviceId;
}
/**
* 创建批量请求的 Hystrix Command
* @param collection
* @return
*/
@Override
protected HystrixCommand<List<List<ServiceInstance>>> createCommand(
Collection<CollapsedRequest<List<ServiceInstance>, String>> collection) {
List<String> serviceIds = new ArrayList<>(collection.size());
serviceIds.addAll(
collection.stream()
.map(CollapsedRequest::getArgument)
.collect(Collectors.toList())
);
return new NacosClientBatchCommand(nacosClientService,serviceIds);
}
/**
* 响应分发给单独的请求, 将list中的结果分给每一个单独的请求
* @param lists
* @param collection
*/
@Override
protected void mapResponseToRequests(List<List<ServiceInstance>> lists,
Collection<CollapsedRequest<List<ServiceInstance>,
String>> collection) {
//返回的响应都是根据请求顺序包装的,只要按顺序返回给响应的请求就行
int count = 0;
for (CollapsedRequest<List<ServiceInstance>, String> collapsedRequest : collection) {
//从批量响应中按顺序取出结果
List<ServiceInstance> instances = lists.get(count++);
//将结果放回原来的响应中
collapsedRequest.setResponse(instances);
}
}
}
--------------------------------------------------------------------------------
//使用方式
/**
* 编程方式实现请求合并
* @throws Exception
*/
@GetMapping("/request-merge")
public void requestMerge() throws Exception {
NacosClientCollapseCommand collapseCommand01 = new NacosClientCollapseCommand(
nacosClientService, "e-commerce-nacos-client1");
NacosClientCollapseCommand collapseCommand02 = new NacosClientCollapseCommand(
nacosClientService, "e-commerce-nacos-client2");
NacosClientCollapseCommand collapseCommand03 = new NacosClientCollapseCommand(
nacosClientService, "e-commerce-nacos-client3");
Future<List<ServiceInstance>> future01 = collapseCommand01.queue();
Future<List<ServiceInstance>> future02 = collapseCommand02.queue();
Future<List<ServiceInstance>> future03 = collapseCommand03.queue();
future01.get();
future02.get();
future03.get();
Thread.sleep(2000);
//过了合并请求的时间
NacosClientCollapseCommand collapseCommand04 = new NacosClientCollapseCommand(
nacosClientService, "e-commerce-nacos-client4");
Future<List<ServiceInstance>> future04 = collapseCommand04.queue();
future04.get();
}
请求合并的适用场景和注意事项
- 适用场景:单个对象的查询并发数很高,服务提供方负载较高,就可以考虑使用请求合并
- 注意事项:
- 请求在代码中人为的设置了延迟时间,会降低响应速度
- 可能会提高服务提供方的负载,因为请求合并的响应是List结果数据量会大
- 实现请求合并比较复杂
使用注解方式
/**
* 使用注解的方式实现 Hystrix 请求合并
* @param serviceId
* @return
*/
@HystrixCollapser(
batchMethod = "findNacosClientInfos",
//全局性的请求合并策略
scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
//合并时间区间属性
collapserProperties = {
@HystrixProperty(name = "timerDelayInMilliseconds", value = "300")
}
)
public Future<List<ServiceInstance>> findNacosClientInfo(String serviceId) {
//系统运行正常,不会走这个方法,一般直接抛出异常
throw new RuntimeException("this method body should not be executed!");
}
@HystrixCommand
public List<List<ServiceInstance>> findNacosClientInfos(List<String> serviceIds) {
log.info("coming in find nacos client infos : [{}]",JSON.toJSONString(serviceIds));
return getNacosInfos(serviceIds);
}
评论区