如何用zookeeper实现注册中心

ZooKeeper简介

这里只会对zookeeper做简单的介绍,如果想要详细的了解ZooKeeper,请参考ZooKeeper官方网站或相关书籍。
ZooKeeper是一个开源的分布式协调服务,设计目标是将复杂的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的接口提供给用户使用。

ZooKeeper特性

ZooKeeper可以保证如下分布式一致性特性:

  • 顺序一致性:从一个客户端发起的事务请求,最终将会严格的按照发起顺序并应用到中
  • 原子性:保证操作在整个集群是原子性的。所有机器要么都成功应用某一个事务,要么都不应用。
  • 单一视图:无论客户端连接的是哪个ZooKeeper服务器,看到的数据都是一致的。
  • 实时性:ZooKeeper保证在一定的时间段内,客户端最终一定能够从服务端上读取到最新的数据状态。
  • 可靠性:一旦服务端完成了一个事务,并完成了对客户端的响应,该事务引起的变更会被一直保留下来。

    ZooKeeper相关基本概念

  • 会话(Session):session是客户端会话。ZooKeeper的客户端与服务器之间是一个TCP长连接,连接开始建立,session也就建立了;只要该连接断开时间不超过一个设定值,会话就一直有效。
  • 数据节点(Znode):ZooKeeper的数据模型是一棵树,例如:/root/foo。其中每个路径就是一个Znode,每个Znode上都可以保存自己的数据。Znode可以分为持久节点和临时节点两类。
    • 持久节点是指一旦Znode被创建了,除非进行移除操作,否则这个Znode会一直保持在Znode上。
    • 临时节点在生命周期内会和客户端会话绑定,一旦会话失效,那么这个客户端所创建的所有临时节点都会被移除。
  • Watcher:ZooKeeper允许用户在指定节点上注册一些Watcher,在一些特定事件触发的时候,ZooKeeper客户端会将事件通知感兴趣的客户端。

注册中心与zk特性

了解了ZooKeeper的特性和基本概念以后,我们就可以开始用zk实现注册中心了。在上篇文章中,我们提到了注册中心所必备的一些功能,下面我们就逐一分析一下,如何使用zk来实现这些功能。需要说明的是,本实例中,我们只考虑最简单的情况,以演示基本功能为目的,肯定是非常不完善的。

服务注册表

功能 ZooKeeper
数据如何存储 Znode
如何提供注册服务 在某个目录注册临时节点
如何提供查询服务 查询某个目录下的临时节点
通知Consumer的方式 利用Watcher机制,Consumer自动获取通知
服务注册表节点之间如何进行信息的同步和复制 ZooKeeper单一视图,保证一致性
当所有的服务注册表节点都Down掉后重新启动,如何重新获取注册信息 Provider失败重连

Service Provider

功能 ZooKeeper
启动时向服务注册表发送注册信息 在某个目录注册临时节点
关闭时向服务注册表发送取消信息 断开后
保持心跳 客户端与Zookeeper长连接

Consumer

功能 ZooKeeper
通过服务注册表查询Provider的实例 查询某个目录下的临时节点
客户端负载均衡 Zookeeper不提供,需自己实现
失败重试 Zookeeper不提供,需自己实现
客户端缓存 Zookeeper不提供,需自己实现

可以看出,服务注册表、Service Provider、Consumer三者的绝大多数重要功能,都可以通过Zookeeper来实现。

具体实现

接口定义

注册接口

对应服务注册表的功能,首先定义以下接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public interface Registry {
/**
* 注册数据,比如:提供者地址,消费者地址,路由规则,覆盖规则,等数据。
*/
void register(Entity entity) throws Exception;
/**
* 取消注册.
*/
void unregister(Entity entity) throws Exception;
/**
* 订阅符合条件的已注册数据,当有注册数据变更时自动推送.
*
*/
void subscribe(Entity entity, ChildListener listener) throws Exception;
/**
* 取消订阅.
*/
void unsubscribe(Entity entity, ChildListener listener) throws Exception;
/**
* 根据服务名称查询服务
*/
List<String> lookup(String serviceName) throws Exception;
}

ZooKeeper接口

用于操作ZooKeeper,可以使用不同的客户端框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* ZK操作工具接口
*
*/
public interface ZkClient {
/**
* 创建临时节点
*/
void create(String url, boolean ephemeral) throws Exception;
/**
* 删除临时节点
*/
void delete(String url) throws Exception;
/**
* 监测节点
*/
void watchChild(String url, ChildListener listener) throws Exception;
/**
* 获取子节点
*/
List<String> getChild(String url) throws Exception;
}

服务实体

用来标识一个服务。每个服务有两个主要要素:

  1. 服务名称,Consumer要通过服务名称来查询一个服务。
  2. 节点信息,用来标识一个服务实例,一个服务可能存在多个实例,node把自己与其他实例区分开来。
  3. 还有一个辅助要素,type字段,用来标识是Consumer还是Provider,便于生成url。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Entity {
public static final int PROVIDER = 0;
public static final int CONSUMER = 1;
/**
* 服务名称
*/
private String service;
/**
* 节点,标识一个服务实例
*/
private String node;
/**
* 类型
*/
private int type;
public String getService() {
return service;
}
public void setService(String service) {
this.service = service;
}
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public String getNode() {
return node;
}
public void setNode(String node) {
this.node = node;
}
}

接口实现

ZK操作工具类Curator实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* ZK操作工具类Curator实现
*/
public class CuratorZkClient implements ZkClient{
private CuratorFramework client;
public CuratorZkClient(String url) {
RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.newClient(url, 5000, 3000,retry);
client.start();
}
public void create(String url, boolean ephemeral) throws Exception {
CreateMode mode = ephemeral ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
client.create().creatingParentsIfNeeded().withMode(mode).forPath(url);
}
public void delete(String url) throws Exception {
client.delete().forPath(url);
}
public void watchChild(String url, ChildListener listener) throws Exception {
PathChildrenCache cache = new PathChildrenCache(client, url, true);
cache.start(StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(listener);
}
public List<String> getChild(String url) throws Exception {
return client.getChildren().forPath(url);
}
}

ZooKeeper注册类实现

  • 根节点是”/zookeeper-registry-example”
  • Provider节点格式:$ROOT/provider/$ServiceName/$Node
  • Consumer节点格式:$ROOT/consumer/$ServiceName/$Node
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    public class ZookeeperRegistry implements Registry {
    private static final String ROOT = "/zookeeper-registry-example";
    private ZkClient client;
    public ZookeeperRegistry(String url) {
    this.client = new CuratorZkClient(url);
    }
    public void register(Entity entity) throws Exception {
    client.create(getUrl(entity), true);
    }
    public void unregister(Entity entity) throws Exception {
    client.delete(getUrl(entity));
    }
    public void subscribe(Entity entity, ChildListener listener) throws Exception {
    client.watchChild(getProviderUrl(entity.getService()), listener);
    }
    public void unsubscribe(Entity entity, ChildListener listener)
    throws Exception {
    // TODO Auto-generated method stub
    }
    @Override
    public List<String> lookup(String serviceName) throws Exception {
    return client.getChild(getProviderUrl(serviceName));
    }
    private String getUrl(Entity entity) {
    if(Entity.PROVIDER == entity.getType()) {
    return getProviderNodeUrl(entity);
    }else{
    return getConsumerNodeUrl(entity);
    }
    }
    private String getProviderUrl(String serviceName) {
    return ROOT + "/" + "provider/" + serviceName;
    }
    private String getProviderNodeUrl(Entity entity) {
    return getProviderUrl(entity.getService()) + "/" + entity.getNode();
    }
    private String getConsumerUrl(Entity entity) {
    return ROOT + "/" + "consumer/" + entity.getService() + "/" + entity.getNode();
    }
    private String getConsumerNodeUrl(Entity entity) {
    return getConsumerUrl(entity) + "/" + entity.getNode();
    }
    }

Provider

由于仅仅是示例,具体的操作不封装在provider中,而只包括实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Provider {
private Entity entity;
public Entity getEntity() {
return entity;
}
public void setEntity(Entity entity) {
this.entity = entity;
}
public Provider(Entity entity) {
this.entity = entity;
}
}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class Consumer implements ChildListener {
//provider列表
private Set<String> providers;
private Entity entity;
public Consumer(String service, String node) {
this.entity = new Entity();
this.entity.setService(service);
this.entity.setNode(node);
this.entity.setType(Entity.CONSUMER);
this.providers = new HashSet<String>();
}
/**
* 获取节点变化状态,更新provider列表
*/
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
// System.out.println("CHILD_ADD," + event.getData().getPath());
providers.add(event.getData().getPath());
break;
case CHILD_UPDATED:
// System.out.println("CHILD_UPDATEED" + event.getData().getPath());
providers.add(event.getData().getPath());
break;
case CHILD_REMOVED:
// System.out.println("CHILD_REMOVED," + event.getData().getPath());
providers.remove(event.getData().getPath());
break;
default:
break;
}
}
/**
* 方便查看provider信息
*/
public void printProviders() {
System.out
.println("------------->Print providres by consumer:start<----------");
for (String url : providers) {
System.out.println(url);
}
System.out
.println("------------->Print providres by consumer:end<----------");
}
public Set<String> getProviders() {
return providers;
}
public Entity getEntity() {
return entity;
}
}

测试

Provider注册

  • 运行test, 使用zkClient连接zookeeper,查看zk, 存在/zookeeper-registry-example/provider/test_provider_register/node1节点
  • test运行结束后,node1节点被删除,/zookeeper-registry-example/provider/test_provider_register下为空
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void test_provider_register() throws Exception {
String serviceName = "test_provider_register";
String nodeName = "node1";
String zkServer = "127.0.0.1:2181";
CountDownLatch latch = new CountDownLatch(1);
Entity entity = new Entity();
entity.setService(serviceName);
entity.setNode(nodeName);
entity.setType(Entity.PROVIDER);
Registry registry = new ZookeeperRegistry(zkServer);
registry.register(entity);
latch.await(180, TimeUnit.SECONDS);
}

Provider取消注册

  • 调用registry.register(entity);注册后,/zookeeper-registry-example/provider/test_provider_unregister/node1节点存在
  • 调用registry.unregister(entity);后,/zookeeper-registry-example/provider/test_provider_unregister/node1节点被删除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
public void test_provider_unregister() throws Exception {
String serviceName = "test_provider_unregister";
String node = "node1";
String zkServer = "127.0.0.1:2181";
CountDownLatch latch = new CountDownLatch(1);
Entity entity = new Entity();
entity.setService(serviceName);
entity.setNode(node);
entity.setType(Entity.PROVIDER);
Registry registry = new ZookeeperRegistry(zkServer);
registry.register(entity);
System.out.println("-----------------> register");
Thread.sleep(30000);
registry.unregister(entity);
System.out.println("-----------------> unregister");
latch.await(180, TimeUnit.SECONDS);
}

Consumer Lookup

  • 首先provider实例node1,node2进行注册
  • consumers实例node1注册
  • consumer lookup, 获取到provider的node1, node2实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Test
public void test_consumer_lookup() throws Exception {
String ServiceName = "test_consumer_lookup";
String zkServer = "127.0.0.1:2181";
String node = "node1";
//注册
Entity entity = new Entity();
entity.setService(ServiceName);
entity.setNode(node);
entity.setType(Entity.PROVIDER);
Registry provider1Registry = new ZookeeperRegistry(zkServer);
provider1Registry.register(entity);
System.out.println("-----------------> node1注册");
Registry provider2Registry = new ZookeeperRegistry(zkServer);
entity.setNode("node2");
provider2Registry.register(entity);
System.out.println("-----------------> node2注册");
Consumer consumer = new Consumer(ServiceName, node);
Registry consumerRegistry = new ZookeeperRegistry(zkServer);
consumerRegistry.register(consumer.getEntity());
System.out.println("-----------------> consumer lookup ");
consumerRegistry.lookup(ServiceName).forEach(name -> System.out.println(name));
}

Consumer获取通知

provider实例变化时,consumer自动获取通知

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Test
public void test_provider_register_consumer_watch() throws Exception{
ZkClient zkClient = new CuratorZkClient("127.0.0.1:2181");
//provider注册
Entity entity = new Entity();
entity.setService("test_provider_register_consumer_watch");
entity.setNode("node1");
entity.setType(Entity.PROVIDER);
Registry provider1Registry = new ZookeeperRegistry("127.0.0.1:2181");
provider1Registry.register(entity);
System.out.println("-----------------> node1注册");
System.out.println("providers: " + zkClient.getChild("/zookeeper-registry-example/provider/test_provider_register_consumer_watch"));
//consumer订阅
Consumer consumer = new Consumer("test_provider_register_consumer_watch", "node1");
Registry consumerRegistry = new ZookeeperRegistry("127.0.0.1:2181");
System.out.println("-----------------> consumer注册");
consumerRegistry.register(consumer.getEntity());
System.out.println("consumers:" + zkClient.getChild("/zookeeper-registry-example/consumer/test_provider_register_consumer_watch"));
System.out.println("-----------------> consumer订阅");
consumerRegistry.subscribe(entity, consumer);
Thread.sleep(2000);
consumer.printProviders();
//provider2注册
Registry provider2Registry = new ZookeeperRegistry("127.0.0.1:2181");
entity.setNode("node2");
provider2Registry.register(entity);
System.out.println("-----------------> node2注册");
// System.out.println(zkClient.getChild("/zookeeper-registry-example/provider/test_provider_register_consumer_watch"));
Thread.sleep(2000);
consumer.printProviders();
//provider3注册
Registry provider3Registry = new ZookeeperRegistry("127.0.0.1:2181");
entity.setNode("node3");
provider3Registry.register(entity);
System.out.println("-----------------> node3注册");
// System.out.println(zkClient.getChild("/zookeeper-registry-example/provider/test_provider_register_consumer_watch"));
Thread.sleep(2000);
consumer.printProviders();
//provider3 退出
System.out.println("-----------------> node3退出");
provider3Registry.unregister(entity);
Thread.sleep(2000);
consumer.printProviders();
}

源码

源码