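How Ribbon Does Load Balancing

Load Balancing

What Is Load Balancing

Load balancing arranges multiple servers symmetrically into a server group. Every
server has equal standing and can serve requests on its own, without help from the
others. A load-distribution technique spreads incoming requests evenly across the
servers in this symmetric structure, and the server that receives a request
responds to the client independently.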

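Why Load Balancing Matters

  1. It provides horizontal scaling and increases the processing capacity of the cluster
  2. It enables failover, fail-fast, and similar behavior, which gives high availability

Categories and Layers

You can search online or refer to the article in the appendix; this topic is not covered here.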

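Introduction to Ribbon

Simply put, Ribbon is a client-side RPC framework that mainly provides the following features:

  • Load balancing
  • Fault tolerance
  • Multi-protocol support and a reactive programming model
  • Caching and batching

Because the load balancing that Eureka itself provides is fairly weak, Ribbon supplies load balancing through ribbon-eureka. This article therefore focuses on two important aspects of Ribbon:

  • How load balancing is implemented
  • How the server information held by the Eureka Server is obtained and updated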

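Ribbon Load Balancing

Implementing software load balancing requires the following capabilities:

  1. Obtain the server list together with each server's information; at minimum the IP and port
  2. Choose one server from the list by some method; the selection strategy should be flexible and configurable
  3. When a server in the list becomes unreachable, be notified of the status change and exclude that server from selection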
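
The three capabilities above can be sketched in a few lines of plain Java. This is only an illustration: Endpoint, SelectionStrategy, MiniLoadBalancer, and RotatingStrategy are hypothetical names invented here, not Ribbon classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a server entry; not a Ribbon class.
final class Endpoint {
    final String host;
    final int port;
    volatile boolean alive = true;

    Endpoint(String host, int port) {
        this.host = host;
        this.port = port;
    }
}

// Pluggable selection strategy (capability 2).
interface SelectionStrategy {
    Endpoint select(List<Endpoint> candidates);
}

final class MiniLoadBalancer {
    private final List<Endpoint> endpoints = new CopyOnWriteArrayList<>();
    private final SelectionStrategy strategy;

    MiniLoadBalancer(SelectionStrategy strategy) {
        this.strategy = strategy;
    }

    // Capability 1: hold the server list (host and port at minimum).
    void add(Endpoint e) {
        endpoints.add(e);
    }

    // Capability 3: record that a server is unreachable.
    void markDown(Endpoint e) {
        e.alive = false;
    }

    // Capability 2: choose among the servers that are still alive.
    Endpoint choose() {
        List<Endpoint> up = new ArrayList<>();
        for (Endpoint e : endpoints) {
            if (e.alive) {
                up.add(e);
            }
        }
        return up.isEmpty() ? null : strategy.select(up);
    }
}

// One possible strategy: rotate through the candidates.
final class RotatingStrategy implements SelectionStrategy {
    private final AtomicInteger next = new AtomicInteger();

    @Override
    public Endpoint select(List<Endpoint> candidates) {
        // floorMod keeps the index non-negative even after int overflow.
        return candidates.get(Math.floorMod(next.getAndIncrement(), candidates.size()));
    }
}
```

Keeping the strategy behind its own interface is what makes the selection policy swappable without touching the list handling.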

The ILoadBalancer Interface

The ILoadBalancer interface provides the ability to manage a Server list and choose a Server from it.

/**
 * Interface that defines the operations for a software loadbalancer. A typical
 * loadbalancer minimally need a set of servers to loadbalance for, a method to
 * mark a particular server to be out of rotation and a call that will choose a
 * server from the existing list of server.
 *
 * @author stonse
 *
 */
public interface ILoadBalancer {

    /**
     * Initial list of servers.
     * This API also serves to add additional ones at a later time
     * The same logical server (host:port) could essentially be added multiple times
     * (helpful in cases where you want to give more "weightage" perhaps ..)
     *
     * @param newServers new servers to add
     * Call addServers to initialize the list.
     */
    public void addServers(List<Server> newServers);

    /**
     * Choose a server from load balancer.
     *
     * @param key An object that the load balancer may use to determine which server to return. null if
     * the load balancer does not use this parameter.
     * @return server chosen
     * Chooses one Server from the Server list.
     */
    public Server chooseServer(Object key);

    /**
     * To be called by the clients of the load balancer to notify that a Server is down
     * else, the LB will think its still Alive until the next Ping cycle - potentially
     * (assuming that the LB Impl does a ping)
     *
     * @param server Server to mark as down
     * Marks the Server as unavailable.
     */
    public void markServerDown(Server server);

    /**
     * @return Only the servers that are up and reachable.
     * Returns the reachable servers, excluding unavailable ones.
     */
    public List<Server> getReachableServers();

    /**
     * @return All known servers, both reachable and unreachable.
     * Returns all servers.
     */
    public List<Server> getAllServers();
}

Server

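Server mainly contains the following fields, which supply the information needed for a network call. They can be combined into a URL such as http://${host}:${port}/xxx to make an HTTP request.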

public static final String UNKNOWN_ZONE = "UNKNOWN";
private String host;
private int port = 80;
private volatile String id;
private boolean isAliveFlag; // whether the server is alive
private String zone = UNKNOWN_ZONE;
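
As a small illustration of the URL assembly described above, the sketch below builds a request URI from a host and a port. ServerUrl and its build method are hypothetical helpers written for this article, not part of Ribbon, and the http scheme is assumed.

```java
import java.net.URI;

// Hypothetical helper; not part of Ribbon.
final class ServerUrl {
    // Combines host, port, and a path that starts with "/" into an http URI.
    static URI build(String host, int port, String path) {
        return URI.create("http://" + host + ":" + port + path);
    }
}
```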

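ILoadBalancer Implementations

ILoadBalancer Hierarchy

  • BaseLoadBalancer implements the main functionality of ILoadBalancer; other classes can extend BaseLoadBalancer directly, which makes new implementations easy to write
  • DynamicServerListLoadBalancer builds on BaseLoadBalancer to obtain its ServerList dynamically

Let's look at several important fields of BaseLoadBalancer:

  • IRule: the interface for the server selection strategy
  • IPing: checks whether a single server is alive
  • IPingStrategy: the strategy for checking all servers. It differs from IPing above: IPing defines how one server is checked, and the two interfaces work together to determine whether every server is alive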

IRule

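The IRule interface shows that choose is its main method; it defines the selection rule. The server list still has to come from the load balancer, so the load balancer must be passed in. In BaseLoadBalancer, the load balancer delegates server selection to the IRule through Server svr = rule.choose(key);.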

public interface IRule {
    public Server choose(Object key);
    public void setLoadBalancer(ILoadBalancer lb);
    public ILoadBalancer getLoadBalancer();
}
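
To show how a rule of this shape can be written, here is a minimal sketch of a random selection rule. So that it compiles without Ribbon, it uses hypothetical stand-ins (Node, NodeSource, Rule) that mirror only the shape of Server, ILoadBalancer, and IRule; none of these names come from the library.

```java
import java.util.List;
import java.util.Random;

// Hypothetical stand-ins that mirror only the shape of the interfaces above.
final class Node {
    final String id;

    Node(String id) {
        this.id = id;
    }
}

interface NodeSource {
    List<Node> getReachableNodes();
}

interface Rule {
    Node choose(Object key);
    void setSource(NodeSource source);
    NodeSource getSource();
}

// A rule that picks one reachable node at random.
final class RandomPickRule implements Rule {
    private final Random random;
    private NodeSource source;

    RandomPickRule(Random random) {
        this.random = random;
    }

    @Override
    public void setSource(NodeSource source) {
        this.source = source;
    }

    @Override
    public NodeSource getSource() {
        return source;
    }

    @Override
    public Node choose(Object key) {
        // The rule owns no list of its own; it asks the source each time.
        if (source == null) {
            return null;
        }
        List<Node> up = source.getReachableNodes();
        if (up.isEmpty()) {
            return null;
        }
        return up.get(random.nextInt(up.size()));
    }
}
```

The key argument is ignored here, as it would be in any rule that does not route by request attributes.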

RoundRobinRule

// The load balancer is passed in through the constructor
public RoundRobinRule(ILoadBalancer lb) {
    this();
    setLoadBalancer(lb);
}

// RoundRobin
public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }
    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();
        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }
        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);
        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }
        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }
        // Next.
        server = null;
    }
    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
    }
    return server;
}
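
The excerpt calls incrementAndGetModulo without showing it. One way such a helper could be written is a compare-and-set loop over an AtomicInteger, sketched below. The class name CyclicCounter is made up for this example, and the body is an assumption about the technique rather than a quotation of the library.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder for the cyclic counter; the name is not from Ribbon.
final class CyclicCounter {
    private final AtomicInteger current = new AtomicInteger(0);

    // Returns the next index in [0, modulo), advancing the counter atomically.
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int seen = current.get();
            int next = (seen + 1) % modulo;
            // Retry if another thread advanced the counter in between.
            if (current.compareAndSet(seen, next)) {
                return next;
            }
        }
    }
}
```

Because the stored value is always reduced modulo the list size, it never overflows, and concurrent callers each receive a distinct step of the rotation.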

IPing

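IPing has a single method that determines whether a server is alive. IPingStrategy is responsible for checking multiple servers and can follow different strategies; each individual server is still checked with the method that IPing provides.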

public interface IPing {
    public boolean isAlive(Server server);
}

public interface IPingStrategy {
    boolean[] pingServers(IPing ping, Server[] servers);
}

public class NIWSDiscoveryPing extends AbstractLoadBalancerPing {
    ...
    public boolean isAlive(Server server) {
        boolean isAlive = true;
        if (server!=null && server instanceof DiscoveryEnabledServer){
            DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;
            // Determine the server's state from the status in its InstanceInfo
            InstanceInfo instanceInfo = dServer.getInstanceInfo();
            if (instanceInfo!=null){
                InstanceStatus status = instanceInfo.getStatus();
                if (status!=null){
                    isAlive = status.equals(InstanceStatus.UP);
                }
            }
        }
        return isAlive;
    }
    ...
}
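
To make the division of labor between the two interfaces concrete, here is a minimal sketch of a strategy that checks the servers one after another. Host, Pinger, PingAll, and SerialPingAll are hypothetical stand-ins for Server, IPing, and IPingStrategy; treating a thrown exception as "not alive" is a choice made for this sketch.

```java
// Hypothetical stand-ins for Server, IPing, and IPingStrategy.
final class Host {
    final String name;

    Host(String name) {
        this.name = name;
    }
}

interface Pinger {
    boolean isAlive(Host host);
}

interface PingAll {
    boolean[] pingAll(Pinger pinger, Host[] hosts);
}

// Checks every host in order and records one result per host.
final class SerialPingAll implements PingAll {
    @Override
    public boolean[] pingAll(Pinger pinger, Host[] hosts) {
        boolean[] results = new boolean[hosts.length];
        for (int i = 0; i < hosts.length; i++) {
            try {
                results[i] = pinger != null && pinger.isAlive(hosts[i]);
            } catch (RuntimeException e) {
                // A failing check counts as "not alive" instead of aborting the sweep.
                results[i] = false;
            }
        }
        return results;
    }
}
```

A serial sweep is the simplest strategy; with many servers or slow checks, a strategy that pings in parallel would be the natural alternative.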

BaseLoadBalancer

IRule, IPing, and IPingStrategy were explained above. The following focuses on how the ILoadBalancer interface is implemented; see the comments for details:

protected IRule rule = DEFAULT_RULE;
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
protected IPing ping = null;

@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
        .synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
        .synchronizedList(new ArrayList<Server>());

/*
 * Build a new list first, then call setServersList to replace the old one.
 * The reason is that only setServersList then needs to take the lock.
 */
public void addServer(Server newServer) {
    if (newServer != null) {
        try {
            ArrayList<Server> newList = new ArrayList<Server>();
            newList.addAll(allServerList);
            newList.add(newServer);
            setServersList(newList);
        } catch (Exception e) {
            logger.error("Exception while adding a newServer", e);
        }
    }
}

/*
 * Get the alive server dedicated to key
 *
 * @return the dedicated server
 * Selection is delegated to the rule's choose method.
 */
public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Throwable t) {
            return null;
        }
    }
}

/*
 * First set the server's status,
 * then notify the listeners.
 */
public void markServerDown(Server server) {
    if (server == null) {
        return;
    }
    if (!server.isAlive()) {
        return;
    }
    logger.error("LoadBalancer: markServerDown called on ["
            + server.getId() + "]");
    server.setAlive(false);
    // forceQuickPing();
    notifyServerStatusChangeListener(singleton(server));
}

@Override
public List<Server> getReachableServers() {
    return Collections.unmodifiableList(upServerList);
}

@Override
public List<Server> getAllServers() {
    return Collections.unmodifiableList(allServerList);
}

As the code above shows, once the server list is available, using IRule, IPing, and BaseLoadBalancer to do the load balancing is fairly straightforward.
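
The comment on addServer describes a copy-then-replace approach: build a new list, then swap it in through a single setter so that only the setter needs a lock. A minimal sketch of that idea follows, using a hypothetical ServerRegistry class with plain strings in place of Server objects.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical class illustrating copy-then-replace; not Ribbon code.
final class ServerRegistry {
    private final Object lock = new Object();
    private volatile List<String> servers = Collections.emptyList();

    // Build the new list outside the lock, then hand it to the setter.
    void add(String server) {
        List<String> copy = new ArrayList<>(servers);
        copy.add(server);
        setServers(copy);
    }

    // The only method that takes the lock: it swaps in an immutable snapshot.
    void setServers(List<String> newList) {
        synchronized (lock) {
            servers = Collections.unmodifiableList(new ArrayList<>(newList));
        }
    }

    // Readers get a stable snapshot and never need the lock.
    List<String> all() {
        return servers;
    }
}
```

In this sketch two concurrent add calls can still race, and one addition may be lost, because the copy is taken outside the lock; the lock protects only the swap itself.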

DynamicServerListLoadBalancer

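Next, let's look at how DynamicServerListLoadBalancer is used to integrate with Eureka.

Configuration

  • com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList is the implementation class that obtains the server list from Eureka
  • erwa.ribbon.ServerListRefreshInterval is the refresh interval, in milliseconds
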
erwa.ribbon.DeploymentContextBasedVipAddresses=erwa
erwa.ribbon.NIWSServerListClassName=com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList
erwa.ribbon.ServerListRefreshInterval=60000
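
The three keys above share the shape <client name>.ribbon.<property>, with erwa as the client name. The sketch below reads keys of that shape with plain java.util.Properties. It is only meant to illustrate the naming pattern: RibbonStyleConfig is a hypothetical class, and this is not how Ribbon itself loads its configuration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical reader for "<client>.ribbon.<key>" properties; not Ribbon code.
final class RibbonStyleConfig {
    private final Properties props = new Properties();
    private final String clientName;

    RibbonStyleConfig(String clientName, String text) {
        this.clientName = clientName;
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Looks up "<clientName>.ribbon.<key>", falling back to a default.
    String get(String key, String defaultValue) {
        return props.getProperty(clientName + ".ribbon." + key, defaultValue);
    }

    // Same lookup, parsed as a long.
    long getLong(String key, long defaultValue) {
        String raw = props.getProperty(clientName + ".ribbon." + key);
        return raw == null ? defaultValue : Long.parseLong(raw.trim());
    }
}
```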

Usage

As the code shows, there are two key steps:

  • Obtain the DynamicServerListLoadBalancer
  • Use the DynamicServerListLoadBalancer to get the server list

DynamicServerListLoadBalancer<?> lb = (DynamicServerListLoadBalancer<?>) ClientFactory.getNamedLoadBalancer("erwa");
// show all servers in the list
List<Server> list = lb.getServerList(false);
Iterator<Server> it = list.iterator();

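Creating the DynamicServerListLoadBalancer

DynamicServerListLoadBalancer is created through the getNamedLoadBalancer(name, DefaultClientConfigImpl.class) method, where name is the vipAddress. The IClientConfig implementation is initialized next, followed by the call chain registerNamedLoadBalancerFromclientConfig(name, clientConfig) -> ClientFactory.instantiateInstanceWithClientConfig(loadBalancerClassName, clientConfig).

Instantiating IPing and IRule

During instantiation, Class.forName is called first, followed by newInstance. Configuration is then initialized layer by layer. Every class that needs configuration-based initialization implements the IClientConfigAware interface.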

public interface IClientConfigAware {

    /**
     * Concrete implementation should implement this method so that the configuration set via
     * {@link IClientConfig} (which in turn were set via Archaius properties) will be taken into consideration
     *
     * @param clientConfig
     */
    public abstract void initWithNiwsConfig(IClientConfig clientConfig);
}
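
The pattern described here (load a class by name, create an instance, then pass it the configuration if it asks for one) can be sketched as follows. ConfigAware, Instantiator, and TimeoutHolder are hypothetical stand-ins, and a plain Map plays the role of IClientConfig. The sketch uses getDeclaredConstructor().newInstance() because Class.newInstance() is deprecated in recent Java versions.

```java
import java.util.Map;

// Hypothetical stand-in for IClientConfigAware, with a Map as the config.
interface ConfigAware {
    void initWithConfig(Map<String, String> config);
}

final class Instantiator {
    // Loads the class by name, creates it through its no-arg constructor,
    // then passes the configuration on if the instance is ConfigAware.
    static Object instantiate(String className, Map<String, String> config) {
        try {
            Class<?> clazz = Class.forName(className);
            Object instance = clazz.getDeclaredConstructor().newInstance();
            if (instance instanceof ConfigAware) {
                ((ConfigAware) instance).initWithConfig(config);
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }
}

// Example of a configurable component.
class TimeoutHolder implements ConfigAware {
    int timeoutMs = 1000;

    @Override
    public void initWithConfig(Map<String, String> config) {
        timeoutMs = Integer.parseInt(config.getOrDefault("timeoutMs", "1000"));
    }
}
```

This is what lets the rule, ping, and server list classes be chosen purely by class name in configuration.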

The initWithNiwsConfig method of DynamicServerListLoadBalancer is called first. Through super.initWithNiwsConfig it calls the initWithNiwsConfig method of BaseLoadBalancer, which assigns the IRule and IPing. Here they are:

  • IRule: com.netflix.loadbalancer.AvailabilityFilteringRule
  • IPing: com.netflix.loadbalancer.DummyPing

public void initWithNiwsConfig(IClientConfig clientConfig) {
    String ruleClassName = (String) clientConfig
            .getProperty(CommonClientConfigKey.NFLoadBalancerRuleClassName);
    String pingClassName = (String) clientConfig
            .getProperty(CommonClientConfigKey.NFLoadBalancerPingClassName);
    IRule rule;
    IPing ping;
    try {
        rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(
                ruleClassName, clientConfig);
        ping = (IPing) ClientFactory.instantiateInstanceWithClientConfig(
                pingClassName, clientConfig);
    } catch (Exception e) {
        throw new RuntimeException("Error initializing load balancer", e);
    }
    initWithConfig(clientConfig, rule, ping);
}

Control then returns to the initWithNiwsConfig method of DynamicServerListLoadBalancer.

Instantiating the ServerList Interface

The ServerList interface defines how the server list is obtained.

public interface ServerList<T extends Server> {

    public List<T> getInitialListOfServers();

    /**
     * Return updated list of servers. This is called say every 30 secs
     * (configurable) by the Loadbalancer's Ping cycle
     *
     */
    public List<T> getUpdatedListOfServers();
}

The implementation class is com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList, as specified in the configuration file. Both methods of the interface call obtainServersViaDiscovery().

@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
    return obtainServersViaDiscovery();
}

@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
    return obtainServersViaDiscovery();
}

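The main code in obtainServersViaDiscovery() is as follows:

  1. Obtain the EurekaClient. This uses dependency injection (the javax.inject package): Provider<EurekaClient> eurekaClientProvider;
  2. Use the EurekaClient to obtain a List<InstanceInfo>
  3. Initialize a DiscoveryEnabledServer for each instance and add it to serverList
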
//1.
EurekaClient eurekaClient = eurekaClientProvider.get();
//......
if (vipAddresses!=null){
    for (String vipAddress : vipAddresses.split(",")) {
        // if targetRegion is null, it will be interpreted as the same region of client
        //2
        List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
        //......
        //3
        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
        des.setZone(DiscoveryClient.getZone(ii));
        serverList.add(des);
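
The overall shape of this lookup (split the comma-separated VIP addresses, fetch the instances registered under each, convert each instance into a server entry) can be sketched without Eureka on the classpath. In the sketch below a Map stands in for the discovery client, and Instance and VipResolver are hypothetical names; the parts elided in the excerpt are not reconstructed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one registered instance; not Eureka's InstanceInfo.
final class Instance {
    final String host;
    final int port;

    Instance(String host, int port) {
        this.host = host;
        this.port = port;
    }
}

final class VipResolver {
    // registry maps a VIP address to its instances; it plays the role of the
    // discovery client lookup in this sketch.
    static List<String> resolve(String vipAddresses, Map<String, List<Instance>> registry) {
        List<String> servers = new ArrayList<>();
        if (vipAddresses == null) {
            return servers;
        }
        for (String vip : vipAddresses.split(",")) {
            List<Instance> found = registry.getOrDefault(vip, Collections.<Instance>emptyList());
            for (Instance i : found) {
                // Each instance becomes one "host:port" server entry.
                servers.add(i.host + ":" + i.port);
            }
        }
        return servers;
    }
}
```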

Instantiating ServerListFilter

ServerListFilter deals with statistics-related logic, so it is skipped here.

if (niwsServerListImpl instanceof AbstractServerList) {
    AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
            .getFilterImpl(clientConfig);
    niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
    this.filter = niwsFilter;
}

Instantiating ServerListUpdater

ServerListUpdater updates the server list; the default is PollingServerListUpdater. It uses a thread pool to periodically run updateAction.doUpdate(); and refresh the list.

public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };
        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}
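
The same polling pattern can be tried in isolation with the JDK's ScheduledExecutorService. The sketch below uses a hypothetical PollingRefresher class; the daemon thread and the choice to swallow exceptions from the refresh task are decisions made for this example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical class; a simplified take on the polling pattern shown above.
final class PollingRefresher {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "refresher");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });
    private final AtomicBoolean active = new AtomicBoolean(false);
    private ScheduledFuture<?> future;

    // Starts periodic refreshes; returns false if already running.
    synchronized boolean start(Runnable refresh, long initialDelayMs, long intervalMs) {
        if (!active.compareAndSet(false, true)) {
            return false;
        }
        future = executor.scheduleWithFixedDelay(() -> {
            try {
                refresh.run();
            } catch (RuntimeException e) {
                // Swallowed so that one failed cycle does not cancel later ones.
            }
        }, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    // Cancels the periodic task if it is running.
    synchronized void stop() {
        if (active.compareAndSet(true, false) && future != null) {
            future.cancel(true);
        }
    }
}
```

Catching exceptions inside the task matters with scheduleWithFixedDelay: an exception that escapes the task suppresses all subsequent executions.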

When the ServerList Interface Is Called

The ServerList interface is now clear, but when is it called?

  • After initialization completes, the restOfInit method is called; its code is shown below
  • enableAndInitLearnNewServersFeature calls serverListUpdater.start(updateAction) to start the thread pool
  • The updateAction field is an implementation of ServerListUpdater.UpdateAction and is invoked on a schedule
  • doUpdate calls updateListOfServers()
  • updateListOfServers() first obtains the server list through the ServerList interface, then calls s.setAlive(true); in a loop to set each server's status

void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature();
    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
                .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}

public void enableAndInitLearnNewServersFeature() {
    LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
    serverListUpdater.start(updateAction);
}

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {
        updateListOfServers();
    }
};

public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                getIdentifier(), servers);
        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);
        }
    }
    updateAllServerList(servers);
}

protected void updateAllServerList(List<T> ls) {
    // other threads might be doing this - in which case, we pass
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        for (T s : ls) {
            s.setAlive(true); // set so that clients can start using these
                              // servers right away instead
                              // of having to wait out the ping cycle.
        }
        setServersList(ls);
        super.forceQuickPing();
        serverListUpdateInProgress.set(false);
    }
}
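
updateAllServerList uses an atomic flag so that a thread simply skips the update when another thread is already applying one. The sketch below isolates that guard in a hypothetical GuardedUpdater class. Unlike the excerpt, it resets the flag in a finally block, which is a choice made here so that an exception cannot leave the flag stuck.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical class illustrating the "skip if an update is in progress" guard.
final class GuardedUpdater {
    private final AtomicBoolean updateInProgress = new AtomicBoolean(false);
    private volatile List<String> servers = Collections.emptyList();

    // Applies the fetched list unless another update is already running.
    boolean tryUpdate(Supplier<List<String>> fetch) {
        if (!updateInProgress.compareAndSet(false, true)) {
            return false; // someone else is updating; skip this round
        }
        try {
            servers = Collections.unmodifiableList(new ArrayList<>(fetch.get()));
            return true;
        } finally {
            updateInProgress.set(false); // always release the flag
        }
    }

    List<String> servers() {
        return servers;
    }
}
```

Skipping is acceptable in this setting because the next polling cycle fetches the full list again, so a dropped update only delays the refresh.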

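This completes the whole process.

Sample Code

Appendix

  1. 大型网站架构系列:负载均衡详解 (Large-Scale Website Architecture Series: Load Balancing Explained)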