hystrix在微服务中扮演重要作用,其实现了隔离、熔断、降级等重要组件
但在spring cloud中,常与feign搭配起来使用。故本文仅包含其余feign整合后的相关源码。
hystrix资源隔离
共有两种:线程池的资源隔离,信号量的资源隔离
线程池:适合绝大多数的场景,99%的,线程池,对依赖服务的网络请求的调用和访问,timeout这种问题
信号量:适合对内部的一些比较复杂的业务逻辑的访问,但是像这种访问,系统内部的代码,其实不涉及任何的网络请求,那么只要做信号量的普通限流就可以了,因为不需要去捕获timeout类似的问题。但算法+数据结构的效率不是太高,并发量突然太高,因为这里稍微耗时一些,导致很多线程卡在这里的话,不太好,所以进行一个基本的资源隔离和访问,避免内部复杂的低效率的代码,导致大量的线程被hang住
feign集成hystrix以后,HystrixFeign.Builder则被替换为HystrixFeign.Builder。以支持hystrix相关组件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
//ReflectiveFeign 类中
public <T> T newInstance(Target<T> target) {
Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
//对应FeignClient中的方法,及代理类
Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();
for (Method method : target.type().getMethods()) {
if (method.getDeclaringClass() == Object.class) {
continue;
} else if (Util.isDefault(method)) {
DefaultMethodHandler handler = new DefaultMethodHandler(method);
defaultMethodHandlers.add(handler);
methodToHandler.put(method, handler);
} else {
methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
}
}
//调用HystrixFeign.Build方法
InvocationHandler handler = factory.create(target, methodToHandler);
T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
new Class<?>[] {target.type()}, handler);
for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
defaultMethodHandler.bindTo(proxy);
}
return proxy;
}
//HystrixFeign.Build
@Override
public Feign build() {
return build(null);
}
Feign build(final FallbackFactory<?> nullableFallbackFactory) {
super.invocationHandlerFactory(new InvocationHandlerFactory() {
@Override
public InvocationHandler create(Target target,
Map<Method, MethodHandler> dispatch) {
return new HystrixInvocationHandler(target, dispatch, setterFactory,
nullableFallbackFactory);
}
});
super.contract(new HystrixDelegatingContract(contract));
return super.build();
}
|
HystrixInvocationHandler重要代码
1
2
3
4
5
6
7
8
|
@Override
public HystrixCommand.Setter create(Target<?> target, Method method) {
String groupKey = target.name();
String commandKey = Feign.configKey(target.type(), method);
return HystrixCommand.Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
.andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
}
|
一个接口一个线程池即groupKey,一个方法一个线程即commandKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
HystrixInvocationHandler(Target<?> target, Map<Method, MethodHandler> dispatch,
SetterFactory setterFactory, FallbackFactory<?> fallbackFactory) {
this.target = checkNotNull(target, "target");
this.dispatch = checkNotNull(dispatch, "dispatch");
this.fallbackFactory = fallbackFactory;
this.fallbackMethodMap = toFallbackMethod(dispatch);
this.setterMethodMap = toSetters(setterFactory, target, dispatch.keySet());
}
/**
* Process all methods in the target so that appropriate setters are created.
*/
static Map<Method, Setter> toSetters(SetterFactory setterFactory,
Target<?> target,
Set<Method> methods) {
Map<Method, Setter> result = new LinkedHashMap<Method, Setter>();
for (Method method : methods) {
method.setAccessible(true);
result.put(method, setterFactory.create(target, method));
}
return result;
}
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
// early exit if the invoked method is from java.lang.Object
// code is the same as ReflectiveFeign.FeignInvocationHandler
if ("equals".equals(method.getName())) {
try {
Object otherHandler =
args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}
HystrixCommand<Object> hystrixCommand =
new HystrixCommand<Object>(setterMethodMap.get(method)) {
@Override
protected Object run() throws Exception {
try {
return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
} catch (Exception e) {
throw e;
} catch (Throwable t) {
throw (Error) t;
}
}
@Override
protected Object getFallback() {
if (fallbackFactory == null) {
return super.getFallback();
}
try {
Object fallback = fallbackFactory.create(getExecutionException());
Object result = fallbackMethodMap.get(method).invoke(fallback, args);
if (isReturnsHystrixCommand(method)) {
return ((HystrixCommand) result).execute();
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return ((Observable) result).toBlocking().first();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return ((Single) result).toObservable().toBlocking().first();
} else if (isReturnsCompletable(method)) {
((Completable) result).await();
return null;
} else if (isReturnsCompletableFuture(method)) {
return ((Future) result).get();
} else {
return result;
}
} catch (IllegalAccessException e) {
// shouldn't happen as method is public due to being an interface
throw new AssertionError(e);
} catch (InvocationTargetException | ExecutionException e) {
// Exceptions on fallback are tossed by Hystrix
throw new AssertionError(e.getCause());
} catch (InterruptedException e) {
// Exceptions on fallback are tossed by Hystrix
Thread.currentThread().interrupt();
throw new AssertionError(e.getCause());
}
}
};
if (Util.isDefault(method)) {
return hystrixCommand.execute();
} else if (isReturnsHystrixCommand(method)) {
return hystrixCommand;
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return hystrixCommand.toObservable();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return hystrixCommand.toObservable().toSingle();
} else if (isReturnsCompletable(method)) {
return hystrixCommand.toObservable().toCompletable();
} else if (isReturnsCompletableFuture(method)) {
return new ObservableCompletableFuture<>(hystrixCommand);
}
return hystrixCommand.execute();
}
|
hystrixCommand.execute()中源码大量用到了rxjava来实现,暂时不太熟悉rxjava。后续填坑~
大致的逻辑是,hystrix内部实现了一个线程池来管理接口的调用,实现熔断限流降级等功能。
hystrixCommand.execute() 底层返回一个future.get()的对象,实现了异步调用