DK's Notes DK's Notes
首页
导航站
  • Java-Se

    • Java基础
  • Java-Se进阶-多线程

    • 多线程
  • Java-Se进阶-java8新特性

    • java8新特性
  • Java-ee

    • JavaWeb
  • Java虚拟机

    • JVM
  • SQL 数据库

    • MySQL
  • NoSQL 数据库

    • Redis
    • ElasticSearch
    • MongoDB
  • ORM

    • MyBatis
    • MyBatis-Plus
  • Spring

    • Spring
  • SpringMVC

    • SpringMVC1
    • SpringMVC2
  • SpringCloud

    • SpringCloud
  • 中间件

    • RabbitMQ
    • Dubbo
  • 秒杀项目
  • Git
  • Linux
  • Docker
  • JWT
  • 面试
  • 刷题
开发问题😈
设计模式
关于💕
归档🕛
GitHub (opens new window)

风

摸鱼
首页
导航站
  • Java-Se

    • Java基础
  • Java-Se进阶-多线程

    • 多线程
  • Java-Se进阶-java8新特性

    • java8新特性
  • Java-ee

    • JavaWeb
  • Java虚拟机

    • JVM
  • SQL 数据库

    • MySQL
  • NoSQL 数据库

    • Redis
    • ElasticSearch
    • MongoDB
  • ORM

    • MyBatis
    • MyBatis-Plus
  • Spring

    • Spring
  • SpringMVC

    • SpringMVC1
    • SpringMVC2
  • SpringCloud

    • SpringCloud
  • 中间件

    • RabbitMQ
    • Dubbo
  • 秒杀项目
  • Git
  • Linux
  • Docker
  • JWT
  • 面试
  • 刷题
开发问题😈
设计模式
关于💕
归档🕛
GitHub (opens new window)
  • mybatis

  • mybatis-plus

  • Spring

  • SpringMvc

  • RabbitMQ

  • Dubbo

    • Dubbo知识体系
    • Dubbo
    • 服务注册(provider服务暴露)
    • 服务发现(consumer服务引入)
      • 摘要
      • 服务发现大致原理
      • 服务发现开端
      • BeanFactory 、FactoryBean、ObjectFactory
      • 服务发现的三种方式
        • 本地服务发现
        • 远程服务发现
        • 注册中心发现
      • 服务发现源码解析
        • init()源码
        • createProxy()
        • RegistryProtocol.refer()
        • RegistryProtocol.doCreateInvoker()
      • 总结
    • 服务调用过程
    • SPI机制
    • 负载均衡机制
    • 服务容错、降级
    • Dubbo的服务异常处理
  • SpringCloud

  • 框架
  • Dubbo
zdk
2022-08-01
目录

服务发现(consumer服务引入)

Table of Contents generated with DocToc (opens new window)

  • 摘要
  • 服务发现大致原理
  • 服务发现开端
  • BeanFactory 、FactoryBean、ObjectFactory
  • 服务发现的三种方式
    • 本地服务发现
    • 远程服务发现
    • 注册中心发现
  • 服务发现源码解析
    • init()源码
    • createProxy()
    • RegistryProtocol.refer()
    • RegistryProtocol.doCreateInvoker()
  • 总结

# 摘要

Dubbo服务引入全流程

# 服务发现大致原理

Provider将注册自己的信息到注册中心, Consumer操作从注册中心得知 Provider 的信息,然后自己封装一个调用类和Provider 进行深入地交流。而之前已经提到在 Dubbo中一个可执行体就是 Invoker,所有调用都要向 Invoker 靠拢,因此可以推断出应该要先生成一个 Invoker,然后又因为框架需要往不侵入业务代码的方向发展,那我们的 Consumer 需要无感知的调用远程接口,因此需要搞个代理类,包装一下屏蔽底层的细节。整体大致流程如下:

# 服务发现开端

服务的引入和服务的暴露一样,也是通过 spring 自定义标签机制解析生成对应的 Bean,Provider Service 对应解析的是 ServiceBean 而 Consumer Reference 对应的是 ReferenceBean

image.png

服务在 Spring 容器刷新完成之后开始暴露,而服务的引入时机有两种,第一种是饿汉式,第二种是懒汉式。

  • 饿汉式是通过实现 Spring 的InitializingBean接口中的 afterPropertiesSet 方法,容器通过调用 ReferenceBean 的 afterPropertiesSet 方法时引入服务。
  • 懒汉式是只有当这个服务被注入到其他类中时启动引入流程,也就是说用到了才会开始服务引入。
  • 默认情况下,Dubbo 使用懒汉式引入服务,如果需要使用饿汉式,可通过配置 dubbo:reference 的 init 属性开启。

# BeanFactory 、FactoryBean、ObjectFactory

  • BeanFactory 其实就是 IOC 容器,简单的说就是 Spring 里面的 Bean 都归它管,而FactoryBean也是 Bean 所以说也是归 BeanFactory 管理的。

  • 那 FactoryBean 到底是个什么 Bean 呢?它其实就是把我们真实想要的 Bean 封装了一层,在真正要获取这个 Bean 的时候容器会调用 FactoryBean#getObject() 方法,而在这个方法里面你可以进行一些复杂的组装操作。这个方法就封装了我们真实想要的对象复杂的创建过程。使用 FactoryBean 封装了一层,屏蔽了底层创建的细节,便于 Bean 的使用。

  • ObjectFactory 这个是用于延迟查找的场景,它就是一个普通工厂,当得到 ObjectFactory 对象时,相当于 Bean 没有被创建,只有当 getObject() 方法时,才会触发 Bean 实例化等生命周期。 主要用于暂时性地获取某个 Bean Holder 对象,如果过早的加载,可能会引起一些意外的情况,比如当 Bean A 依赖 Bean B 时,如果过早地初始化 A,那么 B 里面的状态可能是中间状态,这时候使用 A 容易导致一些错误。(循环依赖的错误)。

总结的说 BeanFactory 就是 IOC 容器,FactoryBean 是特殊的 Bean, 用来封装创建比较复杂的对象,而 ObjectFactory 主要用于延迟查找的场景,延迟实例化对象。

# 服务发现的三种方式

服务的引入又分为了三种,第一种是本地引入;第二种是直接连接引入远程服务;第三种是通过注册中心引入远程服务。 image.png

# 本地服务发现

含义

之前服务暴露的流程每个服务都会通过一个本地暴露,走 injvm 协议(当然要是 scope = remote 就没本地引用了),因为存在一个服务端既是 Provider 又是 Consumer 的情况,然后有可能自己会调用自己的服务,因此就弄了一个本地引入,这样就避免了远程网络调用的开销。所以服务引入会先去本地缓存找找看有没有本地服务。

# 远程服务发现

这个其实就是平日测试的情况下用用,不需要启动注册中心,由 Consumer 直接配置写死Provider 的地址,然后直连即可

# 注册中心发现

含义

Consumer 通过注册中心得知 Provider 的相关信息,然后进行服务的引入,这里还包括多注册中心,同一个服务多个提供者的情况,如何抉择如何封装,如何进行负载均衡、容错并且让使用者无感知

# 服务发现源码解析

默认是懒汉式的,所以服务引入的入口就是 ReferenceBean 的 getObject 方法

@Override
public Object getObject() {
    //懒汉式的
    return get();
}
1
2
3
4
5
public synchronized T get() {
    //懒汉式的
    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    }
    if (ref == null) {
        init();
    }
    return ref;
}
1
2
3
4
5
6
7
8
9
10

# init()源码

//大部分就是检查配置然后将配置构建成 map  主要的方式的是 ref = createProxy(map);
public synchronized void init() {
    //******** 省略前面的代码
        
    serviceMetadata.getAttachments().putAll(map);
    //从名字可以得到就是要创建的一个代理
    ref = createProxy(map);
    
    serviceMetadata.setTarget(ref);
    serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
    ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
    consumerModel.setProxyObject(ref);
    consumerModel.init(attributes);
    
    initialized = true;
    
    checkInvokerAvailable();
    
    // dispatch a ReferenceConfigInitializedEvent since 2.7.4
    dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# createProxy()

    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
        if (shouldJvmRefer(map)) {
            //如果是走本地的话,那么直接构建个走本地协议的 URL 然后进行服务的引入
            //  LOCAL_PROTOCOL: injvm , LOCALHOST_VALUE: 127.0.0.1
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            //如果配置里面设置了url ,那要么是点对点直连,要么是配置中心地址,都经过处理加入到urls中
            urls.clear();
            if (url != null && url.length() > 0) {
                String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        //得到配置的url,进行循环
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(interfaceName);
                        }
                        if (UrlUtils.isRegistry(url)) {
                            //如果是注册中心地址将 map转换为查询字符串,并作为 refer 参数的值添加到 url中
                            urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            //如果是点对点会合并url,移除服务提供者的一些配置
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else {
                // 然后就是没配置 url 的情况  如果配置了url 那么不是直连的地址,到这里肯定走的就是注册中心引入远程服务了。
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                    checkRegistry();
                    List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                            if (monitorUrl != null) {
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException(
                                "No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() +
                                        " use dubbo version " + Version.getVersion() +
                                        ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                    }
                }
            }
 
            if (urls.size() == 1) {
                //如果只有一个URL直接转换成invoker
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    // 多个url挨个转换成invoker
                    Invoker<?> referInvoker = REF_PROTOCOL.refer(interfaceClass, url);
                    if (shouldCheck()) {
                        if (referInvoker.isAvailable()) {
                            invokers.add(referInvoker);
                        } else {
                            referInvoker.destroy();
                        }
                    } else {
                        invokers.add(referInvoker);
                    }
 
                    if (UrlUtils.isRegistry(url)) {
                        //用最后一个注册中心的地址
                        registryURL = url; // use last registry url
                    }
                }
 
                if (shouldCheck() && invokers.size() == 0) {
                    throw new IllegalStateException("Failed to check the status of the service "
                            + interfaceName
                            + ". No provider available for the service "
                            + (group == null ? "" : group + "/")
                            + interfaceName +
                            (version == null ? "" : ":" + version)
                            + " from the multi registry cluster"
                            + " use dubbo version " + Version.getVersion());
                }
 
                if (registryURL != null) { // registry url is available
                    // for multi-subscription scenario, use 'zone-aware' policy by default
                    String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
                    // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                    // //创建StaticDirectory实例,并由 Cluster对多个Invoker 进行合并,只暴露出一个 invoker便于调用
                    invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
                } else { // not a registry url, must be direct invoke.
                    String cluster = CollectionUtils.isNotEmpty(invokers)
                            ?
                            (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) :
                                    Cluster.DEFAULT)
                            : Cluster.DEFAULT;
                    invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
                }
            }
        }
 
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
 
        URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
        MetadataUtils.publishServiceDefinition(consumerURL);
 
        // create service proxy
        // 通过代理封装invoker返回代理
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118

  • 整个流程简述一下就是,先检查配置,通过配置构建一个 map ,然后利用 map 来构建 URL,再通过 URL 上的协议利用自适应扩展机制调用对应的 protocol.refer 得到相应的 invoker 。
  • 在有多个 URL 的时候,先遍历构建出 invoker 然后再由 StaticDirectory 封装一下,然后通过 cluster 进行合并,只暴露出一个 invoker 。
  • 然后再构建代理,封装 invoker 返回服务引用,之后 Comsumer 调用的就是这个代理类。

# RegistryProtocol.refer()

    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        //取registry参数值,并将其设置为协议头
        url = getRegistryUrl(url);
        // 获取中心实例
        Registry registry = getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }
 
        // group="a,b" or group="*"  将参数转为map 
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            // 如果分group 的话
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url, qs);
            }
        }
 
        Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
        return doRefer(cluster, registry, type, url, qs);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# RegistryProtocol.doCreateInvoker()

主要就是获取注册中心实例,然后调用doCreateInvoker()进行真正的 refer

 

protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
        // 设置注册中心实例
        directory.setRegistry(registry);
        // 设置动态生成的protocol $Adaptive
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        // 生成服务者消费链接
        URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(urlToRegistry);
            // 向注册中心 注册消费者 在consumer 目录下创建新节点
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(urlToRegistry);
        //再订阅注册中心的 providers目录、 configurators目录和routers目录,订阅好了之后就会触发 DubboProtocol的refer方法.
        directory.subscribe(toSubscribeUrl(urlToRegistry));
        //利用cluser封装direcotry其实就是封装多个invoker
        return (ClusterInvoker<T>) cluster.join(directory);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // 设置注册中心实例
    directory.setRegistry(registry);
    // 设置动态生成的protocol $Adaptive
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
    // 生成服务者消费链接
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (directory.isShouldRegister()) {
        directory.setRegisteredConsumerUrl(subscribeUrl);
        // 向注册中心 注册消费者 在consumer 目录下创建新节点
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(subscribeUrl);
    //再订阅注册中心的 providers目录、 configurators目录和routers目录,
    //订阅好了之后就会触发 DubboProtocol的refer方法.
    directory.subscribe(toSubscribeUrl(subscribeUrl));
    //利用cluser封装direcotry:其实就是封装多个invoker
    Invoker<T> invoker = cluster.join(directory);
    List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
    if (CollectionUtils.isEmpty(listeners)) {
        return invoker;
    }
    RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
    for (RegistryProtocolListener listener : listeners) {
        listener.onRefer(this, registryInvokerWrapper);
    }
    return registryInvokerWrapper;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

这个方法很关键,可以看到生成了 RegistryDirectory 这个 directory,并向其中set了注册中心实例,它自身也实现了 NotifyListener 接口,因此注册中心的监听其实是它来处理的。

然后向注册中心注册自身的信息,并且向注册中心订阅了 providers 节点、 configurators 节点 和 routers 节点,订阅了之后 RegistryDirectory 会收到这几个节点下的信息,就会触发 DubboInvoker的生成了,即用于远程调用的 Invoker。

然后通过 cluster 再包装一下得到 Invoker,因此一个服务可能有多个提供者,最终在 ProviderConsumerRegTable 中记录这些信息,然后返回 Invoker。 这时我们从注册中心拿到了远程provider的信息,然后执行DubboProtocol.getClients()方法进行服务的引入

private ExchangeClient[] getClients(URL url) {
    // 是否共享连接
    boolean useShareConnect = false;
    int connections = url.getParameter(CONNECTIONS_KEY, 0);
    List<ReferenceCountExchangeClient> shareClients = null;
    // 如果没有配置,连接是共享的,否则,一个服务一个连接
    if (connections == 0) {
        useShareConnect = true;
        /*
         * xml 配置应该比属性配置具有更高的优先级
         */
        String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
        connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS
                DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
        shareClients = getSharedClient(url, connections);
    }
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        //如果使用的共享连接
        if (useShareConnect) {
            //得到共享的客户端
            clients[i] = shareClients.get(i);
        } else {
            //得到初始化的新的客户端
            clients[i] = initClient(url);
        }
    }
    return clients;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

这里的重点在 getClients方法,因为终究是要跟远程服务进行网络调用的,而 getClients 就是用于获取客户端实例,实例类型为 ExchangeClient,底层依赖 Netty 来进行网络通信,并且可以看到默认是共享连接 image.png

然后最终得到的 Invoker 如下图,可以看到记录的很多信息,基本上该有的都有了,我这里走的是对应的服务只有一个 url 的情况,多个 url 无非也是利用 directory 和 cluster封装一层 image.png

# 总结

总结

总的来说就是,通过配置组成 URL ,然后通过自适应得到对于的实现类进行服务引入,如果是注册中心那么会向注册中心注册自己的信息,然后订阅注册中心相关信息,得到远程 provider 的 ip 等信息,再通过netty客户端进行连接。

并且通过directory 和cluster 进行底层多个服务提供者的屏蔽、容错和负载均衡等,最终得到封装好的 invoker 再通过动态代理封装得到代理类,让接口调用者无感知的调用方法

在 GitHub 上编辑此页 (opens new window)
#Dubbo#服务发现
最后更新: 2022/10/04, 8:10:00
服务注册(provider服务暴露)
服务调用过程

← 服务注册(provider服务暴露) 服务调用过程→

Theme by Vdoing | Copyright © 2022-2023 zdk | notes
湘ICP备2022001117号-1
川公网安备 51142102511562号
提供加速服务
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式