SOFA源码学习

2018-09-19  本文已影响0人  旺旺大仙贝

这里源码主要学习服务的发布和引用

为什么要有服务的发布和引用

基本功能

VM服务发布和引用

用于同一个Sofa应用不同模块之间服务的相互调用,有XML配置和注解两个方式

1.XML配置

服务发布

public interface JvmService(){
    String get();
}
public Class JvmServiceImpl extend JvmService(){
    @Overrride
    public String get(){
        return "Jvm";
    }
}
<bean id="jvmService" class="cn.test.impl.JvmServiceImpl"/>
<sofa:service ref="jvmService" interface="cn.test.JvmService"/>

服务引用

<sofa:reference id="jvmService" interface="cn.test.JvmService"/>
@Autowired
private JvmService jvmService;

2.注解

服务发布

@SofaService
public class JvmServiceImpl implements JvmService {
    @Override
    public String get() {
        return "JVM";
    }
}

服务引用

@SofaReference
private JvmService jvmService;

RPC服务发布和引用

用于跨机调用不同Sofa应用之间服务的发布和引用,目前只支持XML配置方式

服务发布

<sofa:service ref="rpcService " interface="cn.test.RpcService">
        <sofa:binding.tr/><!-- 加上这句,表示这个服务被注册为基于taobao remoteing的RPC服务 -->
</sofa:service>

服务引用

<sofa:reference id="rpcService" interface="cn.test.RpcService">
    <sofa:binding.tr/><!-- 加上这句,表示要引入的这个服务是基于taobao remoteing的RPC服务 -->
</sofa:reference>
@Autowired
private RpcService rpcService;

使用总结

源码分析

sofa4源代码地址:http://gitlab.alipay-inc.com/jiuzhou-middleware/sofa4
sofa-rpc源代码地址:http://gitlab.alipay-inc.com/jiuzhou-middleware/sofa-rpc

服务发布和引用

标签解析:生成bean

1.在xml中引入sofa命名空间

// sofa命名空间
xmlns:sofa="http://schema.alipay.com/sofa/schema/service"
// 该命名空间的唯一标示
xsi:schemaLocation="http://schema.alipay.com/sofa/schema/service
http://schema.alipay.com/sofa/sofa-service-4-0-0.xsd"

2.通知Spring加载sofa标签

sofa标签是通过Spring来进行解析和加载的,我们需要做的是告诉Spring如何解析这个标签,通知Spring的方式就是通过spring.schemas和spring.handlers两个文件来完成的。在sofa-runtime-service-x.x.x.jar这个jar包下的META-INF中定义了这两个文件:

http\://schema.alipay.com/sofa/common/sofa-service-4-0-0.xsd=com/alipay/sofa/service/config/common/sofa-service.xsd
http\://schema.alipay.com/sofa/schema/service=com.alipay.sofa.runtime.spring.SofaNamespaceHandler

可以看出,sofa:service和sofa:reference标签的解析是通过com.alipay.sofa.runtime.spring.SofaNamespaceHandler这个类来完成的(实际上这个类并没有进行解析,只是为相应标签注册解析器)。

3.解析sofa:service和sofa:reference标签

可以看出,SofaNamespaceHandler首先获得这个appname下的所有BeanDefinitionParser,将其注册到spring中,然后这些BeanDefinitionParser会根据不同的tagName对其进行解析。

public class SofaNamespaceHandler extends NamespaceHandlerSupport {
    ...//
    public void init() {
        registerBeanDefinitionParsers();
        registerBeanDefinitionDecorators();
    }
    private void registerBeanDefinitionParsers() {
        BeanDefinitionParserRegistry beanDefinitionParserRegistry = SofaFrameworkHolder.injector
            .getInstance(PluginServiceManager.class).getBeanDefinitionParserRegistry(appName);
        Set<BeanDefinitionParser> beanDefinitionParsers = beanDefinitionParserRegistry
            .getBeanDefinitionParsers();
        if (beanDefinitionParsers == null) {
            return;
        }
        for (BeanDefinitionParser beanDefinitionParser : beanDefinitionParsers) {
            if (!(beanDefinitionParser instanceof TagNameSupport)) {
                continue;
            }
            String tagName = ((TagNameSupport) beanDefinitionParser).supportTagName();
            registerBeanDefinitionParser(tagName, beanDefinitionParser);
        }
    }
}
package com.alipay.sofa.runtime.service.spring;
public class ServiceDefinitionParser extends AbstractContractDefinitionParser {
    private static final String                  REF      = "ref";
    private static final String                  BEAN_ID  = "beanId";
    private static final ServiceDefinitionParser instance = new ServiceDefinitionParser();

    private ServiceDefinitionParser() { }
    public static ServiceDefinitionParser getInstance() { return instance;}
    @Override
    protected void doParseInternal(Element element, ParserContext parserContext,
                                   BeanDefinitionBuilder builder) {
        String ref = element.getAttribute(REF);
        builder.addPropertyReference(REF, ref);
        builder.addPropertyValue(BEAN_ID, ref);
    }
    @Override
    protected Class getBeanClass(Element element) {
        return ServiceFactoryBean.class;
    }
    // 配置 <sofa:service> 的时候是不需要配置 id 属性的,所以我们必须给生成一个
    @Override
    protected boolean shouldGenerateIdAsFallback() { return true;}
    @Override
    public String supportTagName() { return "service";}
}
package com.alipay.sofa.runtime.service.spring;
public class ReferenceDefinitionParser extends AbstractContractDefinitionParser {
    ...//
    @Override
    protected Class getBeanClass(Element element) {
        return ReferenceFactoryBean.class;
    }
    @Override
    public String supportTagName() {
        return "reference";
    }
}

服务发布:生成&注册组件

服务发布解析结果是生成ServiceFactoryBean,这是一个工厂类,根据getObject方法来生成service对象实例,对象生成时会触发doAfterPropertiesSet进行初始化,生成一个sofa服务组件对象componentInfo,并通过register方法将组件注册到sofa上下文sofaRuntimeContext中。

package com.alipay.sofa.runtime.service.spring;
public class ServiceFactoryBean extends AbstractContractFactoryBean {
    protected Object  ref;
    protected String  beanId;
    protected Service service;

    @Override
    protected void doAfterPropertiesSet() throws Exception {
        // Issue #62 判断 Bean 的实现类上是否有 @SofaService 的注解
        if (hasSofaServiceAnnotation()) {
            throw new ServiceRuntimeException(
                "Bean " + beanId + " of type " + ref.getClass()
                        + " has already annotated by @SofaService,"
                        + " can not be registered using xml. Please check it.");
        }
        Implementation implementation = new DefaultImplementation();
        implementation.setTarget(ref);
        service = buildService();
        if (bindings.size() == 0) { bindings.add(new JvmBinding());}
        for (Binding binding : bindings) { service.addBinding(binding);}

        ComponentInfo componentInfo = new ServiceComponent(implementation, service,
            sofaRuntimeContext);
        sofaRuntimeContext.getComponentManager().register(componentInfo);
    }
    ...//
    @Override
    public Object getObject() throws Exception {
        return service;
    }
    ...//
}

查看ComponentManager的register方法实现,可以看到依次调用服务组件的register(),resolve(),activate()三个方法,把组件对象塞到全局的registry中,程序运行中,所有模块都可以通过registry获得想要的服务。

public class ComponentManagerImpl implements ComponentManager {
    @Override
    public synchronized void register(ComponentInfo componentInfo) {
        _register(componentInfo);
    }
    private ComponentInfo _register(ComponentInfo ci) {
        ComponentName name = ci.getName();
        if (isRegistered(name)) {
            LOGGER.error("Component was already registered: " + name);
            return getComponentInfo(name);
        }
        try {
            ci.register();
        } catch (Exception e) {
            LOGGER.error("Failed to register component: " + ci.getName(), e);
            return null;
        }
        LOGGER.info("Registering component: " + ci.getName());
        try {
            registry.put(ci.getName(), ci);
            if (ci.resolve()) {
                // 组册到类型容器中
                _typeRegistry(ci);
                boolean isLazyActivate;
                if (ci instanceof AbstractComponent) {
                    isLazyActivate = ((AbstractComponent) ci).isLazyActivateInternal();
                } else {
                    isLazyActivate = ci.isLazyActivate();
                }
                if (!isLazyActivate) {
                    ci.activate();
                } else {
                    lazyComponents.put(ci.getName(), ci);
                }
            }
        } catch (Exception e) {
            ci.exception(e);
            LOGGER.error("Failed to create the component " + ci.getName(), e);
        }
        return ci;
    }
}

查看register(),resolve(),active()三个方法的实现,可以看出register和resolve两个方法只是简单的改变一下组件的状态

UNREGISTERED(0, "撤销注册"), REGISTERED(0, "已注册"), RESOLVED(0, "已解析"), ACTIVATED(0, "已激活");
public abstract class AbstractComponent implements ComponentInfo {
    @Override
    public void register() {
        if (componentStatus != ComponentStatus.UNREGISTERED) {
            return;
        }
        componentStatus = ComponentStatus.REGISTERED;
    }

    @Override
    public boolean resolve() {
        if (componentStatus != ComponentStatus.REGISTERED) {
            return false;
        }
        componentStatus = ComponentStatus.RESOLVED;
        return true;
    }

    @Override
    public void activate() throws ServiceRuntimeException {
        if (componentStatus != ComponentStatus.RESOLVED) {
            return;
        }
        if (!checkImplementation()) {
            return;
        }
        Object target = this.implementation.getTarget();
        if (target instanceof ComponentLifeCycle) {
            ((ComponentLifeCycle) target).activate();
        } else {
            invokeWithArgs("activate");
        }
        componentStatus = ComponentStatus.ACTIVATED;
    }
}

ServiceComponent重写的active方法如下,可以看到activate会获取到服务的 Binding 集,然后迭代加载每一个 Binding 对应 BindingAdapter,再调用 outBinding 对外暴露服务。

public class ServiceComponent extends AbstractComponent {
    @Override
    public void activate() throws ServiceRuntimeException {
        activateBinding();
        super.activate();
    }

    private void activateBinding() {
        Object target = service.getTarget();
        if (target == null) {
            throw new ServiceRuntimeException(
                "Must contains the target object whiling registering Service.");
        }
        // 存在绑定。
        if (service.hasBinding()) {
            Set<Binding> bindings = service.getBindings();
            Iterator<Binding> it = bindings.iterator();
            boolean allPassed = true;
            while (it.hasNext()) {
                Binding binding = it.next();
                BindingAdapter<Binding> bindingAdapter = this.bindingAdapterManager.loadAdapter(binding.getBindingType());
                if (bindingAdapter == null) {
                    ...//异常
                }
                Object outBindingResult;
                SofaLogger.BINDING_LOG.info(" <<Out Binding [" + binding.getBindingType() + "] Begins - " + service);
                try {
                    outBindingResult = bindingAdapter.outBinding(service, binding, target, getContext());
                } catch (Throwable t) {
                    allPassed = false;
                    SofaLogger.BINDING_LOG.error(" <<Out binding [" + binding.getBindingType() + "] for [" + service + "] occur exception, ", t);
                    continue;
                }
                if (outBindingResult != null && !Boolean.FALSE.equals(outBindingResult)) {
                    SofaLogger.BINDING_LOG.info(" <<Out Binding [" + binding.getBindingType() + "] Ends - " + service);
                } else {
                    binding.setHealthy(false);
                    SofaLogger.BINDING_LOG.info(" <<Out Binding [" + binding.getBindingType() + "] Fails, Don't publish service - " + service);
                }
            }
        }
        SofaLogger.BINDING_LOG.info("Register Service - " + service);
    }
}

JVM服务的Adater是JvmBindingAdapter,对于JVM服务来说是内部调用不需要暴露给外部,因此JVM服务的outBinding返回的是null,同一应用下其他模块调用该服务直接查本地注册中心registry即可。

public class JvmBindingAdapter implements BindingAdapter<JvmBinding> {
    @Override
    public Object outBinding(Object contract, JvmBinding binding, Object target,
                             SofaRuntimeContext sofaRuntimeContext) {
        return null;
    }
}

Rpc服务的Adapter是RpcBindingAdapter,对于RPC服务需要暴露给外部,因此RpcBindingAdapter的outBinding方法中会将服务推送到远程注册中心中。这样其他应用就可以通过注册中心调用该服务。

public abstract class RpcBindingAdapter<T extends RpcBinding> implements BindingAdapter<T> {
    @Override
    public Object outBinding(Object contract, T binding, Object target,
                             SofaRuntimeContext sofaRuntimeContext) {
        if (!judgeBinding(contract, binding, target, sofaRuntimeContext)) {
            return Boolean.FALSE;
        }
        String uniqueName = getUniqueName((Contract) contract);
        ServiceMetadata metadata = getServiceMetadata(uniqueName);
        if (metadata == null) {
            if (notNeedPublishServiceSet.contains(uniqueName)) {
                return Boolean.FALSE;
            } else {
                throw new ServiceRuntimeException(LogCodes.getLog(LogCodes.INFO_SERVICE_METADATA_IS_NULL, uniqueName));
            }
        }
        try {
            // 将服务推送到远程注册中心
            this.getProcessService().provide(metadata);
        } catch (Exception e) {
            throw new ServiceRuntimeException(LogCodes.getLog(LogCodes.ERROR_PROXY_PUBLISH_FAIL), e);
        }
        return Boolean.TRUE;
    }
}

服务引用:找到&注册组件

标签解析的结果是生成ReferenceFactoryBean,Reference的目标就是把sofa runtime context的一个组件变成spring中的一个bean,核心代码在于proxy = ReferenceRegisterHelper.registerReference(reference, sofaRuntimeContext,applicationContext);这一句,这句代码会从SOFA上下文中,拿到服务对应的代理对象。getObject方法会会返回该对象proxy。

package com.alipay.sofa.runtime.spring.factory;
public class ReferenceFactoryBean extends AbstractContractFactoryBean {
    private Object    proxy;
    protected boolean localFirst = true;
    protected boolean jvmService;

    @Override
    protected void doAfterPropertiesSet() throws Exception {
        Reference reference = buildReference();
        Assert
            .isTrue(bindings.size() <= 1,
                "Found more than one binding in <sofa:reference/>, <sofa:reference/> can only have one binding.");
        if (bindings.size() == 0) {
            bindings.add(new JvmBinding());
        }
        reference.addBinding(bindings.get(0));
        proxy = ReferenceRegisterHelper.registerReference(reference, sofaRuntimeContext);
    }

    @Override
    public Object getObject() throws Exception {
        return proxy;
    }
}

深入看到registerReference方法来看它是如何拿到服务对应的对象的。主要做了两个事:

  1. 给RPC服务引用自动增加一个JVM绑定,也就是一个RPC服务默认会有JVM和RPC两个binding,以达到优先本地JVM调用
  2. sofa的componentManager将该reference注册成一个组件,注册之后生成引用服务的代理对象。
package com.alipay.sofa.runtime.service.helper;
public class ReferenceRegisterHelper {
    public static Object registerReference(Reference reference,
                                           SofaRuntimeContext sofaRuntimeContext) {
        Binding binding = (Binding) reference.getBindings().toArray()[0];
        if (reference.jvmService() && binding.getBindingType().equals(JvmBinding.JVM_BINDING_TYPE)) {
            throw new ServiceRuntimeException(
                "jvm-service=\"true\" can not be used with JVM binding.");
        }
        // 优先本地调用
        if (!binding.getBindingType().equals(JvmBinding.JVM_BINDING_TYPE)
            && isLocalFirst(reference, sofaRuntimeContext)) {
            reference.addBinding(new JvmBinding());
        }
        ComponentManager componentManager = sofaRuntimeContext.getComponentManager();
        ReferenceComponent referenceComponent = new ReferenceComponent(reference,
            new DefaultImplementation(), sofaRuntimeContext);
        // 保证若不存在,则注册为原子操作
        // 去掉此处 ReferenceRegisterHelper 的锁,此处的锁不需要,ComponentManager 中已带锁,背景见 http://gitlab.alipay-inc.com/jiuzhou-middleware/sofa4/issues/257
        // 如果component已注册,返回已注册的component
        if (componentManager.isRegistered(referenceComponent.getName())) {
            return componentManager.getComponentInfo(referenceComponent.getName())
                .getImplementation().getTarget();
        }
        ComponentInfo componentInfo = componentManager.registerAndGet(referenceComponent);
        return componentInfo.getImplementation().getTarget();
    }
}

再查看ComponentManager的registerAndGet方法,可以看到也是调用了之间注册组件的_register方法。register() -> resolve() -> activate() ,其中register和resolve也只是改变状态,代理对象的生成在active方法中完成。

public class ComponentManagerImpl implements ComponentManager {
    @Override
    public void register(ComponentInfo componentInfo) {
        _register(componentInfo);
    }
}

引用组件ReferenceComponent的active实现如下,代理对象的生成与binding有很大关系,不同类型的binding会生成不同类型的代理对象(JVM和RPC)。如果reference上只有一个binding,这只要使用这个binding生成代理对象即可;如果有多个binding,则优先使用jvm binding来生成本地调用的代理对象,其他类型的binding生成的远程调用的代理对象作为jvm binding生成的对象的备选,这样做也是为了优先本地调用,如果本地代理对象不存在,则会使用远程代理对象。

public class ReferenceComponent extends AbstractComponent {
    @Override
    public void activate() throws ServiceRuntimeException {
        if (reference.hasBinding()) {
            Binding candidate = null;
            Set<Binding> bindings = reference.getBindings();
            // 如果binding个数大于1,说明设置了jvm优先调用
            if (bindings.size() == 1) {
                candidate = bindings.iterator().next();
            } else if (bindings.size() > 1) {
                // 有多个说明设置了local-first=true,要求本地调用优先
                Object backupProxy = null;
                for (Binding binding : bindings) {
                    if ("jvm".equals(binding.getName())) {
                        candidate = binding;
                    } else {
                        backupProxy = createProxy(reference, binding);
                    }
                }
                if (candidate != null) {
                    // 一般不会出现 candidate 为 null 的情况,一个 Reference 的绑定,至少会有一个 JVM 绑定
                    ((JvmBinding) candidate).setBackupProxy(backupProxy);
                }
            }
            Object proxy = null;
            // 生成代理对象
            if (candidate != null) {
                proxy = createProxy(reference, candidate);
            }
            if (proxy != null) {
                try {
                    // Spring xml 引用服务, 具体参考 http://gitlab.alipay-inc.com/alipay-sofa/sofa-dynamic-module-runtime/issues/6
                    // 这里直接 new 空的 ApplicationContext 单纯为了兼容 XTS 使用的老接口
                    InterfaceMode interfaceMode = reference.getInterfaceMode();
                    if (interfaceMode.equals(InterfaceMode.spring)) {
                        proxy = postProcessReferenceProxy(reference, proxy,
                            new GenericApplicationContext());
                    } else {
                        proxy = postProcessReferenceProxy(reference, proxy, null);
                    }
                } catch (Throwable e) {
                    SofaLogger.BINDING_LOG.error(
                        "Failed to invoke post process reference, use origin proxy instead.", e);
                }
            }
            this.implementation = new DefaultImplementation();
            implementation.setTarget(proxy);
        }
        publishAsService(reference, implementation.getTarget());
        super.activate();
        latch.countDown();
    }
}

那么,如何通过binding来生成代理对象呢?就是通过createProxy中调用不同bindingAdapter的inBinding方法,借助于动态代理技术进行生成。

private Object createProxy(Reference reference, Binding binding) {
    BindingAdapter<Binding> bindingAdapter = this.bindingAdapterManager.loadAdapter(binding
        .getBindingType());
    if (bindingAdapter == null) {
        ...// 异常
    }
    SofaLogger.BINDING_LOG.info(" >>In Binding [" + binding.getBindingType() + "] Begins - " + reference);
    Object proxy = null;
    try {
        proxy = bindingAdapter.inBinding(reference, binding, sofaRuntimeContext);
    } finally {
        SofaLogger.BINDING_LOG.info(" >>In Binding [" + binding.getBindingType() + "] Ends - " + reference);
    }
    return proxy;
}
public class JvmBindingAdapter implements BindingAdapter<JvmBinding> {
    public Object inBinding(Object contract, JvmBinding binding,
                            SofaRuntimeContext sofaRuntimeContext) {
        return createServiceProxy((Contract) contract, binding, sofaRuntimeContext);
    }
    protected Object createServiceProxy(Contract contract, JvmBinding binding,
                                        SofaRuntimeContext sofaRuntimeContext) {
        ...//
        ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(newClassLoader);
            // 生成代理对象
            ServiceProxy handler = new JvmServiceInvoker(contract, binding, sofaRuntimeContext);
            ...
        }
    }
}

总结图

image

总结

解析

生成组件

服务发布注册组件

服务引用注册组件

上一篇下一篇

猜你喜欢

热点阅读