dubbo源码浅析

Dubbo 为了更好地达到 OCP 原则(即“对扩展开放,对修改封闭”的原则),采用了“微内核+插件”的架构。那什么是微内核架构呢?微内核架构也被称为插件化架构(Plug-in Architecture),这是一种面向功能进行拆分的可扩展性架构。内核功能是比较稳定的,只负责管理插件的生命周期,不会因为系统功能的扩展而不断进行修改。功能上的扩展全部封装到插件之中,插件模块是独立存在的模块,包含特定的功能,能拓展内核系统的功能。

微内核常见的方式有SPI(Service Provider Interface)、Factory、IOC、OSGI等。dubbo采用的是SPI,参考了JDK原生的SPI,并进行了优化和功能增强。

JDK SPI

需要在Classpath 下的 META-INF/services/目录创建一个以接口名命名的文件。mysql的driver就是使用的此方式,spring中相关框架也有相关应用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
public interface Log { 
    void log(String info); 
}

public class Logback implements Log { 
    @Override 
    public void log(String info) { 
        System.out.println("Logback:" + info); 
    } 
} 

public class Log4j implements Log { 
    @Override 
    public void log(String info) { 
        System.out.println("Log4j:" + info); 
    } 
} 

在项目的 resources/META-INF/services 目录下添加一个名为 com.xxx.Log 的文件,这是 JDK SPI 需要读取的配置文件,具体内容如下:

1
2
com.xxx.impl.Log4j 
com.xxx.impl.Logback 

使用方法如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public class Main { 
    public static void main(String[] args) { 
        ServiceLoader<Log> serviceLoader =  
                ServiceLoader.load(Log.class); 
        Iterator<Log> iterator = serviceLoader.iterator(); 
        while (iterator.hasNext()) { 
            Log log = iterator.next(); 
            log.log("JDK SPI");  
        } 
    } 
} 

JDK SPI缺点

所有的实现类都会被加载并初始化,如果没有使用就会有资源的浪费。

dubbo SPI

dubbo SPI将配置文件改为kv形式,在需要用到的时候再进行加载并初始化,并且使用的时候也不需要遍历SPI所有的接口。

@SPI注解

dubbo中某个接口被@SPI注解修饰时,就表示该接口是dubbo扩展接口

1
2
3
4
@SPI(value = "dubbo", scope = ExtensionScope.FRAMEWORK)
public interface Protocol {
//忽略其内容
}

@SPI 注解的 value 值指定了默认的扩展名称,例如,在通过 Dubbo SPI 加载 Protocol 接口实现时,如果没有明确指定扩展名,则默认会将 @SPI 注解的 value 值作为扩展名,即加载 dubbo 这个扩展名对应的 org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol 这个扩展实现类

1
2
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension();//与下面等价
//Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");

ExtensionLoader.getExtension()方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private T createExtension(String name, boolean wrap) {
    //查找对应的class对象
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null || unacceptableExceptions.contains(name)) {
        throw findException(name);
    }
    try {
        //尝试从缓存中获取
        T instance = (T) extensionInstances.get(clazz);
        if (instance == null) {
            //创建对象
            extensionInstances.putIfAbsent(clazz, createExtensionInstance(clazz));
            instance = (T) extensionInstances.get(clazz);
            //后置处理器
            instance = postProcessBeforeInitialization(instance, name);
            //依赖注入,查找所有的setter方法,有合适的就调用setter方法进行注入
            injectExtension(instance);
            //后置处理器
            instance = postProcessAfterInitialization(instance, name);
        }

        if (wrap) {
            List<Class<?>> wrapperClassesList = new ArrayList<>();
            if (cachedWrapperClasses != null) {
                wrapperClassesList.addAll(cachedWrapperClasses);
                wrapperClassesList.sort(WrapperComparator.COMPARATOR);
                Collections.reverse(wrapperClassesList);
            }

            if (CollectionUtils.isNotEmpty(wrapperClassesList)) {
                for (Class<?> wrapperClass : wrapperClassesList) {
                    Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class);
                    boolean match = (wrapper == null) ||
                        ((ArrayUtils.isEmpty(wrapper.matches()) || ArrayUtils.contains(wrapper.matches(), name)) &&
                            !ArrayUtils.contains(wrapper.mismatches(), name));
                    if (match) {
                        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                        instance = postProcessAfterInitialization(instance, name);
                    }
                }
            }
        }

        // Warning: After an instance of Lifecycle is wrapped by cachedWrapperClasses, it may not still be Lifecycle instance, this application may not invoke the lifecycle.initialize hook.
        initExtension(instance);
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
            type + ") couldn't be instantiated: " + t.getMessage(), t);
    }
}

@Adaptive 注解与适配器

@Adaptive 注解用来实现dubbo的适配器功能。

Adaptive 注解在类上的情况很少,在 Dubbo 中,仅有两个类被 Adaptive 注解了,分别是 AdaptiveCompiler 和 AdaptiveExtensionFactory。此种情况,表示拓展的加载逻辑由人工编码完成。

加到方法上,较常用。表示拓展的加载逻辑需由框架自动生成,dubbo借助了javaassist框架,进行相关代码生成。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//以下代码,由于自己的强迫症(部分用了StringBuilder,部分没用),对代码做了略微修改
public String generate(boolean sort) {
    // 如果接口中,没有方法加Adaptive注解,则直接抛异常
    if (!hasAdaptiveMethod()) {
        throw new IllegalStateException("No adaptive method exist on extension " + type.getName() + ", refuse to create the adaptive class!");
    }

    StringBuilder code = new StringBuilder();
    //包名
    generatePackageInfo(code);
    //import相关代码
    generateImports(code);
    //类名,以LoadBalance接口为例,生成的名字为LoadBalance$Adaptive
    generateClassDeclaration(code);

    Method[] methods = type.getMethods();
    if (sort) {
        Arrays.sort(methods, Comparator.comparing(Method::toString));
    }
    //生成所有的方法
    for (Method method : methods) {
        generateMethod(method,code);
    }
    code.append('}');

    String generatedCode = code.toString();
    if (logger.isDebugEnabled()) {
        logger.debug(generatedCode);
    }
    return generatedCode;
}

@Activate注解与自动激活特性

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
    /**
     * Activate the current extension when one of the groups matches. The group passed into
     * {@link ExtensionLoader#getActivateExtension(URL, String, String)} will be used for matching.
     *
     * @return group names to match
     * @see ExtensionLoader#getActivateExtension(URL, String, String)
     */
    //修饰实现类是在provider端还是在consumer端
    String[] group() default {};

    /**
     * Activate the current extension when the specified keys appear in the URL's parameters.
     * <p>
     * For example, given <code>@Activate("cache, validation")</code>, the current extension will be return only when
     * there's either <code>cache</code> or <code>validation</code> key appeared in the URL's parameters.
     * </p>
     *
     * @return URL parameter keys
     * @see ExtensionLoader#getActivateExtension(URL, String)
     * @see ExtensionLoader#getActivateExtension(URL, String, String)
     */
    //url参数中出现指定的key才被激活
    String[] value() default {};

    /**
     * Absolute ordering info, optional
     *
     * Ascending order, smaller values will be in the front o the list.
     *
     * @return absolute ordering info
     */
    //确定实现类的排序,
    int order() default 0;
}

使用方法

1
2
3
URL url = URL.valueOf("test://localhost/test");
List<ActivateExt1> list = getExtensionLoader(ActivateExt1.class)
    .getActivateExtension(url, new String[]{}, "order");

总结

dubbo中无非就是用了以上SPI的功能,来实现相关的功能。理解了以上代码,再逐个去看dubbo中的相关代码。在需要扩展或者修改的时候,就可以通过SPI来实现。我之前实现了一个按id定向loadbalance的扩展,就是参考了LoadBalance接口源码,进行了相关扩展。

在使用**@DubboReference@DubboService**,在相关的注解配置中,使用自己扩展的功能即可。如下

1
2
@DubboReference(interfaceClass = Admin.class,loadbalance = "myhash")
   private Admin admin;
Talk is cheap, show me the bug/code.
使用 Hugo 构建
主题 StackJimmy 设计