23-消费者进行接口级服务发现订阅的逻辑
服务发现,即消费端自动发现服务地址列表的能力,是微服务框架需要具备的关键能力,借助于自动化的服务发现,微服务之间可以在无需感知对端部署位置与 IP 地址的情况下实现通信。
可以通过以下图的消费者与注册中心的交互逻辑来帮助理解:
消费者主动向注册中心注册自己的信息,另外也会去注册中心拉取提供者的信息,并对其进行订阅。
回到代码可以直接将位置定位到MigrationInvoker类型的migrateToApplicationFirstInvoker方法如下所示:
这里我们主要说的逻辑为消费者进行接口级服务发现订阅的逻辑,所以我们主要看代码refreshInterfaceInvoker这个方法调用即可
23.2 关于refreshInterfaceInvoker方法
这里我们直接来看代码:
MigrationInvoker类型中的refreshInterfaceInvoker方法。
// NOTE(review): the method signature line of refreshInterfaceInvoker is not part of this excerpt;
// only the body is shown.
// Detach the InvokersChangedListener from the current invoker (original note: if the invoker exists).
clearListener(invoker);
// Original note: the invoker is (re)created when it is null or has already been destroyed;
// the actual condition lives in needRefresh, which is not shown here.
if (needRefresh(invoker)) {
if (logger.isDebugEnabled()) {
logger.debug("Re-subscribing interface addresses for interface " + type.getName());
}
if (invoker != null) {
invoker.destroy();
}
// Key step: obtain the cluster invoker from the registry protocol.
invoker = registryProtocol.getInvoker(cluster, registry, type, url);
}
// The callback releases the latch, optionally reports the consumption status with the
// "interface" tag, and recalculates the preferred invoker when the step is APPLICATION_FIRST.
setListener(invoker, () -> {
latch.countDown();
if (reportService.hasReporter()) {
reportService.reportConsumptionStatus(
reportService.createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "interface"));
}
if (step == APPLICATION_FIRST) {
calcPreferredInvoker(rule);
}
});
}
前面我们可以看到当invoker对象不存在的时候会通过调用如下代码进行获取Invoker对象
// Key step: obtain the cluster invoker.
// Original note: registryProtocol is an InterfaceCompatibleRegistryProtocol at this point.
invoker = registryProtocol.getInvoker(cluster, registry, type, url);
23.2.1 InterfaceCompatibleRegistryProtocol的getInvoker方法
接下来我们详细看InterfaceCompatibleRegistryProtocol类型的getInvoker方法如下所示:
/**
 * Creates the cluster invoker for interface-level discovery.
 *
 * <p>A {@code RegistryDirectory} (the "service directory") is built for the given
 * service type and URL and is then handed to {@code doCreateInvoker}.
 *
 * @param cluster  cluster passed through to {@code doCreateInvoker}
 * @param registry registry passed through to {@code doCreateInvoker}
 * @param type     service interface class
 * @param url      URL used to construct the directory
 * @return the invoker produced by {@code doCreateInvoker}
 */
@Override
public <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
    return doCreateInvoker(new RegistryDirectory<T>(type, url), cluster, registry, type);
}
Directory 即服务目录, 服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息
在继续看源码之前我们先来分析下Directory之间的关系
RegistryDirectory注册中心目录的构造器:
RegistryDirectory类型的构造器源码:
/**
 * Creates a registry directory for the given service type.
 *
 * <p>After the parent constructor has run, the module model is resolved from the
 * URL's scope model and the consumer configuration listener is looked up for it.
 *
 * @param serviceType service interface class
 * @param url         URL forwarded to the parent constructor
 */
public RegistryDirectory(Class<T> serviceType, URL url) {
    super(serviceType, url);
    this.moduleModel = getModuleModel(url.getScopeModel());
    // Original note: the resolved listener is a ConsumerConfigurationListener.
    this.consumerConfigurationListener = getConsumerConfigurationListener(this.moduleModel);
}
RegistryDirectory类型的父类型DynamicDirectory
DynamicDirectory的构造器
/**
 * Initialises the dynamic directory from the service type and the URL.
 *
 * <p>The parent constructor is invoked with {@code isUrlFromRegistry = true}. The
 * adaptive {@code Cluster} and {@code RouterFactory} extensions are resolved next,
 * then the arguments are validated and the remaining flags are derived from the URL.
 *
 * @param serviceType service interface class; must not be {@code null}
 * @param url         URL whose service key must not be empty
 * @throws IllegalArgumentException if {@code serviceType} is {@code null} or the
 *                                  URL's service key is empty
 */
public DynamicDirectory(Class<T> serviceType, URL url) {
    super(url, true);

    final ModuleModel scopedModule = url.getOrDefaultModuleModel();
    // Adaptive extension points (original note: Cluster$Adaptive / RouterFactory$Adaptive).
    this.cluster = scopedModule.getExtensionLoader(Cluster.class).getAdaptiveExtension();
    this.routerFactory = scopedModule.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();

    if (serviceType == null) {
        throw new IllegalArgumentException("service type is null.");
    }
    if (StringUtils.isEmpty(url.getServiceKey())) {
        throw new IllegalArgumentException("registry serviceKey is null.");
    }

    // Wildcard interfaces are never registered; otherwise REGISTER_KEY decides (default true).
    this.shouldRegister = !ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true);
    this.shouldSimplified = url.getParameter(SIMPLIFIED_KEY, false);

    this.serviceType = serviceType;
    // The service key comes from the consumer URL prepared by the parent constructor.
    this.serviceKey = super.getConsumerUrl().getServiceKey();
    this.directoryUrl = consumerUrl;

    // A group of "*" or a comma-separated list marks a multi-group reference.
    final String groupSpec = this.directoryUrl.getGroup("");
    this.multiGroup = groupSpec != null && (ANY_VALUE.equals(groupSpec) || groupSpec.contains(","));

    // Fail-fast switch read from configuration; falls back to "true" when unset.
    final String failFastSetting =
        ConfigurationUtils.getProperty(scopedModule, Constants.SHOULD_FAIL_FAST_KEY, "true");
    this.shouldFailFast = Boolean.parseBoolean(failFastSetting);
}
先看前面调用到的构造器,isUrlFromRegistry这个参数值为true
/**
 * Creates a directory without a pre-built router chain by delegating to the
 * three-argument constructor with a {@code null} router chain.
 *
 * @param url               URL forwarded to the delegate constructor
 * @param isUrlFromRegistry forwarded unchanged to the delegate constructor
 */
public AbstractDirectory(URL url, boolean isUrlFromRegistry) {
this(url, null, isUrlFromRegistry);
}
AbstractDirectory重载的构造器
/**
 * Builds the directory state from the given URL: the stored URL, the merged
 * reference parameters, the consumer URL, the connectivity executor, the
 * reconnect settings and the router chain.
 *
 * @param url               source URL; must not be {@code null}
 * @param routerChain       router chain handed to {@code setRouterChain}; may be {@code null}
 * @param isUrlFromRegistry when {@code true}, the parameters of the derived consumer URL
 *                          are cleared before the merged reference parameters are added
 * @throws IllegalArgumentException if {@code url} is {@code null}
 */
public AbstractDirectory(URL url, RouterChain<T> routerChain, boolean isUrlFromRegistry) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
// this.url is the incoming URL with the REFER_KEY and MONITOR_KEY attributes removed.
this.url = url.removeAttribute(REFER_KEY).removeAttribute(MONITOR_KEY);
Map<String, String> queryMap;
// The REFER_KEY attribute is read from the original url argument (not from this.url).
// Original note: it holds the service reference configuration.
Object referParams = url.getAttribute(REFER_KEY);
if (referParams instanceof Map) {
queryMap = (Map<String, String>) referParams;
// A ready-made consumer URL may be attached as an attribute as well.
this.consumerUrl = (URL) url.getAttribute(CONSUMER_URL_KEY);
} else {
// Otherwise the reference parameters are parsed from the decoded REFER_KEY parameter string.
queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
}
// remove some local only parameters
ApplicationModel applicationModel = url.getOrDefaultApplicationModel();
this.queryMap = applicationModel.getBeanFactory().getBean(ClusterUtils.class).mergeLocalParams(queryMap);
// No consumer URL was attached: derive one from this.url and the reference parameters.
if (consumerUrl == null) {
String host = isNotEmpty(queryMap.get(REGISTER_IP_KEY)) ? queryMap.get(REGISTER_IP_KEY) : this.url.getHost();
String path = isNotEmpty(queryMap.get(PATH_KEY)) ? queryMap.get(PATH_KEY) : queryMap.get(INTERFACE_KEY);
String consumedProtocol = isNotEmpty(queryMap.get(PROTOCOL_KEY)) ? queryMap.get(PROTOCOL_KEY) : CONSUMER;
URL consumerUrlFrom = this.url
.setHost(host)
.setPort(0)
.setProtocol(consumedProtocol)
.setPath(path);
if (isUrlFromRegistry) {
// reserve parameters if url is already a consumer url
consumerUrlFrom = consumerUrlFrom.clearParameters();
}
this.consumerUrl = consumerUrlFrom.addParameters(queryMap);
}
// Scheduled executor used for connectivity checks. Original note: its threads are named
// "Dubbo-framework-connectivity-scheduler" and the core size equals the CPU core count
// (not verifiable from this block).
this.connectivityExecutor = applicationModel.getFrameworkModel().getBeanFactory()
.getBean(FrameworkExecutorRepository.class).getConnectivityScheduledExecutor();
// Global configuration of the module the URL belongs to.
Configuration configuration = ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultModuleModel());
// Maximum number of invokers picked from invokersToReconnect per reconnect task, so that a
// single task cannot occupy the connectivity executor for too long (original note: default 10).
this.reconnectTaskTryCount = configuration.getInt(RECONNECT_TASK_TRY_COUNT, DEFAULT_RECONNECT_TASK_TRY_COUNT);
// Period between two reconnect tasks, in milliseconds (original note: default 1000).
this.reconnectTaskPeriod = configuration.getInt(RECONNECT_TASK_PERIOD, DEFAULT_RECONNECT_TASK_PERIOD);
// Install the router chain (null when called through the two-argument constructor).
setRouterChain(routerChain);
}
23.4 调用对象ClusterInvoker的创建过程doCreateInvoker
前面介绍了创建服务目录对象,服务目录对象仅仅创建了服务目录的对象没有进行一些逻辑性操作比如服务查询,接下来我们继续看看如何借助服务目录工具来发现服务然后创建调用器
RegistryProtocol的doCreateInvoker方法:
/**
 * Wires the directory to the registry, optionally registers the consumer URL,
 * builds the router chain, subscribes for addresses and finally joins the
 * directory into a cluster invoker.
 *
 * @param directory directory to initialise and subscribe
 * @param cluster   cluster used to join the directory (original note: a MockClusterWrapper
 *                  wrapping FailoverCluster in the walkthrough's example)
 * @param registry  registry the consumer registers to and subscribes from
 * @param type      service interface class
 * @return the cluster invoker returned by {@code cluster.join}
 */
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
    // Bind the directory to this registry and protocol.
    directory.setRegistry(registry);
    directory.setProtocol(protocol);

    final URL consumerUrl = directory.getConsumerUrl();
    // all attributes of REFER_KEY
    final Map<String, String> parameters = new HashMap<>(consumerUrl.getParameters());

    // Resolve the constructor arguments in the same order as they are consumed:
    // protocol first, then the register IP (which is removed from the parameter map).
    String consumedProtocol = parameters.get(PROTOCOL_KEY);
    if (consumedProtocol == null) {
        consumedProtocol = CONSUMER;
    }
    final String registerIp = parameters.remove(REGISTER_IP_KEY);

    // Convert the consumer configuration into a ServiceConfigURL.
    URL urlToRegistry = new ServiceConfigURL(consumedProtocol, registerIp, 0, getPath(parameters, type), parameters);
    urlToRegistry = urlToRegistry.setScopeModel(consumerUrl.getScopeModel());
    urlToRegistry = urlToRegistry.setServiceModel(consumerUrl.getServiceModel());

    if (directory.isShouldRegister()) {
        directory.setRegisteredConsumerUrl(urlToRegistry);
        // Register the consumer's own URL with the registry.
        registry.register(directory.getRegisteredConsumerUrl());
    }

    // Build the router chain, then discover and subscribe to the service addresses.
    directory.buildRouterChain(urlToRegistry);
    directory.subscribe(toSubscribeUrl(urlToRegistry));

    // Join the directory into the invocation chain of the cluster.
    return (ClusterInvoker<T>) cluster.join(directory, true);
}
ListenerRegistryWrapper 的register方法
@Override
/**
 * Registers the URL with the wrapped registry and then informs the registry
 * service listeners.
 *
 * <p>Listeners are skipped for consumer URLs. Every listener is invoked even if an
 * earlier one failed; the last {@code RuntimeException} raised by a listener is
 * rethrown from the {@code finally} block once all of them have run.
 *
 * @param url URL to register
 */
public void register(URL url) {
    try {
        // Original note: the wrapped registry is a ZookeeperRegistry in this walkthrough.
        if (registry != null) {
            registry.register(url);
        }
    } finally {
        final boolean shouldNotify = CollectionUtils.isNotEmpty(listeners) && !UrlUtils.isConsumer(url);
        if (shouldNotify) {
            RuntimeException lastFailure = null;
            for (RegistryServiceListener registryListener : listeners) {
                if (registryListener == null) {
                    continue;
                }
                try {
                    registryListener.onRegister(url, registry);
                } catch (RuntimeException failure) {
                    logger.error(failure.getMessage(), failure);
                    lastFailure = failure;
                }
            }
            if (lastFailure != null) {
                throw lastFailure;
            }
        }
    }
}
ZookeeperRegistry类型的父类型FailbackRegistry的register方法
@Override
/**
 * Registers the URL and falls back to a retry list when the registry call fails.
 *
 * <p>URLs rejected by {@code acceptable} are only logged. On failure the exception
 * is rethrown as {@code IllegalStateException} when the check flags are enabled on
 * both URLs and the port is non-zero, or when the failure is a
 * {@code SkipFailbackWrapperException} (its cause is reported in that case).
 * Otherwise the failure is logged and the URL is recorded for a later retry.
 *
 * @param url URL to register
 * @throws IllegalStateException if registration fails and must not be retried silently
 */
public void register(URL url) {
    if (!acceptable(url)) {
        logger.info("URL " + url + " will not be registered to Registry. Registry " + this.getUrl() + " does not accept service of this protocol type.");
        return;
    }

    super.register(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);

    try {
        // Sending a registration request to the server side
        doRegister(url);
    } catch (Exception e) {
        // If the startup detection is opened, the Exception is thrown directly.
        final boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
            && url.getParameter(Constants.CHECK_KEY, true)
            && (url.getPort() != 0);
        final boolean skipFailback = e instanceof SkipFailbackWrapperException;
        // The wrapper exception is unwrapped so that the real cause is reported.
        final Throwable reported = skipFailback ? e.getCause() : e;

        if (check || skipFailback) {
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + reported.getMessage(), reported);
        }
        logger.error("Failed to register " + url + ", waiting for retry, cause: " + reported.getMessage(), reported);

        // Record a failed registration request to a failed list, retry regularly
        addFailedRegistered(url);
    }
}
AbstractRegistry类型的register方法
@Override
/**
 * Records the URL in the {@code registered} collection.
 *
 * <p>URLs with a non-zero port are additionally logged at info level.
 *
 * @param url URL to record; must not be {@code null}
 * @throws IllegalArgumentException if {@code url} is {@code null}
 */
public void register(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("register url == null");
    }
    final boolean hasPort = url.getPort() != 0;
    if (hasPort && logger.isInfoEnabled()) {
        logger.info("Register: " + url);
    }
    registered.add(url);
}
/**
 * Creates the node for the given URL through the ZooKeeper client.
 *
 * <p>Original note: for a consumer the node path has the form
 * {@code /dubbo/<interface>/consumers/<encoded consumer url>}; the second argument
 * of {@code create} marks the node as temporary, which is the default, while
 * {@code dynamic=false} produces a persistent node.
 *
 * @param url URL to write
 * @throws RpcException if any error occurs while creating the node
 */
@Override
public void doRegister(URL url) {
    try {
        checkDestroyed();
        final String nodePath = toUrlPath(url);
        final boolean dynamic = url.getParameter(DYNAMIC_KEY, true);
        zkClient.create(nodePath, dynamic);
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
前面的zkClient会写入一个消费者的路径如下所示:
/dubbo/link.elastic.dubbo.entity.DemoService/consumers/consumer%3A%2F%2F192.168.1.169%2Flink.elastic.dubbo.entity.DemoService%3Fapplication%3Ddubbo-demo-api-consumer%26background%3Dfalse%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dlink.elastic.dubbo.entity.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D52237%26qos.enable%3Dfalse%26qos.port%3D-1%26release%3D3.0.10%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1659862505044
RegistryDirectory类型的subscribe方法
/**
 * Subscribes through the parent directory and, unless configuration listening is
 * switched off, hooks this directory into the configuration listeners.
 *
 * @param url subscription URL
 */
@Override
public void subscribe(URL url) {
    super.subscribe(url);

    // ENABLE_CONFIGURATION_LISTEN defaults to true when it is not configured.
    final Boolean listenEnabled = moduleModel.getModelEnvironment().getConfiguration()
        .convert(Boolean.class, org.apache.dubbo.registry.Constants.ENABLE_CONFIGURATION_LISTEN, true);
    if (!listenEnabled) {
        return;
    }
    consumerConfigurationListener.addNotifyListener(this);
    referenceConfigurationListener = new ReferenceConfigurationListener(moduleModel, this, url);
}
DynamicDirectory类型的subscribe方法
ListenerRegistryWrapper类型的subscribe方法
/**
 * Subscribes through the wrapped registry and then informs the registry service
 * listeners.
 *
 * <p>Every listener is invoked even if an earlier one failed; the last
 * {@code RuntimeException} raised by a listener is rethrown from the
 * {@code finally} block once all of them have run.
 *
 * @param url      subscription URL
 * @param listener listener passed to the wrapped registry
 */
@Override
public void subscribe(URL url, NotifyListener listener) {
    try {
        if (registry != null) {
            registry.subscribe(url, listener);
        }
    } finally {
        if (CollectionUtils.isNotEmpty(listeners)) {
            RuntimeException lastFailure = null;
            for (RegistryServiceListener serviceListener : listeners) {
                if (serviceListener == null) {
                    continue;
                }
                try {
                    serviceListener.onSubscribe(url, registry);
                } catch (RuntimeException failure) {
                    logger.error(failure.getMessage(), failure);
                    lastFailure = failure;
                }
            }
            if (lastFailure != null) {
                throw lastFailure;
            }
        }
    }
}
FailbackRegistry类型的subscribe方法
@Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (CollectionUtils.isNotEmpty(urls)) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getCacheFile().getName() + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedSubscribed(url, listener);
}
AbstractRegistry类型的subscribe方法
/**
 * Records the listener for the given subscription URL.
 *
 * <p>{@code subscribed} maps each subscription URL to the set of listeners
 * interested in it; the set is created on first use.
 *
 * @param url      subscription URL; must not be {@code null}
 * @param listener listener to record; must not be {@code null}
 * @throws IllegalArgumentException if either argument is {@code null}
 */
@Override
public void subscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("subscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("subscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Subscribe: " + url);
    }
    subscribed.computeIfAbsent(url, key -> new ConcurrentHashSet<>()).add(listener);
}
ZookeeperRegistry的doSubscribe方法:
/**
 * Subscribes the listener to the ZooKeeper paths that belong to the given URL.
 *
 * <p>For a wildcard service interface a child listener is placed on the root path and
 * every service found there is subscribed individually. Otherwise a child listener is
 * placed on each category path of the URL, the current children are converted to URLs
 * and the listener is notified synchronously with the collected list.
 *
 * @param url      subscription URL
 * @param listener listener to notify
 * @throws RpcException if anything fails while subscribing
 */
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
checkDestroyed();
if (ANY_VALUE.equals(url.getServiceInterface())) {
// Wildcard subscription: watch the root path and subscribe to each service below it.
String root = toRootPath();
boolean check = url.getParameter(CHECK_KEY, false);
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
// The child listener subscribes to services that appear later and were not seen before.
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChildren) -> {
for (String child : currentChildren) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(check)), k);
}
}
});
zkClient.create(root, false);
// Registering the child listener also returns the services that exist right now.
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(check)), listener);
}
}
} else {
CountDownLatch latch = new CountDownLatch(1);
try {
List<URL> urls = new ArrayList<>();
// Original note: at interface level there are three category paths of interest, e.g.
//   providers:     /dubbo/link.elastic.dubbo.entity.DemoService/providers
//   configurators: /dubbo/link.elastic.dubbo.entity.DemoService/configurators
//   routers:       /dubbo/link.elastic.dubbo.entity.DemoService/routers
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
// One RegistryChildListenerImpl per notify listener; an existing one gets the new latch.
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, k, latch));
if (zkListener instanceof RegistryChildListenerImpl) {
((RegistryChildListenerImpl) zkListener).setLatch(latch);
}
// Create the category node with the flag set to false (original note: a non-temporary
// node, created only when it does not exist yet), e.g. .../providers or .../configurators.
zkClient.create(path, false);
// Watch the children of the category path. The call returns the current children,
// e.g. the encoded provider URLs below .../providers (one entry per provider).
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
// Collect the URLs of this category (for example the providers).
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// Synchronous notification carrying everything collected above (service subscription).
notify(url, listener, urls);
} finally {
// tells the listener to run only after the sync notification of main thread finishes.
latch.countDown();
}
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
前面查询到的urls默认一共有3种(提供者、配置、路由)。这里我只有一个提供者,没有配置数据,也没有路由数据,将会获取到如下的url信息:
提供者
dubbo://192.168.1.169:20880/link.elastic.dubbo.entity.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&category=providers,configurators,routers&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=52816&qos.enable=false&qos.port=-1&release=3.0.10&service-name-mapping=true&side=provider&sticky=false
配置信息
empty://192.168.1.169/link.elastic.dubbo.entity.DemoService?application=dubbo-demo-api-consumer&background=false&category=configurators&dubbo=2.0.2&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=52816&qos.enable=false&qos.port=-1&release=3.0.10&side=consumer&sticky=false&timestamp=1659863678563
路由信息:
empty://192.168.1.169/link.elastic.dubbo.entity.DemoService?application=dubbo-demo-api-consumer&background=false&category=routers&dubbo=2.0.2&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=52816&qos.enable=false&qos.port=-1&release=3.0.10&side=consumer&sticky=false&timestamp=1659863678563
观察一下可以看到如果无配置或者无路由的时候会有empty协议的url
FailbackRegistry类型的notify方法:
/**
 * Notifies the listener and logs, rather than propagates, any failure raised while
 * doing so.
 *
 * @param url      subscription URL; must not be {@code null}
 * @param listener listener to notify; must not be {@code null}
 * @param urls     URLs to deliver
 * @throws IllegalArgumentException if {@code url} or {@code listener} is {@code null}
 */
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }

    try {
        doNotify(url, listener, urls);
    } catch (Exception failure) {
        // Record a failed registration request to a failed list
        final String message = "Failed to notify addresses for subscribe " + url + ", cause: " + failure.getMessage();
        logger.error(message, failure);
    }
}
FailbackRegistry类型的doNotify方法
/**
 * Performs the actual notification by delegating to the parent class's
 * {@code notify} implementation.
 *
 * @param url      subscription URL
 * @param listener listener to notify
 * @param urls     URLs to deliver
 */
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
super.notify(url, listener, urls);
}
AbstractRegistry类型的notify 服务通知模版方法
/**
 * Template method that groups the notified URLs by category and pushes each
 * category to the listener.
 *
 * <p>Only URLs accepted by {@code UrlUtils.isMatch} are delivered. Each category list
 * is stored in {@code notified} before the listener is called, and the local cache is
 * saved after every category when {@code localCacheEnabled} is set.
 *
 * <p>A wildcard subscription is allowed to pass an empty list; a {@code null} list is
 * treated the same way instead of being dereferenced.
 *
 * @param url      subscription URL; must not be {@code null}
 * @param listener listener to notify; must not be {@code null}
 * @param urls     URLs received for the subscription; may be {@code null} or empty
 * @throws IllegalArgumentException if {@code url} or {@code listener} is {@code null}
 */
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    // A wildcard subscription gets past the guard above even when there is nothing to
    // deliver, so the list must not be dereferenced blindly from here on.
    final int urlCount = urls == null ? 0 : urls.size();
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", url size: " + urlCount);
    }
    if (urlCount == 0) {
        return;
    }

    // keep every provider's category.
    Map<String, List<URL>> result = new HashMap<>();
    // Original note: in the walkthrough's example three URLs arrive here
    // (one provider plus the empty:// placeholders for configurators and routers).
    for (URL u : urls) {
        // Original note: isMatch compares group and service interface of consumer and
        // provider, or accepts a wildcard "*" on either side.
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getCategory(DEFAULT_CATEGORY);
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            categoryList.add(u);
        }
    }
    if (result.isEmpty()) {
        return;
    }

    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    // Push every category (configurators, routers, providers, ...) to the listener.
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        // Original note: the listener is the RegistryDirectory in this walkthrough.
        listener.notify(categoryList);
        // We will update our cache file after each notification.
        // When our Registry has a subscribed failure due to network jitter, we can return at least the existing cache URL.
        if (localCacheEnabled) {
            // Persist the state kept for this subscription URL (a consumer:// URL in the example).
            saveProperties(url);
        }
    }
}
接收服务通知的方法
RegistryDirectory类型的notify方法
/**
 * Receives a notification from the registry and refreshes configurators, routers
 * and invokers accordingly.
 *
 * <p>Nothing happens once the directory is destroyed. Otherwise the URLs are
 * filtered and grouped by category; configurators and routers are applied first,
 * the provider URLs are passed through every activated {@code AddressListener} and
 * the result is handed to {@code refreshOverrideAndInvoker}.
 *
 * @param urls URLs of one notification
 */
@Override
public synchronized void notify(List<URL> urls) {
    if (isDestroyed()) {
        return;
    }

    final Map<String, List<URL>> urlsByCategory = urls.stream()
        .filter(Objects::nonNull)
        .filter(this::isValidCategory)
        .filter(this::isNotCompatibleFor26x)
        .collect(Collectors.groupingBy(this::judgeCategory));

    // Configurators: keep the current ones when the notification carries none.
    this.configurators = Configurator
        .toConfigurators(urlsByCategory.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()))
        .orElse(this.configurators);

    // Routers
    toRouters(urlsByCategory.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()))
        .ifPresent(this::addRouters);

    // providers
    List<URL> providerURLs = urlsByCategory.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());

    // 3.x added for extend URL address
    final List<AddressListener> supportedListeners = getUrl().getOrDefaultModuleModel()
        .getExtensionLoader(AddressListener.class)
        .getActivateExtension(getUrl(), (String[]) null);
    if (supportedListeners != null) {
        // Each listener may replace the provider list seen by the next one.
        for (AddressListener addressListener : supportedListeners) {
            providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);
        }
    }

    refreshOverrideAndInvoker(providerURLs);
}
刷新服务通知的方法
RegistryDirectory类型的refreshOverrideAndInvoker方法
/**
 * Refreshes the invokers for the given provider URLs by delegating to
 * {@code refreshInvoker}.
 *
 * @param urls provider URLs of the current notification
 */
private synchronized void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
refreshInvoker(urls);
}
RegistryDirectory类型的refreshInvoker方法
/**
 * Rebuilds the invoker list from the notified provider URLs.
 *
 * <p>A single URL with the empty protocol forbids access and destroys all invokers.
 * Otherwise access is allowed, an empty notification falls back to the cached URLs,
 * the URLs are converted to invokers, the router chain is refreshed, unused invokers
 * are destroyed and the invokers-changed callback is fired.
 *
 * @param invokerUrls provider URLs; must not be {@code null}
 */
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
routerChain.setInvokers(BitList.emptyList());
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
// The shared immutable empty list is swapped for a mutable one because addAll may follow.
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
// use local reference to avoid NPE as this.cachedInvokerUrls will be set null by destroyAllInvokers().
Set<URL> localCachedInvokerUrls = this.cachedInvokerUrls;
if (invokerUrls.isEmpty() && localCachedInvokerUrls != null) {
logger.warn("Service" + serviceKey + " received empty address list with no EMPTY protocol set, trigger empty protection.");
invokerUrls.addAll(localCachedInvokerUrls);
} else {
localCachedInvokerUrls = new HashSet<>();
localCachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
this.cachedInvokerUrls = localCachedInvokerUrls;
}
if (invokerUrls.isEmpty()) {
return;
}
// use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().
Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
// can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers().
Map<URL, Invoker<T>> oldUrlInvokerMap = null;
if (localUrlInvokerMap != null) {
// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.
oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
localUrlInvokerMap.forEach(oldUrlInvokerMap::put);
}
// Original note: protocol filtering also happens inside toInvokers.
Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map
/**
* If the calculation is wrong, it is not processed.
*
* 1. The protocol configured by the client is inconsistent with the protocol of the server.
* eg: consumer protocol = dubbo, provider only has other protocol services(rest).
* 2. The registration center is not robust and pushes illegal specification data.
*
*/
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
.toString()));
return;
}
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
// Multi-group references go through toMergeInvokerList before being stored.
this.setInvokers(multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers));
// pre-route and build cache
routerChain.setInvokers(this.getInvokers());
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
// notify invokers refreshed
this.invokersChanged();
}
}
将invokerURL列表转换为调用器映射。转换规则如下:
- 如果URL已转换为invoker,它将不再被重新引用并直接从缓存中获取,请注意,URL中的任何参数更改都将被重新引用。
- 如果传入调用器列表不为空,则表示它是最新的调用器列表。
- 如果传入invokerUrl的列表为空,则意味着该规则只是一个覆盖规则或路由规则,需要重新对比以决定是否重新引用。
协议过滤,配置合并都会在这里面去做
将url转换为调用器(Invoker),如果url已被引用,则不会重新引用。将放入newUrlInvokerMap的项将从oldUrlInvokerMap中删除。
/**
 * Converts the notified provider URLs into a map from merged URL to invoker.
 *
 * <p>URLs are skipped when their protocol is not accepted by the reference-side
 * protocol filter, when they use the empty protocol, or when no {@code Protocol}
 * extension exists for them. For the remaining URLs the merged URL is looked up in
 * {@code oldUrlInvokerMap}: a hit is moved (removed from the old map and put into
 * the new one) without referring again; a miss triggers {@code protocol.refer}
 * unless the URL is disabled. Entries left in {@code oldUrlInvokerMap} afterwards
 * are therefore the ones no longer in use.
 *
 * @param oldUrlInvokerMap invokers of the previous refresh; may be {@code null};
 *                         reused entries are removed from it
 * @param urls             provider URLs to convert; may be {@code null} or empty
 * @return the new URL-to-invoker map; empty when there is nothing to convert
 */
private Map<URL, Invoker<T>> toInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {
    Map<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));
    if (urls == null || urls.isEmpty()) {
        return newUrlInvokerMap;
    }
    // Protocols accepted by the reference side (original note: e.g. configured through
    // dubbo.reference.<interface>.protocol). The filter is loop-invariant, so it is split once.
    String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
    String[] acceptProtocols = (queryProtocols != null && queryProtocols.length() > 0)
        ? queryProtocols.split(",")
        : null;
    for (URL providerUrl : urls) {
        // If protocol is configured at the reference side, only the matching protocol is selected
        if (acceptProtocols != null) {
            boolean accept = false;
            for (String acceptProtocol : acceptProtocols) {
                if (providerUrl.getProtocol().equals(acceptProtocol)) {
                    accept = true;
                    break;
                }
            }
            if (!accept) {
                continue;
            }
        }
        // Placeholder URLs with the empty protocol carry no provider.
        if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
            continue;
        }
        // The protocol extension must be available (original note: custom or optional
        // protocols such as webservice usually need an extra dependency).
        if (!getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
            logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).getSupportedExtensions()));
            continue;
        }
        // Merge the URL parameters (original note: precedence is
        // override rules > -D JVM arguments > consumer > provider).
        URL url = mergeUrl(providerUrl);

        // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
        Invoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.remove(url);
        if (invoker == null) { // Not in the cache, refer again
            try {
                boolean enabled = url.hasParameter(DISABLED_KEY)
                    ? !url.getParameter(DISABLED_KEY, false)
                    : url.getParameter(ENABLED_KEY, true);
                if (enabled) {
                    // The consumer refers the provider here.
                    invoker = protocol.refer(serviceType, url);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            if (invoker != null) { // Put new invoker in cache
                newUrlInvokerMap.put(url, invoker);
            }
        } else {
            newUrlInvokerMap.put(url, invoker);
        }
    }
    return newUrlInvokerMap;
}
上面代码中有很重要的一行,即消费者引用服务提供者的代码:invoker = protocol.refer(serviceType, url); 这行代码将会执行服务引用逻辑。
前面这个代码会经过协议调用链(wrapper机制的aop)逻辑,默认使用的是Dubbo协议,这里我们就不详细展开了直接定位到DubboProtocol来看
DubboProtocol类型的refer方法:
/**
 * Refers a remote service: verifies through {@code checkDestroyed} that the protocol
 * may still be used and then delegates to {@code protocolBindingRefer}.
 *
 * @param type service interface class
 * @param url  provider URL to refer
 * @return the invoker created by {@code protocolBindingRefer}
 * @throws RpcException declared by the interface; raised by the delegate
 */
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
checkDestroyed();
return protocolBindingRefer(type, url);
}
DubboProtocol类型的protocolBindingRefer方法:
/**
 * Creates the {@code DubboInvoker} for the given provider URL.
 *
 * <p>Two steps are involved: the exchange clients are obtained through
 * {@code getClients}, then they are wrapped into a {@code DubboInvoker} which is
 * also recorded in {@code invokers}.
 *
 * @param serviceType service interface class
 * @param url         provider URL
 * @return the newly created invoker
 * @throws RpcException declared by the interface
 */
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    checkDestroyed();
    // Serialization optimisation (the "optimizer" setting).
    optimizeSerialization(url);

    // create rpc invoker
    final ExchangeClient[] exchangeClients = getClients(url);
    final DubboInvoker<T> dubboInvoker = new DubboInvoker<>(serviceType, url, exchangeClients, invokers);
    invokers.add(dubboInvoker);
    return dubboInvoker;
}
注意这里分了两步,1)获取网络客户端 2)封装为DubboInvoker
DubboProtocol类型的getClients方法:
/**
 * Returns the exchange clients to use for the given provider URL.
 *
 * <p>When {@code CONNECTIONS_KEY} is configured with a non-zero value, that many
 * dedicated clients are created. Otherwise shared clients are used; their number
 * comes from {@code SHARE_CONNECTIONS_KEY} on the URL or, if blank there, from the
 * configuration with {@code DEFAULT_SHARE_CONNECTIONS} as fallback.
 *
 * @param url provider URL
 * @return the clients, one array slot per connection
 */
private ExchangeClient[] getClients(URL url) {
    final int configuredConnections = url.getParameter(CONNECTIONS_KEY, 0);

    // if not configured, connection is shared, otherwise, one connection for one service
    if (configuredConnections != 0) {
        final ExchangeClient[] dedicated = new ExchangeClient[configuredConnections];
        for (int idx = 0; idx < dedicated.length; idx++) {
            dedicated[idx] = initClient(url);
        }
        return dedicated;
    }

    /*
     * The xml configuration should have a higher priority than properties.
     */
    String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
    if (StringUtils.isBlank(shareConnectionsStr)) {
        shareConnectionsStr = ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), SHARE_CONNECTIONS_KEY,
            DEFAULT_SHARE_CONNECTIONS);
    }
    final int sharedConnections = Integer.parseInt(shareConnectionsStr);
    final List<ReferenceCountExchangeClient> sharedClients = getSharedClient(url, sharedConnections);

    final ExchangeClient[] shared = new ExchangeClient[sharedConnections];
    for (int idx = 0; idx < shared.length; idx++) {
        shared[idx] = sharedClients.get(idx);
    }
    return shared;
}
DubboProtocol类型的getSharedClient方法:
/**
 * Returns the shared clients for the address of the given URL, creating or repairing
 * them when necessary.
 *
 * <p>{@code referenceClientMap} holds, per address, either the list of clients or the
 * {@code PENDING_OBJECT} marker while one thread is building them. Other threads wait
 * on the map until the marker is replaced.
 *
 * @param url        provider URL; its address is the cache key
 * @param connectNum requested number of connections; values below 1 are raised to 1
 * @return the shared clients for the address
 */
@SuppressWarnings("unchecked")
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
String key = url.getAddress();
// Fast path without locking: reuse the cached clients when they pass checkClientCanUse.
Object clients = referenceClientMap.get(key);
if (clients instanceof List) {
List<ReferenceCountExchangeClient> typedClients = (List<ReferenceCountExchangeClient>) clients;
if (checkClientCanUse(typedClients)) {
batchClientRefIncr(typedClients);
return typedClients;
}
}
List<ReferenceCountExchangeClient> typedClients = null;
synchronized (referenceClientMap) {
for (; ; ) {
clients = referenceClientMap.get(key);
if (clients instanceof List) {
typedClients = (List<ReferenceCountExchangeClient>) clients;
if (checkClientCanUse(typedClients)) {
batchClientRefIncr(typedClients);
return typedClients;
} else {
// The cached clients are unusable: claim the key and repair them below.
referenceClientMap.put(key, PENDING_OBJECT);
break;
}
} else if (clients == PENDING_OBJECT) {
// Another thread is building the clients for this key; wait for its notifyAll.
try {
referenceClientMap.wait();
} catch (InterruptedException ignored) {
// NOTE(review): the interrupt is swallowed here and the interrupt flag is not restored — confirm this is intended.
}
} else {
// Nothing cached yet: claim the key and build the clients below.
referenceClientMap.put(key, PENDING_OBJECT);
break;
}
}
}
try {
// connectNum must be greater than or equal to 1
// Original note: the number of long-lived connections defaults to 1.
connectNum = Math.max(connectNum, 1);
// If the clients is empty, then the first initialization is
if (CollectionUtils.isEmpty(typedClients)) {
// Main step: build the clients.
typedClients = buildReferenceCountExchangeClientList(url, connectNum);
} else {
for (int i = 0; i < typedClients.size(); i++) {
ReferenceCountExchangeClient referenceCountExchangeClient = typedClients.get(i);
// If there is a client in the list that is no longer available, create a new one to replace him.
if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
typedClients.set(i, buildReferenceCountExchangeClient(url));
continue;
}
referenceCountExchangeClient.incrementAndGetCount();
}
}
} finally {
// Publish the result (or clear the pending marker) and wake up the waiting threads.
synchronized (referenceClientMap) {
if (typedClients == null) {
referenceClientMap.remove(key);
} else {
// The key is the provider address (IP:port); each address caches one list of clients.
referenceClientMap.put(key, typedClients);
}
referenceClientMap.notifyAll();
}
}
return typedClients;
}
DubboProtocol类型的 buildReferenceCountExchangeClientList方法: 创建网络交换器客户端
/**
 * Builds {@code connectNum} reference-counted exchange clients for the URL.
 *
 * @param url        provider URL
 * @param connectNum number of clients to create
 * @return a new list holding the created clients
 */
private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
    final List<ReferenceCountExchangeClient> built = new ArrayList<>();
    int remaining = connectNum;
    while (remaining > 0) {
        built.add(buildReferenceCountExchangeClient(url));
        remaining--;
    }
    return built;
}
DubboProtocol类型的buildReferenceCountExchangeClient方法
/**
 * Creates one exchange client for the URL and wraps it in a
 * {@code ReferenceCountExchangeClient}.
 *
 * <p>The shutdown wait time of the wrapper is taken from the server shutdown timeout
 * configured for the URL's scope model.
 *
 * @param url provider URL
 * @return the wrapped client
 */
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
    // Initialise the raw client first, then wrap it.
    final ReferenceCountExchangeClient countedClient =
        new ReferenceCountExchangeClient(initClient(url), DubboCodec.NAME);
    // read configs
    countedClient.setShutdownWaitTime(ConfigurationUtils.getServerShutdownTimeout(url.getScopeModel()));
    return countedClient;
}
DubboProtocol类型的initClient方法:
/**
 * Create new connection
 *
 * <p>The transporter name is read from {@code CLIENT_KEY}, then {@code SERVER_KEY},
 * then {@code DEFAULT_REMOTING_CLIENT}, and must be a known {@code Transporter}
 * extension. The URL is rebuilt as a {@code ServiceConfigURL} with the codec and a
 * default heartbeat. The client is either lazy or connected immediately, depending
 * on {@code LAZY_CONNECT_KEY}.
 *
 * @param url provider URL
 * @return the created exchange client
 * @throws RpcException if the transporter is unsupported or the connection fails
 */
private ExchangeClient initClient(URL url) {
    /**
     * Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,
     * which means params are shared among different services. Since client is shared among services this is currently not a problem.
     */
    final String transporterName = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));

    // BIO is not allowed since it has severe performance issue.
    if (StringUtils.isNotEmpty(transporterName)
        && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporterName)) {
        throw new RpcException("Unsupported client type: " + transporterName + "," +
            " supported client type is " + StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    }

    try {
        // Replace InstanceAddressURL with ServiceConfigURL.
        url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getAllParameters());
        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

        // connection should be lazy
        final boolean lazyConnect = url.getParameter(LAZY_CONNECT_KEY, false);
        if (lazyConnect) {
            return new LazyConnectExchangeClient(url, requestHandler);
        }
        return Exchangers.connect(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
}
Exchangers类型的connect方法:
/**
 * Connects to the endpoint described by {@code url} through the exchanger selected for that URL.
 *
 * @param url     target URL; must not be null
 * @param handler exchange handler passed on to the exchanger; must not be null
 * @return the client produced by the selected exchanger
 * @throws IllegalArgumentException if {@code url} or {@code handler} is null (url is checked first)
 * @throws RemotingException        declared by the delegated {@code connect} call
 */
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // Determine the first violated precondition, checking url before handler.
    final String violation = (url == null) ? "url == null"
            : (handler == null) ? "handler == null"
            : null;
    if (violation != null) {
        throw new IllegalArgumentException(violation);
    }
    // Resolve the exchanger extension; the original note says it defaults to HeaderExchanger.
    return getExchanger(url).connect(url, handler);
}
HeaderExchanger类型的connect方法:
/**
 * Builds the client-side handler chain and opens a transport connection.
 *
 * The supplied handler is wrapped in a {@code HeaderExchangeHandler}, which is wrapped in a
 * {@code DecodeHandler}; the transport client obtained with that chain is then wrapped in a
 * {@code HeaderExchangeClient}.
 *
 * @param url     target URL passed to {@code Transporters.connect}
 * @param handler exchange handler placed at the innermost position of the chain
 * @return the {@code HeaderExchangeClient} wrapping the transport client
 * @throws RemotingException declared by {@code Transporters.connect}
 */
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    HeaderExchangeHandler exchangeHandler = new HeaderExchangeHandler(handler);
    DecodeHandler decodeHandler = new DecodeHandler(exchangeHandler);
    return new HeaderExchangeClient(Transporters.connect(url, decodeHandler), true);
}
HeaderExchangeHandler对象创建
DecodeHandler处理器对象创建
Transporters的connect方法
/**
 * Connects to {@code url} through the transporter selected for that URL.
 *
 * @param url      target URL; must not be null
 * @param handlers zero or more channel handlers: none (or a null array) yields a
 *                 {@code ChannelHandlerAdapter}, exactly one is used as-is, and several are
 *                 combined in a {@code ChannelHandlerDispatcher}
 * @return the client produced by the selected transporter
 * @throws IllegalArgumentException if {@code url} is null
 * @throws RemotingException        declared by the delegated {@code connect} call
 */
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }

    // Treat a null varargs array the same as an empty one.
    final int handlerCount = (handlers == null) ? 0 : handlers.length;
    final ChannelHandler handler;
    switch (handlerCount) {
        case 0:
            handler = new ChannelHandlerAdapter();
            break;
        case 1:
            handler = handlers[0];
            break;
        default:
            handler = new ChannelHandlerDispatcher(handlers);
            break;
    }

    // The original note says the default transporter is netty4.
    return getTransporter(url).connect(url, handler);
}
org.apache.dubbo.remoting.transport.netty4.NettyTransporter的connect方法:
/**
 * Creates a Netty-backed client for the given URL.
 *
 * @param url     URL passed to the {@code NettyClient} constructor
 * @param handler channel handler passed to the {@code NettyClient} constructor
 * @return the newly constructed {@code NettyClient}
 * @throws RemotingException declared by the {@code NettyClient} constructor
 */
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
    NettyClient nettyClient = new NettyClient(url, handler);
    return nettyClient;
}
org.apache.dubbo.remoting.transport.netty4.NettyClient对象创建
/**
 * Creates the client by delegating to the superclass constructor with a wrapped handler.
 *
 * @param url     passed both to {@code wrapChannelHandler} and to the superclass constructor
 * @param handler channel handler that is wrapped before being handed to the superclass
 * @throws RemotingException propagated from the {@code super(...)} call
 */
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
super(url, wrapChannelHandler(url, handler));
}
HeartbeatHandler
MultiMessageHandler创建
AbstractClient构造器
/**
 * Initialises the client in three steps: set up the executor, open the client via
 * {@code doOpen()}, then establish the connection via {@code connect()}.
 *
 * A failure in {@code doOpen()} always closes the client and is rethrown wrapped in a
 * {@code RemotingException}. A {@code RemotingException} from {@code connect()} is only logged
 * when {@code LAZY_CONNECT_KEY} is true or {@code CHECK_KEY} is false; otherwise the client is
 * closed and the exception is rethrown. Any other throwable from {@code connect()} closes the
 * client and is wrapped in a {@code RemotingException}.
 *
 * @param url     endpoint URL; also the source of the reconnect, lazy-connect and check flags
 * @param handler channel handler passed to the superclass constructor
 * @throws RemotingException if opening or connecting fails and the failure is not tolerated
 */
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
// set default needReconnect true when channel is not connected
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);
// Initialise the executor (thread pool); the original note says the default is FixedThreadPool.
initExecutor(url);
try {
// Original author's notes for the Netty implementation:
// this is the core step that boots Netty by initialising the Bootstrap,
// NioSocketChannel is used by default,
// and the default connect timeout is initialised to 3 seconds.
doOpen();
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
// connect.
// The step above prepared Netty's client Bootstrap; per the original note, this call runs the
// actual connect: bootstrap.connect(getConnectAddress());
// The original note says an exception is thrown if the connection is not established within 3 seconds.
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
// If lazy connect client fails to establish a connection, the client instance will still be created,
// and the reconnection will be initiated by ReconnectTask, so there is no need to throw an exception
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() +
" connect to the server " + getRemoteAddress() +
" (the connection request is initiated by lazy connect client, ignore and retry later!), cause: " +
t.getMessage(), t);
return;
}
// check defaults to true, so the connection is mandatory: close and rethrow when it cannot be made.
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
}
AbstractEndpoint构造器
/**
 * Delegates to the superclass constructor and resolves the codec for this endpoint.
 *
 * @param url     passed to the superclass constructor and used to look up the channel codec
 * @param handler channel handler passed to the superclass constructor
 */
public AbstractEndpoint(URL url, ChannelHandler handler) {
super(url, handler);
// Original author's note: DubboCodec encodes and decodes the Request and Response objects of an
// RPC call, which are the core objects carried over the wire.
// Original author's note: the codec obtained here is a DubboCountCodec wrapping DubboCodec.
this.codec = getChannelCodec(url);
// Original author's note: the default connect timeout is 3000 ms.
// NOTE(review): no statement in this excerpt sets a timeout; confirm against the full source.
}
AbstractPeer构造器创建