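Hystrix plays an important role in microservices: it implements isolation, circuit breaking, fallback (degradation), and other key components.

In Spring Cloud it is usually used together with Feign, so this article only covers the source code relevant to its integration with Feign.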

Hystrix resource isolation

There are two kinds: thread-pool isolation and semaphore isolation.

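Thread pool: suits the vast majority of scenarios (roughly 99%). Use it for network calls to dependent services, where problems such as timeouts have to be handled.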

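Semaphore: suited to calls into relatively complex internal business logic. Such calls run code inside the system and involve no network request, so plain semaphore-based rate limiting is enough, since there is no timeout to catch. The concern is that the algorithms and data structures involved may be inefficient: if concurrency suddenly spikes and many threads pile up in this slow code, the service suffers. Basic isolation and access limiting keeps complex, inefficient internal code from hanging a large number of threads.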
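
To make the difference between the two strategies concrete, here is a minimal sketch using only the JDK. It is not Hystrix code: the class `IsolationSketch` and both method names are hypothetical, and real Hystrix adds metrics, circuit breaking, and pool rejection on top of this idea.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; none of these names come from Hystrix.
final class IsolationSketch {

  // Thread-pool isolation: the call runs on a dedicated pool, so the caller
  // can stop waiting after a timeout and return a fallback instead.
  static <T> T callInPool(ExecutorService pool, Callable<T> call,
                          long timeoutMillis, T fallback) {
    Future<T> future = pool.submit(call);
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // interrupt the worker and give up on the result
      return fallback;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return fallback;
    } catch (ExecutionException e) {
      return fallback; // the call itself failed
    }
  }

  // Semaphore isolation: the call stays on the caller's thread. The semaphore
  // only caps how many callers may be inside at once; there is no timeout.
  static <T> T callWithSemaphore(Semaphore permits, Callable<T> call, T fallback) {
    if (!permits.tryAcquire()) {
      return fallback; // limit reached: reject immediately instead of waiting
    }
    try {
      return call.call();
    } catch (Exception e) {
      return fallback;
    } finally {
      permits.release();
    }
  }
}
```

The thread-pool variant costs a thread hand-off per call but can abandon a slow dependency, which is why it fits network calls. The semaphore variant is cheaper but can only refuse new callers, which is why it fits in-process logic.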

Core principle of the Feign and Hystrix integration

Once Feign is integrated with Hystrix, Feign.Builder is replaced by HystrixFeign.Builder in order to support the Hystrix components.

// In the ReflectiveFeign class
public <T> T newInstance(Target<T> target) {
  Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
  // Maps each method of the FeignClient interface to the handler that serves it
  Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
  List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();

  for (Method method : target.type().getMethods()) {
    if (method.getDeclaringClass() == Object.class) {
      continue;
    } else if (Util.isDefault(method)) {
      DefaultMethodHandler handler = new DefaultMethodHandler(method);
      defaultMethodHandlers.add(handler);
      methodToHandler.put(method, handler);
    } else {
      methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
    }
  }
  // Calls the InvocationHandlerFactory installed by HystrixFeign.Builder
  InvocationHandler handler = factory.create(target, methodToHandler);
  T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
      new Class<?>[] {target.type()}, handler);

  for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
    defaultMethodHandler.bindTo(proxy);
  }
  return proxy;
}

// HystrixFeign.Builder
@Override
public Feign build() {
  return build(null);
}

Feign build(final FallbackFactory<?> nullableFallbackFactory) {
    super.invocationHandlerFactory(new InvocationHandlerFactory() {
        @Override
        public InvocationHandler create(Target target,
                                        Map<Method, MethodHandler> dispatch) {
            return new HystrixInvocationHandler(target, dispatch, setterFactory,
                                                nullableFallbackFactory);
        }
    });
    super.contract(new HystrixDelegatingContract(contract));
    return super.build();
}
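
The two snippets above fit together as follows: the proxy-building code stays the same, and only the factory that produces the InvocationHandler is swapped. The sketch below reproduces that shape with the JDK alone. Every name in it (`ProxySketch`, `GreetingClient`, `HandlerFactory`, `PLAIN`, `GUARDED`) is hypothetical, and the "guarded" wrapper merely tags the result where Hystrix would run a command.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical names throughout; this mirrors the shape of the Feign code, not its API.
final class ProxySketch {

  interface GreetingClient { // stands in for a @FeignClient interface
    String greet(String name);
  }

  // Stands in for InvocationHandlerFactory: turns a dispatch map into a handler.
  interface HandlerFactory {
    InvocationHandler create(Map<Method, Function<Object[], Object>> dispatch);
  }

  // Same structure as newInstance above: build the dispatch map, ask the
  // factory for a handler, then create the JDK dynamic proxy.
  static <T> T newInstance(Class<T> type, HandlerFactory factory) {
    Map<Method, Function<Object[], Object>> dispatch = new LinkedHashMap<>();
    for (Method method : type.getMethods()) {
      // A real client would issue an HTTP request here.
      dispatch.put(method, args -> method.getName() + Arrays.toString(args));
    }
    InvocationHandler handler = factory.create(dispatch);
    return type.cast(Proxy.newProxyInstance(
        type.getClassLoader(), new Class<?>[] {type}, handler));
  }

  // Default behaviour: dispatch straight to the method handler.
  // Object methods (equals/hashCode/toString) are not handled in this sketch.
  static final HandlerFactory PLAIN =
      dispatch -> (proxy, method, args) -> dispatch.get(method).apply(args);

  // Stand-in for the Hystrix-aware factory: same dispatch, extra wrapping.
  static final HandlerFactory GUARDED =
      dispatch -> (proxy, method, args) -> "guarded:" + dispatch.get(method).apply(args);
}
```

Calling `ProxySketch.newInstance(ProxySketch.GreetingClient.class, ProxySketch.GUARDED)` yields a client whose every call passes through the wrapping handler, without the interface or the proxy code knowing about it.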

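Key code for HystrixInvocationHandler, starting with the default SetterFactory (SetterFactory.Default) that builds the Hystrix keys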

@Override
public HystrixCommand.Setter create(Target<?> target, Method method) {
  String groupKey = target.name();
  String commandKey = Feign.configKey(target.type(), method);
  return HystrixCommand.Setter
      .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
      .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
}

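Each Feign client gets one group key (target.name()), which Hystrix also uses as the thread-pool key by default, so there is one thread pool per client. Each method gets its own command key.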
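
To show what such a command key looks like, the sketch below approximates the string format produced by Feign.configKey for methods with non-generic parameters. It is a simplified re-creation, not the library's implementation, and `KeySketch` and `UserClient` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.StringJoiner;

// Hypothetical illustration; approximates the key format only.
final class KeySketch {

  interface UserClient { // hypothetical Feign client interface
    String findUser(Long id, String region);
  }

  // Builds "<InterfaceSimpleName>#<method>(<ParamSimpleNames>)".
  // Assumption: generic parameter types are ignored in this sketch.
  static String commandKey(Class<?> type, Method method) {
    StringJoiner params = new StringJoiner(",", "(", ")");
    for (Class<?> parameterType : method.getParameterTypes()) {
      params.add(parameterType.getSimpleName());
    }
    return type.getSimpleName() + "#" + method.getName() + params;
  }
}
```

For the hypothetical `UserClient.findUser(Long, String)` this yields `UserClient#findUser(Long,String)`, so every method ends up with its own command key while all methods of the client share the group key.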

  HystrixInvocationHandler(Target<?> target, Map<Method, MethodHandler> dispatch,
      SetterFactory setterFactory, FallbackFactory<?> fallbackFactory) {
    this.target = checkNotNull(target, "target");
    this.dispatch = checkNotNull(dispatch, "dispatch");
    this.fallbackFactory = fallbackFactory;
    this.fallbackMethodMap = toFallbackMethod(dispatch);
    this.setterMethodMap = toSetters(setterFactory, target, dispatch.keySet());
  }
  /**
   * Process all methods in the target so that appropriate setters are created.
   */
  static Map<Method, Setter> toSetters(SetterFactory setterFactory,
                                       Target<?> target,
                                       Set<Method> methods) {
    Map<Method, Setter> result = new LinkedHashMap<Method, Setter>();
    for (Method method : methods) {
      method.setAccessible(true);
      result.put(method, setterFactory.create(target, method));
    }
    return result;
  }

@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
    throws Throwable {
  // early exit if the invoked method is from java.lang.Object
  // code is the same as ReflectiveFeign.FeignInvocationHandler
  if ("equals".equals(method.getName())) {
    try {
      Object otherHandler =
          args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
      return equals(otherHandler);
    } catch (IllegalArgumentException e) {
      return false;
    }
  } else if ("hashCode".equals(method.getName())) {
    return hashCode();
  } else if ("toString".equals(method.getName())) {
    return toString();
  }

  HystrixCommand<Object> hystrixCommand =
      new HystrixCommand<Object>(setterMethodMap.get(method)) {
        @Override
        protected Object run() throws Exception {
          try {
            return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
          } catch (Exception e) {
            throw e;
          } catch (Throwable t) {
            throw (Error) t;
          }
        }

        @Override
        protected Object getFallback() {
          if (fallbackFactory == null) {
            return super.getFallback();
          }
          try {
            Object fallback = fallbackFactory.create(getExecutionException());
            Object result = fallbackMethodMap.get(method).invoke(fallback, args);
            if (isReturnsHystrixCommand(method)) {
              return ((HystrixCommand) result).execute();
            } else if (isReturnsObservable(method)) {
              // Create a cold Observable
              return ((Observable) result).toBlocking().first();
            } else if (isReturnsSingle(method)) {
              // Create a cold Observable as a Single
              return ((Single) result).toObservable().toBlocking().first();
            } else if (isReturnsCompletable(method)) {
              ((Completable) result).await();
              return null;
            } else if (isReturnsCompletableFuture(method)) {
              return ((Future) result).get();
            } else {
              return result;
            }
          } catch (IllegalAccessException e) {
            // shouldn't happen as method is public due to being an interface
            throw new AssertionError(e);
          } catch (InvocationTargetException | ExecutionException e) {
            // Exceptions on fallback are tossed by Hystrix
            throw new AssertionError(e.getCause());
          } catch (InterruptedException e) {
            // Exceptions on fallback are tossed by Hystrix
            Thread.currentThread().interrupt();
            throw new AssertionError(e.getCause());
          }
        }
      };

  if (Util.isDefault(method)) {
    return hystrixCommand.execute();
  } else if (isReturnsHystrixCommand(method)) {
    return hystrixCommand;
  } else if (isReturnsObservable(method)) {
    // Create a cold Observable
    return hystrixCommand.toObservable();
  } else if (isReturnsSingle(method)) {
    // Create a cold Observable as a Single
    return hystrixCommand.toObservable().toSingle();
  } else if (isReturnsCompletable(method)) {
    return hystrixCommand.toObservable().toCompletable();
  } else if (isReturnsCompletableFuture(method)) {
    return new ObservableCompletableFuture<>(hystrixCommand);
  }
  return hystrixCommand.execute();
}

The source behind hystrixCommand.execute() relies heavily on RxJava, which I am not yet familiar with. I will fill in that part later.

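The rough logic is that Hystrix maintains its own thread pools to manage the calls to each interface, which is how it implements circuit breaking, rate limiting, fallback, and similar features.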

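Under the hood, hystrixCommand.execute() calls queue().get(): the command runs asynchronously on a Hystrix thread, and the caller blocks on the returned Future until the result is available.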
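
The relationship between the asynchronous and the blocking call can be sketched with a plain ExecutorService. This is a stand-in under simplifying assumptions, not the HystrixCommand API: `CommandSketch` is a hypothetical class, and it leaves out RxJava, timeouts, and fallbacks entirely.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical stand-in for a command; not the HystrixCommand API.
final class CommandSketch<T> {
  private final ExecutorService pool;
  private final Callable<T> work;

  CommandSketch(ExecutorService pool, Callable<T> work) {
    this.pool = pool;
    this.work = work;
  }

  // Asynchronous: hands the work to the pool and returns immediately.
  Future<T> queue() {
    return pool.submit(work);
  }

  // Synchronous: same as queue(), but blocks until the result is ready.
  T execute() {
    try {
      return queue().get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }
}
```

With this shape, the work always runs on a pool thread. Whether the call looks synchronous or asynchronous depends only on whether the caller waits on the Future.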