Dubbo扩展点加载

问题

  • ExtensionLoader是做什么用的?ExtensionFactory是做什么用的?
  • @Adaptive(自适应)用在类上起什么作用?@Adaptive用在方法上起什么作用?
  • @Activate、@Activate(“xxx”)的作用?
  • 类上标注@SPI(“netty”)的作用?
  • 如何自己添加一个扩展点?
  • IOC, AOP分别是如何实现的?
  • getExtension和getAdaptiveExtension的区别

Dubbo扩展点

Dubbo的扩展点加载从JDK标准的SPI(Service Provider Interface)扩展点发现机制加强而来。所以我们先看看Java的SPI可以做什么。

Java SPI

概念

SPI是一种简单的服务提供和发现的机制。这里的服务(Service)是指一组发布的接口或者抽象类;服务提供者((service provider)是接口或者抽象类的实现。
接口发布者只提供一组接口,不同的厂商提供不同的实现,不同的实现可以通过SPI的方式发现和加载。例如:jdbc,SUN只提供了java.sql.Driver, 由各个厂商去实现具体的Driver,如com.mysql.jdbc.Driver。

可以实验一下,不用Class.forName,而使用SPI的方式加载mysql的Driver

示例

首先,接口的提供着要提供一个查找关键字的功能。接口定义如下:

1
2
3
4
package com.nb.spi;
public interface Search {
List<String> search(String keyword);
}

厂商1实现了在文件中查找,并生成了filesearch.jar, 实现如下:

1
2
3
4
5
6
7
package com.file.spi;
public class FileSearch implements Search{
public List<String> search(String keyword) {
System.out.println("Call File Search!");
return null;
}
}

并且,在jar内的META-INF/services/目录下添加一个文件,文件名是接口名cn.nb.spi.Search,文件内容是com.file.spi.FileSearch

厂商2实现了在数据库中查找,并生成了databasesearch.jar, 实现如下:

1
2
3
4
5
6
7
package com.db.spi;
public class DatabaseSearch implements Search{
public List<String> search(String keyword) {
System.out.println("Call DB Search!");
return null;
}
}

同样,在jar内的META-INF/services/目录下添加一个文件,文件名是接口名cn.nb.spi.Search,文件内容是com.db.spi.DatabaseSearch

在需要使用search功能的时候,通过ServiceLoader加载:

1
2
3
4
5
6
7
8
9
10
public class SPIHello {
public static void main(String[] args) {
ServiceLoader<Search> s = ServiceLoader.load(Search.class);
Iterator<Search> searchs = s.iterator();
if (searchs.hasNext()) {
Search curSearch = searchs.next();
curSearch.search("test");
}
}
}

在ServiceLoader执行load方法时,会根据META-INF/services/下提供的实现名称加载实现。当引入filesearch.jar时,打印Call File Search!; 当引入databasesearch.jar时,打印Call DB Search!

为什么不直接使用Java SPI

  • JDK标准的SPI会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源。
  • 如果扩展点加载失败,连扩展点的名称都拿不到了。比如:JDK标准的ScriptEngine,通过getName();获取脚本类型的名称,但如果RubyScriptEngine因为所依赖的jruby.jar不存在,导致RubyScriptEngine类加载失败,这个失败原因被吃掉了,和ruby对应不起来,当用户执行ruby脚本时,会报不支持ruby,而不是真正失败的原因。
  • 增加了对扩展点IoC和AOP的支持,一个扩展点可以直接setter注入其它扩展点。
    综上所述,实际上,Dubbo只是模仿了SPI的方式,并且把功能做了扩展,在实现上已经和Java的SPI没有任何关系了。相关内容可以查看Dubbo的相关文档扩展点加载

Dubbo入口

执行Dubbo,实际上是执行了com.alibaba.dubbo.container.Main.main。首先,会初始化Main的静态成员变量,其中最重要的变量是private static final ExtensionLoader<Container> loader = ExtensionLoader.getExtensionLoader(Container.class);,下面,我们就从ExtensionLoader开始,逐步分析Dubbo扩展点的加载过程。

ExtensionLoader

getExtensionLoader

Dubbo的扩展都是通过调用ExtensionLoader的getExtension方法加载的,可以类比ClassLoader。例如,在com.alibaba.dubbo.container.Main.main中,首先从dubbo.container配置中读取到了log4j,spring。然后分别调用loader.getExtension("log4j")loader.getExtension("spring")来加载log4j和spring容器。每个类都会有自己的ExtensionLoader

1
2
3
4
5
6
7
8
9
10
11
12
@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
......
//EXTENSION_LOADERS变量缓存了各种类型的ExtensionLoader,如果已经存在,就从缓存中获取,如果没有就调用new方法新建
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS
.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}

在ExtensionLoader的构造方法中可以看到,ExtensionLoader有两个属性,一个是type,标识了这个ExtensionLoader所属的类型,另一个是ObjectFactory,类型是ExtensionFactory。当调用Loader.getExtension时,事实上是调用了objectFactory的getExtension方法来加载扩展点的。

Dubbo的微内核做得非常的彻底,ExtensionFactory也是一个扩展点,也需要通过ExtensionLoader<ExtensionFactory>加载,其objectFactory = null, 否则会陷入到先有鸡还是先有蛋的循环。随后,会调用ExtensionLoader<ExtensionFactory>的getAdaptiveExtension方法来查找加载Contrainer的objectFactory.

Container的ExtensionLoader的初始化

主要过程如图所示:
初始化过程

  • public T getAdaptiveExtension()
    • 首先试图从cachedAdaptiveInstance中获取扩展实例,如果没有,则调用createAdaptiveExtension();方法新建,然后缓存到cachedAdaptiveInstance
  • private T createAdaptiveExtension()
    • return injectExtension((T) getAdaptiveExtensionClass().newInstance());调用getAdaptiveExtensionClass()获取到class以后再进行实例化,然后使用injectExtension对实例从的扩展成员进行注入,注入使用的是实例的set方法。
  • private Class<?> getAdaptiveExtensionClass()
    • 首先调用getExtensionClasses(),该方法中可能会设置cachedAdaptiveClass以及cachedClasses。如果cachedAdaptiveClass设置了,则会直接返回cachedAdaptiveClass。否则,调用createAdaptiveExtensionClass()方法新建,并保持在cachedAdaptiveClass中。
  • private Map<String, Class<?>> getExtensionClasses()
    • 该方法使用loadExtensionClasses()获取所有的扩展类,并缓存在cachedClasses中。
  • private Map<String, Class<?>> loadExtensionClasses()
    • 首先,如果type有SPI注释,则获取名称,保存到cachedDefaultName变量中,cachedDefaultName是默认的扩展实例的名称。
      然后调用loadFile方法在以下两个目录(META-INF/services/,META-INF/dubbo/)中查询配置中的扩展定义,并添加到extensionClasses中。
    • 对于ExtensionFactory来说,默认情况下,会读取dubbo-common中的META-INF/dubbo/internal/com.alibaba.dubbo.common.extension.ExtensionFactory,文件中有两个实现,分别是:
    • 1
      2
      3
      //dubbo-common
      adaptive=com.alibaba.dubbo.common.extension.factory.AdaptiveExtensionFactory
      spi=com.alibaba.dubbo.common.extension.factory.SpiExtensionFactory
1
2
//dubbo-config-spring
spring=com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory
* 读入第一行数据,初始化name=adaptive, line = com.alibaba.dubbo.common.extension.factory.AdaptiveExtensionFactory
* 使用`Class<?> clazz = Class.forName(line, true, classLoader);`加载类
* `!type.isAssignableFrom(clazz)`:验证加载的类是否是当前type的一个实现
* `clazz.isAnnotationPresent(Adaptive.class)`,如果设置了@Adaptive,则把clazz保存在cachedAdaptiveClass中, AdaptiveExtensionFactory设置了@Adaptive。
* 对于SpiExtensionFactory和SpringExtensionFactory,都没有设置@Adaptive,所以都被添加进了cachedClasses变量
* 如果没有设置cachedAdaptiveClass,则需要调用`createAdaptiveExtensionClass();`方法动态生成
  • (T) getAdaptiveExtensionClass().newInstance()
    对上一步生成的cachedAdaptiveClass进行实例化。对于ExtensionFactory来说,cachedAdaptiveClassAdaptiveExtensionFactory。 所以这里会执行AdaptiveExtensionFactory的构造方法。
1
2
3
4
5
6
7
8
9
10
11
public AdaptiveExtensionFactory() {
//直接从缓存中获取
ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
//loader.getSupportedExtensions方法从cachedClasses中获取key,依次获取到"spi"和"spring"
//把SPI extensionFactory以及Spring extengFactory放进Factory list
for (String name : loader.getSupportedExtensions()) {
list.add(loader.getExtension(name));
}
factories = Collections.unmodifiableList(list);
}

然后回到createAdaptiveExtension方法中, 调用injectExtension后返回给objectFactory赋值,这时contrainer的ExtensionLoader才正式生成了。objectFactory是AdaptiveExtensionFactory,通过AdaptiveExtensionFactory的getExtension方法,会先后依次调用spiExtensionFactory和springExtensionFactory获取扩展。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private T createAdaptiveExtension() {
try {
//
return injectExtension((T) getAdaptiveExtensionClass()
.newInstance());
} catch (Exception e) {
throw new IllegalStateException(
"Can not create adaptive extenstion " + type + ", cause: "
+ e.getMessage(), e);
}
}
//该方法是Adaptive的核心方法之一,首先获取set方法,获取到属性名称,然后调用objectFactory的getExtension方法加载依赖属性
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
//查找public修饰的,只有一个变量的set方法
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
//获取变量类型
Class<?> pt = method.getParameterTypes()[0];
try {
//获取属性名
String property = method.getName().length() > 3 ? method
.getName().substring(3, 4).toLowerCase()
+ method.getName().substring(4)
: "";
//调用ExtensionFactory加载扩展点
Object object = objectFactory.getExtension(pt,
property);
if (object != null) {
//调用set方法进行注入
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error(
"fail to inject via method "
+ method.getName()
+ " of interface " + type.getName()
+ ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}

main方法-根据config获取container

从配置文件中获取dubbo.container的内容,这里是”log4j,spring”。
然后通过ExtensionLoader逐个加载container。

1
2
3
for (int i = 0; i < args.length; i ++) {
containers.add(loader.getExtension(args[i]));
}
  • public T getExtension(String name)
    同上面一样,这里做一个缓存,如果没有的话,调用createExtension方法新建
  • private T createExtension(String name)
    这里主要获取class,然后调用newInstance进行实例化,需要注意的是,这里用了两次injectExtension方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(
type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name
+ ", class: " + type + ") could not be instantiated: "
+ t.getMessage(), t);
}
}

这里实例化了log4jcontainer和springcontainer。初始化contrainer完成后,会打印一条类似这样的日志:[22/07/15 11:11:21:021 CST] main INFO container.Main: [DUBBO] Use container type([log4j, spring]) to run dubbo serivce., dubbo version: 2.0.0, current host: 127.0.0.1

关键问题

如何实现IOC

在injectExtension(T instance)方法中实现,上面已经分析过

如何实现AOP

通过Wrapper类来实现的。

  • loadFile的调用过程中,如果发现类上没有注释@Adaptive,并且有拷贝构造函数,那么判定为扩展点的Wrapper类。Wrapper类会被添加到cachedWrapperClasses(Set<Class<?>>)变量中。
1
2
3
4
5
6
7
8
9
try {
clazz.getConstructor(type);
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
wrappers.add(clazz);
} catch (NoSuchMethodException e) {
  • 在createExtension方法中
1
2
3
4
5
6
7
8
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
//依次调用拷贝构造函数生成Wrapper的实例,并且再次调用injectExtension注入Wrapper中其他属性
instance = injectExtension((T) wrapperClass.getConstructor(
type).newInstance(instance));
}
}

类上标注类似@SPI(“netty”)的作用

  • 有@SPI的扩展点,会使用SpiExtensionFactory加载。加载过程如下:
1
2
3
4
5
6
7
8
9
10
public <T> T getExtension(Class<T> type, String name) {
if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
if (loader.getSupportedExtensions().size() > 0) {
//调用getAdaptiveExtension加载扩展点
return loader.getAdaptiveExtension();
}
}
return null;
}
  • getAdaptiveExtension()方法经过多个方法调用,然后会调用createAdaptiveExtensionClass();这个方法会动态的生成一个Class,并且通过这个Class可以对调用的方法进行动态的选择。下面我们逐步分析.
    • 生成code:createAdaptiveExtensionClassCode(),该方法会根据当前的type,用字符串拼接一个类。
    • 加载CompilerClassLoader classLoader = findClassLoader(); com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class)
    • 调用Compiler编译, compiler.compile(code, classLoader);, 默认使用JavassistCompiler
    • Javassist的用法
    • Javassist guide
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//生成code:createAdaptiveExtensionClassCode
//用CacheFactory举例
@SPI("lru")
public interface CacheFactory {
@Adaptive("cache")
Cache getCache(URL url);
}
//createAdaptiveExtensionClassCode拼成的类,实现了getCache(URL url)方法
public class CacheFactory$Adpative implements
com.alibaba.dubbo.cache.CacheFactory {
public com.alibaba.dubbo.cache.Cache getCache(
com.alibaba.dubbo.common.URL arg0) {
if (arg0 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
//获取参数名,"cache"是从@Adaptive获取的,lru是从@SPI获取的。
//如果url中有cache参数,则extName为cache参数的值;否则,就为lru
//通过这样的方法,getCache可以动态的选择实现
String extName = url.getParameter("cache", "lru");
if (extName == null)
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.cache.CacheFactory) name from url("
+ url.toString() + ") use keys([cache])");
com.alibaba.dubbo.cache.CacheFactory extension = (com.alibaba.dubbo.cache.CacheFactory) ExtensionLoader
.getExtensionLoader(com.alibaba.dubbo.cache.CacheFactory.class)
.getExtension(extName);
return extension.getCache(arg0);
}
}
public String getParameter(String key, String defaultValue) {
String value = getParameter(key);
if (value == null || value.length() == 0) {
return defaultValue;
}
return value;
}

@Activate、@Activate(“xxx”)的作用

自动激活,通过调用ExtensionLoader的getActivateExtension(URL url, String key)方法获取自动激活的List。例如下面的方法

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    return new ListenerExporterWrapper<T>(protocol.export(invoker), 
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}