2、Dubbo SPI

烟雨 4年前 (2021-12-11) 阅读数 613 #Dubbo
文章标签 Dubbo

SPI(Service Provider Interface)

  • 将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件,加载实现类。这样可以在运行时,动态为接口替换实现类

  • 在Java中SPI是被用来设计给服务提供商做插件使用的。基于策略模式来实现动态加载的机制。程序只定义一个接口,具体的实现交个不同的服务提供者;在程序启动的时候,读取配置文件,由配置确定要调用哪一个实现类。

一、Java原生SPI

public interface Fruits {
    void getName();
}

public class Banana implements Fruits {
    public void getName() {
        System.out.println("name:Banana");
    }
}

public class Apple implements Fruits {
    public void getName() {
        System.out.println("name:Apple");
    }
}

public class JavaSPIDemo {
    public static void main(String[] args) {
        ServiceLoader<Fruits> serviceLoader = ServiceLoader.load(Fruits.class);
        serviceLoader.forEach(Fruits::getName);
    }
}

image.png

原生SPI缺点

  • 不能按需加载。Java SPI在加载扩展点的时候,会一次性加载所有可用的扩展点,很多是不需要的,会浪费系统资源。

  • 获取某个实现类的方式不够灵活,只能通过Iterator形式获取,不能根据某个参数来获取对应的实现类。

  • 不支持AOP与依赖注入。

  • JAVA SPI可能会丢失加载扩展点异常信息,导致追踪问题很困难。

1.1、Java原生SPI源码分析

public static <S> ServiceLoader<S> load(Class<S> service) {
    //获取当前线程ClassLoader
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    return ServiceLoader.load(service, cl);
}

public static <S> ServiceLoader<S> load(Class<S> service, ClassLoader loader{
    return new ServiceLoader<>(service, loader);
}
                                        
private ServiceLoader(Class<S> svc, ClassLoader cl) {
    service = Objects.requireNonNull(svc, "Service interface cannot be null");
    loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
    acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
    reload();
}
                                        
public void reload() {
    //清除缓存
    providers.clear();
    lookupIterator = new LazyIterator(service, loader);
}
上面代码简单的说就是先找当前线程绑定的ClassLoader,如果没有就用SystemClassLoader(13行),然后清除一下缓存(20行),再创建一个LazyIterator(21行)。
LazyIterator是Iterator的实现类。通过LazyIterator去遍历加载文件里配置的类。
public boolean hasNext() {
    if (acc == null) {
        return hasNextService();
    } else {
        PrivilegedAction<Boolean> action = new PrivilegedAction<Boolean>() {
            public Boolean run() { return hasNextService(); }
        };
        return AccessController.doPrivileged(action, acc);
    }
}

public S next() {
    if (acc == null) {
        return nextService();
    } else {
        PrivilegedAction<S> action = new PrivilegedAction<S>() {
            public S run() { return nextService(); }
        };
        return AccessController.doPrivileged(action, acc);
    }
}

private boolean hasNextService() {
    if (nextName != null) {
        return true;
    }
    if (configs == null) {
        try {
            //得到文件所在路径
            String fullName = PREFIX + service.getName();
            if (loader == null)
                configs = ClassLoader.getSystemResources(fullName);
            else
                configs = loader.getResources(fullName);
        } catch (IOException x) {
            fail(service, "Error locating configuration files", x);
        }
    }
    //遍历文件内容
    while ((pending == null) || !pending.hasNext()) {
        if (!configs.hasMoreElements()) {
            return false;
        }
        //解析内容
        pending = parse(service, configs.nextElement());
    }
    nextName = pending.next();
    return true;
}


private S nextService() {
    if (!hasNextService())
        throw new NoSuchElementException();
    String cn = nextName;
    nextName = null;
    Class<?> c = null;
    try {
        //根据全限定类名加载类
        c = Class.forName(cn, false, loader);
    } catch (ClassNotFoundException x) {
        fail(service,
             "Provider " + cn + " not found");
    }
    if (!service.isAssignableFrom(c)) {
        fail(service,
             "Provider " + cn  + " not a subtype");
    }
    try {
        //创建实现类
        S p = service.cast(c.newInstance());
        //创建后缓存起来
        providers.put(cn, p);
        return p;
    } catch (Throwable x) {
        fail(service,
             "Provider " + cn + " could not be instantiated",
             x);
    }
    throw new Error();          // This cannot happen
}

二、dubbo SPI

首先在 META-INF/dubbo 目录下按接口全限定名建立一个文件(com.zender.dubbo.spi.Fruits),内容更改如下:
banana = com.zender.dubbo.spi.Banana
apple = com.zender.dubbo.spi.Apple

接口添加注解@SPI

@SPI
public interface Fruits {
    void getName();
}

测试类

public class DubboSPIDemo {

    @Test
    public void test() {
        ExtensionLoader<Fruits> extensionLoader = ExtensionLoader.getExtensionLoader(Fruits.class);
        Fruits banana = extensionLoader.getExtension("banana");
        banana.getName();

        Fruits apple = extensionLoader.getExtension("apple");
        apple.getName();
    }
}

image.png

Dubbo对配置文件目录的约定,不同于Java SPI ,Dubbo分为了三类目录。
  • META-INF/services/目录:该目录下的SPI配置文件是为了用来兼容Java SPI 。

  • META-INF/dubbo/目录:该目录存放用户自定义的SPI配置文件。

  • META-INF/dubbo/internal/目录:该目录存放Dubbo内部使用的SPI配置文件。

2.1、Dubbo SPI源码分析

ExtensionLoader表示某个接⼝的扩展点加载器,可以⽤来加载某个扩展点实例。在ExtensionLoader中除开有上⽂的static的Map外,还有两个非常重要的属性:
  1. Class<?> type:表示当前ExtensionLoader实例是哪个接⼝的扩展点加载器。

  2. ExtensionFactory objectFactory:扩展点工厂(对象工厂),可以获得某个对象。

ExtensionLoader和ExtensionFactory的区别

  1. ExtensionLoader最终所得到的对象是Dubbo SPI机制产⽣的。

  2. ExtensionFactory最终所得到的对象可能是Dubbo SPI机制所产⽣的,也可能是从Spring容器中所获得的对象。

ExtensionLoader中有三个常用的方法

  1. getExtension("dubbo"):表示获取名字为dubbo的扩展点实例。

  2. getAdaptiveExtension():表示获取⼀个⾃适应的扩展点实例。

  3. getActivateExtension(URL url, String[] values, String group):表示⼀个可以被url激活的扩展点。

什么是⾃适应扩展点实例?它其实就是当前这接口的⼀个代理对象。
Dubbo SPI大致流程就是先通过接口类找到一个ExtensionLoader ,然后再通过ExtensionLoader.getExtension(name) 得到指定名字的实现类实例。
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if (type == null) {
        throw new IllegalArgumentException("Extension type == null");
    }
    if (!type.isInterface()) {
        throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
    }
    if (!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type (" + type +
                                           ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
    }
    // 从缓存里面找是否已经存在这个类型的ExtensionLoader
    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    // 如果没有就新建一个塞入缓存。
    if (loader == null) {
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
}
在ExtensionLoader类的内部有⼀个static的ConcurrentHashMap,⽤来缓存某个接口类型所对应的ExtensionLoader实例
再看extensionLoader.getExtension()方法
public T getExtension(String name) {
    if (StringUtils.isEmpty(name)) {
        throw new IllegalArgumentException("Extension name == null");
    }
    // 获取默认扩展类
    if ("true".equals(name)) {
        return getDefaultExtension();
    }
    // 从缓存中获取
    final Holder<Object> holder = getOrCreateHolder(name);
    Object instance = holder.get();

    // 如果有两个线程同时来获取同一个name的扩展点对象,那只会有一个线程会进行创建
    // 包装一层holder就是用来当锁对象
    if (instance == null) {
        synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
                // 创建扩展点实例对象
                instance = createExtension(name);   // 创建扩展点对象
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}
在调⽤getExtension()方法去获取⼀个扩展点实例后,会对实例进⾏缓存,下次再获取同样名字的扩展点实例时就会从缓存中拿。
重点就是createExtension()方法,创建扩展点对象。在调⽤createExtension(String name)⽅法去创建⼀个扩展点实例时,要经过以下几个步骤:
  1. 根据name找到对应的扩展点实现类。

  2. 根据实现类⽣成⼀个实例,把实现类和对应⽣成的实例进⾏缓存。

  3. 对⽣成出来的实例进⾏依赖注⼊(给实例的属性进⾏赋值)。

  4. 对依赖注⼊后的实例进⾏AOP(Wrapper),把当前接⼝类的所有的Wrapper全部⼀层⼀层包裹在实例对象上,没包裹个Wrapper后,也会对Wrapper对象进⾏依赖注⼊。

  5. 返回最终的Wrapper对象。

private T createExtension(String name) {
    // 获取扩展类  {name: Class}  key-Value    接口的所有实现类
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
        throw findException(name);
    }

    try {
        // 看看实例是否缓存
        T instance = (T) EXTENSION_INSTANCES.get(clazz);
        if (instance == null) {
            // 如果没有缓存,创建实例放入缓存
            EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }

        // 依赖注入 IOC
        injectExtension(instance);

        // AOP
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (CollectionUtils.isNotEmpty(wrapperClasses)) {
            // 如果有包装类型,包装一下
            for (Class<?> wrapperClass : wrapperClasses) {
                // 比如:new CarWrapper(instance)--->CarWrapper实例
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }

        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                                        type + ") couldn't be instantiated: " + t.getMessage(), t);
    }
}
整体流程很清晰,先找实现类,判断缓存是否有实例,没有就反射创建实例,然后执行set方法依赖注入。如果有找到包装类(Wrapper)的话,再包一层
getExtensionClasses()方法是如何找到实现类的?
getExtensionClasses()是⽤来加载当前接⼝所有的扩展点实现类的,返回⼀个Map。之后可以从这个Map中按照指定的name获取对应的扩展点实现类。
当把当前接⼝的所有扩展点实现类都加载出来后也会进⾏缓存,下次需要加载时直接拿缓存中的。Dubbo在加载⼀个接⼝的扩展点时,思路是这样的:
  1. 根据接⼝的全限定名去META-INF/dubbo/internal/⽬录下寻找对应的⽂件,调⽤loadResource⽅法进⾏加载。

  2. 根据接⼝的全限定名去META-INF/dubbo/⽬录下寻找对应的⽂件,调⽤loadResource⽅法进⾏加载。

  3. 根据接⼝的全限定名去META-INF/services/⽬录下寻找对应的⽂件,调⽤loadResource⽅法进⾏加载。

/**
* 加载当前ExtensionLoader对象中指定的接口的所有扩展
*/
private Map<String, Class<?>> getExtensionClasses() {
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
                classes = loadExtensionClasses(); // 加载、解析文件 Map
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}

private Map<String, Class<?>> loadExtensionClasses() {
    // cache接口默认的扩展类
    cacheDefaultExtensionName();

    Map<String, Class<?>> extensionClasses = new HashMap<>();
    // 根据相应目录去查找
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    return extensionClasses;
}

private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type) {
    String fileName = dir + type;
    try {
        // 根据文件中的内容得到urls, 每个url表示一个扩展http=org.apache.dubbo.rpc.protocol.http.HttpProtocol
        Enumeration<java.net.URL> urls;
        ClassLoader classLoader = findClassLoader();
        if (classLoader != null) {
            urls = classLoader.getResources(fileName);
        } else {
            urls = ClassLoader.getSystemResources(fileName);
        }
        if (urls != null) {
            while (urls.hasMoreElements()) {
                java.net.URL resourceURL = urls.nextElement();
                // 遍历url进行加载,把扩展类添加到extensionClasses中
                loadResource(extensionClasses, classLoader, resourceURL);
            }
        }
    } catch (Throwable t) {
        logger.error("Exception occurred when loading extension class (interface: " +
                     type + ", description file: " + fileName + ").", t);
    }
}

/**
*loadResource⽅法就是完成对⽂件内容的解析,按⾏进⾏解析,会解析出"="两边的内容,"="左边的内容就是扩展点的name,右边的内容就是扩展点实现类,
* 并且会利⽤ExtensionLoader类的类加载器来加载扩展点实现类。然后调⽤loadClass⽅法对name和扩展点实例进⾏详细的解析,并且最终把他们放到Map中。
*/
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
    try {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                final int ci = line.indexOf('#');
                if (ci >= 0) {
                    line = line.substring(0, ci);
                }
                line = line.trim();
                if (line.length() > 0) {
                    try {
                        String name = null;
                        int i = line.indexOf('=');
                        if (i > 0) {
                            name = line.substring(0, i).trim();
                            line = line.substring(i + 1).trim();
                        }
                        if (line.length() > 0) {
                            // 加载类,并添加到extensionClasses中
                            loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
                        }
                    } catch (Throwable t) {
                        IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
                        exceptions.put(line, e);
                    }
                }
            }
        }
    } catch (Throwable t) {
        logger.error("Exception occurred when loading extension class (interface: " +
                     type + ", class file: " + resourceURL + ") in " + resourceURL, t);
    }
}

private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
    if (!type.isAssignableFrom(clazz)) {
        throw new IllegalStateException("Error occurred when loading extension class (interface: " +
                                        type + ", class line: " + clazz.getName() + "), class "
                                        + clazz.getName() + " is not subtype of interface.");
    }
    // 当前接口手动指定了Adaptive类
    if (clazz.isAnnotationPresent(Adaptive.class)) {
        cacheAdaptiveClass(clazz);
    } else if (isWrapperClass(clazz)) {
        // 是一个Wrapper类
        cacheWrapperClass(clazz);
    } else {
        // 需要有无参的构造方法
        clazz.getConstructor();

        // 在文件中没有name,但是在类上指定了Extension的注解上指定了name
        if (StringUtils.isEmpty(name)) {
            name = findAnnotationName(clazz);
            if (name.length() == 0) {
                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
            }
        }

        String[] names = NAME_SEPARATOR.split(name);
        if (ArrayUtils.isNotEmpty(names)) {
            // 缓存一下被Activate注解了的类
            cacheActivateClass(clazz, names[0]);

            // 有多个名字
            for (String n : names) {
                // clazz: name
                cacheName(clazz, n);
                // name: clazz
                saveInExtensionClass(extensionClasses, clazz, n);
            }
        }
    }
}

loadResource()方法

loadResource()方法就是完成对⽂件内容的解析,会解析出"="两边的内容,"="左边的内容就是扩展点的name,右边的内容就是扩展点实现类,并且会利⽤ExtensionLoader类的类加载器来加载扩展点实现类。然后调⽤loadClass⽅法对name和扩展点实例进⾏详细的解析,并且最终把他们放到Map中。

loadClass()⽅法

loadClass()方法会做如下几件事情:
  1. 当前扩展点实现类上是否存在@Adaptive注解,如果存在则把该类认为是当前接⼝的默认⾃适应类(接⼝代理类),并把该类存到cachedAdaptiveClass属性上。

  2. 当前扩展点实现是否是⼀个当前接⼝的⼀个Wrapper类,如果判断的?就是看当前类中是否存在⼀个构造⽅法,该构造⽅法只有⼀个参数,参数类型为接⼝类型,如果存在这⼀的构造⽅法,那么这个类就是该接⼝的Wrapper类,如果是,则把该类添加到cachedWrapperClasses中去,cachedWrapperClasses是⼀个set。

  3. 如果不是⾃适应类,或者也不是Wrapper类,则判断是有存在name,如果没有name,则报错。

  4. 如果有多个name,则判断⼀下当前扩展点实现类上是否存在@Activate注解,如果存在,则把该类添加到cachedActivates中,cachedWrapperClasses是⼀个map。

  5. 最后,遍历多个name,把每个name和对应的实现类存到extensionClasses中去,extensionClasses就是上⽂所提到的map。

2.2、Dubbo 自适应扩展(@Adaptive)

Dubbo自适应扩展是在运行时动态的选用哪一种扩展类来提供服务。对应于Adaptive机制,Dubbo提供了一个注解@Adaptive,该注解可以用于接口的某个子类上,也可以用于接口方法上。
如果用在接口的子类上,则表示Adaptive机制的实现会按照该子类的方式进行自定义实现。
如果用在方法上,则表示Dubbo会为该接口自动生成一个子类(动态拼接代码),并且按照一定的格式重写该方法,而其余没有标注@Adaptive注解的方法将会默认抛出异常。
前面分析的getAdaptiveExtension()方法会通过传入的type,通过这个type创建一个ExtensionLoader。源码如下:
// 从缓存里面找是否已经存在这个类型的ExtensionLoader
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
// 如果没有就新建一个塞入缓存。
if (loader == null) {
    EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
    loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}

在ExtensionLoader构造的时候就会去通过getAdaptiveExtension()方法获取指定的扩展类的ExtensionFactory。

private ExtensionLoader(Class<?> type) {
        this.type = type;
        // ExtensionFactory表示扩展类实例工厂,可以利用ExtensionFactory得到某个扩展的对象实例
        // 得到ExtensionFactory接口的adaptive实例-AdaptiveExtensionFactory实例,利用AdaptiveExtensionFactory实例来获取某个类型或名字的实例对象
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());   // AdaptiveExtensionFactory
    }

getAdaptiveExtension()方法继续跟进下去。

public T getAdaptiveExtension() {
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
        if (createAdaptiveInstanceError != null) {
            throw new IllegalStateException("Failed to create adaptive instance: " +
                                            createAdaptiveInstanceError.toString(),
                                            createAdaptiveInstanceError);
        }

        synchronized (cachedAdaptiveInstance) {
            instance = cachedAdaptiveInstance.get();
            if (instance == null) {
                try {
                    //创建
                    instance = createAdaptiveExtension();
                    cachedAdaptiveInstance.set(instance);
                } catch (Throwable t) {
                    createAdaptiveInstanceError = t;
                    throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                }
            }
        }
    }

    return (T) instance;
}

private T createAdaptiveExtension() {
    try {
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
    }
}

private Class<?> getAdaptiveExtensionClass() {
    // 获取当前接口的所有扩展类
    getExtensionClasses(); //
    // 缓存了@Adaptive注解标记的类
    if (cachedAdaptiveClass != null) {
        return cachedAdaptiveClass;
    }
    // 如果某个接口没有手动指定一个Adaptive类,那么就自动生成一个Adaptive类
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

//动态拼接代码
private Class<?> createAdaptiveExtensionClass() {
    // cachedDefaultName表示接口默认的扩展类
    String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();

    ClassLoader classLoader = findClassLoader();
    org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
    return compiler.compile(code, classLoader);
}


//类似IOC
private T injectExtension(T instance) {

    if (objectFactory == null) {
        return instance;
    }

    try {
        for (Method method : instance.getClass().getMethods()) {
            if (!isSetter(method)) {
                continue;
            }

            // 利用set方法注入
            /**
                 * Check {@link DisableInject} to see if we need auto injection for this property
                 */
            if (method.getAnnotation(DisableInject.class) != null) {
                continue;
            }

            // set方法中的参数类型
            Class<?> pt = method.getParameterTypes()[0];   // Person接口
            if (ReflectUtils.isPrimitives(pt)) {
                continue;
            }

            try {
                // 得到setXxx中的xxx
                String property = getSetterProperty(method);   // person

                // 根据参数类型或属性名,从objectFactory中获取到对象,然后调用set方法进行注入
                Object object = objectFactory.getExtension(pt, property); // User.class, user
                if (object != null) {
                    method.invoke(instance, object);
                }
            } catch (Exception e) {
                logger.error("Failed to inject via method " + method.getName()
                             + " of interface " + type.getName() + ": " + e.getMessage(), e);
            }

        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}

ExtensionFactory的实现类AdaptiveExtensionFactory,SpiExtensionFactory,通过这2个实现类获取实例。

@Override
public <T> T getExtension(Class<T> type, String name) {
    // 遍历两个ExtensionFactory,从ExtensionFactory中得到实例,只要从某个ExtensionFactory中获取到对象实例就可以了
    for (ExtensionFactory factory : factories) {
        T extension = factory.getExtension(type, name);  // SpringExtensionFactory或者SpiExtensionFactory
        if (extension != null) {
            return extension;
        }
    }
    return null;
}


版权声明

非特殊说明,本文由Zender原创或收集发布,欢迎转载。

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

作者文章
热门