使用Eureka做服务发现(二)

概述

这不是一篇使用指南,使用指南应该是循序渐进的;也不是源码解析,源码解析应该高屋建瓴,然后层层递进。我只是轻松的把该文写成在看代码的过程中的简单记录,有空再详细整理吧。

Eureka Server提供了一个Web管理页面,还有一系列的基于HTTP的API,提供注册、解除绑定,查找服务等功能。API文档位于:API文档
EurekaServer没有把信息在后端存储中持久化,而是存储在内存中。注册表中的每个服务实例都会采用向Server发送心跳的方式保持他们在注册表中的状态更新。Client端也在自己的内存中缓存了注册表信息,使得他们访问远程服务时,不必每次都向Server请求注册表信息。

默认情况下,每个Eureka Server同时也是一个Eureka Client,并且至少需要一个serviceUrl来定位Server。

为什么Eureka Server能够在提供HTTP API的同时,还是一个Eureka Client?Eureka Client主要完成什么工作?下面我们逐步的来解析。

Web.xml

Eureka Server是一个Web工程,所以我们就先来看一下web.xml,比较重要的有以下两个配置:

1
2
3
4
5
6
7
<listener>
<listener-class>com.netflix.eureka.EurekaBootStrap</listener-class>
</listener>
<init-param>
<param-name>com.sun.jersey.config.property.packages</param-name>
<param-value>com.sun.jersey;com.netflix</param-value>
</init-param>

其中,系统启动的时候会调用com.netflix.eureka.EurekaBootStrap;并且,系统通过扫描com.sun.jersey;com.netflix中的Resource来对外提供HTTP服务。

contextInitialized

  • 首先,会调用contextInitialized方法,该方法的主要作用是初始化Eureka,包括发布注册表和Eureka Server之间的同步功能。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public void contextInitialized(ServletContextEvent event) {
    try {
    initEurekaEnvironment();
    initEurekaServerContext();
    ServletContext sc = event.getServletContext();
    sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
    } catch (Throwable e) {
    logger.error("Cannot bootstrap eureka server :", e);
    throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
    }

从函数名称就可以看出,主要做了三件事情:

  1. 初始化EurekaServer所需要的一些变量
  2. 初始化ServerContext
  3. 把ServerContext保存在attribute中
    下面我们逐一进行分析。

初始化EurekaServer所需要变量

initEurekaEnvironment()方法的作用是初始化Eureka的环境配置信息,Euerka基于Archaius,配置都是通过ConfigurationManager.getConfigInstance().getXXX来加载的。该方法主要配置了两个参数DataCenter和Environment。其中DataCenter配置我们是否使用了云;Environment配置我们是在测试环境、生产环境、开发环境等。

  • 首先配置DataCenter, 默认情况下是default,另一个可配置的值是cloud,表示使用AWS。可以在配置文件中通过”eureka.datacenter”修改。

    1
    2
    private static final String CLOUD = "cloud";
    private static final String DEFAULT = "default";
  • 随后设置了Environment,默认情况下是test。可以在配置文件中通过”eureka.environment”修改。

初始化ServerContext

Eureka Server的启动需要很多配置信息,都在initEurekaServerContext()方法中进行加载。
这里有几个概念需要明确一下:

  • Eureka Server:是一个注册中心,provider可以向注册中心注册自己,consumer可以在注册中心中查询provider
  • Eureka Client:与Eureka Server相区别,是Server的使用者,provider和consumer都可以称作是Eureka Client。Client和Instance是完全不同的,Client的作用是和Server交互,而instance的作用是对外提供服务。
  • Eureka Service:一个微服务,在Eureka中注册,通过一个ID获取到,是一组Eureka Instance。
  • Eureka Instance:一个微服务的实例,一个微服务可以扩展为多个实例来提高可用性和可靠性。

配置信息:

  • EurekaServerConfig : EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig(); 该配置都是Server需要的信息,在DefaultEurekaServerConfig中都提供了默认的配置, 可以加载”eureka-server.properties”配置文件覆盖默认的配置,当然,文件名是可以修改的。通过DefaultEurekaServerConfig提供的配置都是可以动态修改的。(但是否能够动态生效呢?)

  • EurekaInstanceConfig : 该配置主要提供了注册到Eureka Server上的实例需要提供的信息,这样在服务注册后,就可以用通用的方式通过virtual hostname(VIPAddress, 可以简单想象成类似域名)获取信息,方便不同的实例之间进行调用通信。在不使用亚马逊云时,调用new MyDataCenterInstanceConfig()获取配置。MyDataCenterInstanceConfig只是PropertiesInstanceConfig的封装,配置信息都在PropertiesInstanceConfig中。

  • EurekaClientConfig : 是EurekaClient需要的信息,在DefaultEurekaClientConfig中提供了默认配置。默认读取”eureka-client.properties”配置。

  • ApplicationInfoManager: 在EurekaInstanceConfig的基础上,能够提供初始化注册到Server所需要的信息,并且能够被其他模块获取。

新建EurekaClient

EurekaClient eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);这行代码创建了一个EurekaClient,EurekaClient提供了以下功能:

  • 能够以多种方式获取InstanceInfo信息(作为consumer)
  • 能够获取到本地Client的信息(regions,serviceUrl等)
  • 能够注册健康检查,获取健康检查handler
    DiscoveryClient作为EurekaClient的一个实现,还提供了以下功能:
  • 向Eureka Server注册当前实例
  • 向Eureka Server更新租约
  • 当Eureka Server停止的时候取消租约
  • 查找Eureka Server中注册的服务和实例
    Eureka Client需要一个Eureka Server列表来实现高可用,当其中某个实例出现故障时,请求其他实例。
新建Executor

新建了两个ThreadPoolExecutor和一个ScheduledThreadPoolExecutor,其中scheduler用来实现定时调用,具体的工作还是通过调用heartbeatExecutor和cacheRefreshExecutor的execute完成的。其中heartbeatExecutor主要用于发送心跳,cacheRefreshExecutor主要用于同步信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
scheduler = Executors.newScheduledThreadPool(3,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff

scheduleServerEndpointTask(eurekaTransport, args);

该方法中创建了两个EurekaHttpClient, newRegistrationClient和newQueryClient。EurekaHttpClient是用来与Eureka Server进行通信的Eureka Client的实现。

  • newRegistrationClient:生成时是通过装饰模式层层包装的,SessionedEurekaHttpClient-> RetryableEurekaHttpClient-> RedirectingEurekaHttpClient, RedirectingEurekaHttpClient中通过EurekaJerseyClientImpl发送HTTP请求。
  • newQueryClient:与newRegistrationClient情况类似

fetchRegistry(false)

getAndStoreFullRegistry();: 从Eureka server获取所有的已注册的instance, 经过层层装饰,最终会调用RetryableEurekaHttpClient的execute方法,然后会调用JerseyApplicationClient的getApplications方法,请求“http://localhost:8080/eureka/v2/apps/”, 如果server还没有启动,则会抛出异常。随后就会调用fetchRegistryFromBackup();, 调用apps = backupRegistryInstance.fetchRegistry(); backupRegistryInstance是com.netflix.discovery.NotImplementedRegistryImpl,fetchRegistry()直接返回null。如果获取到了apps,则保存在localRegionApps中。

fetchRegistryFromBackup()

如果fetchRegistry没有获取到结果,返回false,则会继续调用fetchRegistryFromBackup(),实际上目前没有实现。

initScheduledTasks();

  • 定时执行CacheRefreshThread,用来刷新注册表信息,调用流程与上面的fetchRegistry一致。
    1
    2
    3
    4
    5
    class CacheRefreshThread implements Runnable {
    public void run() {
    refreshRegistry();
    }
    }

在refreshRegistry()中调用了fetchRegistry方法。

InstanceInfoReplicator:当当前实例发生变化时,把本地的信息复制到远程Server. 调用InstanceInfoReplicator的start方法。 Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); 调用run()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void run() {
try {
//刷新instance信息
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
//重新注册
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}

提供HTTP服务

上面的代码新建了EurekaHttpClient进行HTTP请求,但是服务端是如何实现的呢?秘密就在com.netflix包下的众多Resource中。具体的包是:com.netflix.eureka.resources。就是使用jersey的一堆http服务,没什么过多可说的。

总结

对应《微服务注册发现概述》中的服务发现所需要的功能,梳理一下Eureka是如何实现的:

功能 Eureka实现
数据如何存储 内存
如何提供注册服务 jersey实现的HTTP服务
如何提供查询服务 jersey实现的HTTP服务
Provider变化时,如何通知Consumer CacheRefreshThread定时执行时会更新本地缓存
服务注册表的节点之间如何进行信息的同步和复制 每个Eureka Server同时也会启动一个Eureka Client,由Eureka Client负责复制和同步信息
当所有的服务注册表节点都Down掉后重新启动,如何重新获取注册信息 Eureka Client重新连接