Zookeeper基础

Posted by Fioncat on March 12, 2018

Zookeeper(以下简称ZK), 动物管理员。是一个分布式应用程序的协调服务框架,是Hadoop的一个重要组成组件。

分布式应用需要解决的问题:

  • 数据一致性
  • 统一的命名服务
  • 配置管理
  • 分布式锁
  • 集群管理

ZK安装

参见官网教程…(需要安装在Linux系统下)

ZK指令和数据结构

ZK有一个最开始的节点(/)。ZK的节点叫做znode节点,每个znode节点都可以存储数据,每个znode节点都可以创建自己的子节点。

znode和ZK根节点组成了一个znode树。znode树维系在内存中,查询起来是很快的。

每个znode都有一个名称,并且这个名称是唯一的。

要想运行ZK的指令,需要先启动ZKServer,然后启动ZKClient。

在ZK的bin路径下,执行下面两个命令:

1
2
./zkServer.sh start
./zkCli.sh

启动ZK客户端后可以进入ZK的命令行模式,有如下常见的命令:

  • ls: 查看节点,例如:ls /
  • create znode data:创建节点,例如:create /zk01 hello
    • 不加参数表示创建普通持久节点
    • 加上-e参数表示创建普通临时节点,这样的节点在连接断开后将会消失。利用临时节点可以检查集群节点有没有挂掉。注意临时节点不能拥有子节点。
    • 加上-s表示创建一个顺序持久节点,这样的节点在不同的服务器抢占时会产生一个递增的顺序号。利用它可以实现分布式锁。
    • 加上-s -e就表示创建临时顺序持久节点。
  • get znode:查看节点数据,含有以下信息:
    • DATA: 具体数据
    • cZxid: 节点创建时的zxid
    • ctime: 创建节点的时间戳
    • mZxid: 节点最新一次更新发生的zxid
    • mtime: 修改此节点数据的最新时间戳
    • cversion: 子节点的更新次数
    • dataVsersion: 数据版本号,每次数据发生修改,递增1
    • aclVersion: 节点ACL(授权信息)更新次数
    • ephemeralOwner: 如果节点为临时的,表示和该节点绑定的session id。如果不是则为0。
    • dataLength: 数据大小
    • numChildren: 子节点个数
  • set znode data: 修改节点数据
  • delete znode: 删除一个节点。注意如果节点有子节点则禁止删除。
  • quit: 退出ZK客户端

对于ZK集群中每个节点来说,znode树都是一致的,命名也是完全一致的。具体在后面会有介绍。S

ZK API

Zookeeper是由Java实现的分布式框架。下面我们来看如果通过Java程序来操作znode。

基本方法

ZooKeeper是一个类,构造方法参数如下:

  • connectString: 包含连接信息的字符串,格式是:zk服务IP地址:端口号。zk默认服务监听端口是2181。
  • sessionTimeout: 表示会话的超时时间,单位是ms。
  • watcher: 观测者对象

在默认情况下,创建时采用的是非阻塞的模式。

以下代码可以检测是否连接上zkServer:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestConnect {

    public static void main(String[] args) throws IOException {

        ZooKeeper zk = new ZooKeeper("192.168.117.21:2181", 3000,
            (watchedEvent) -> System.out.println("连接成功!")
        );

        while (true);

    }

}

加上while(true)是为了防止main线程提前结束导致”连接成功”无法被及时打印出来。

在连接的时候可能产生java.net.ConnectException。这是因为在服务器上没有打开zkServer。

我们可以使用闭锁的技术来保证Zookeeper连接成功再执行接下来的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TestConnect {

    public static void main(String[] args) throws IOException,
            InterruptedException {

        final CountDownLatch count = new CountDownLatch(1);

        ZooKeeper zk = new ZooKeeper("192.168.117.21:2181", 3000,
            (watchedEvent) -> {
                System.out.println("连接成功!");
                count.countDown();
            }
        );

        count.await();

        // 执行到此处,连接一定是成功的!
    }

}

利用zk对象可以做上面介绍的在终端中的操作。

创建znode的API如下:

  • String create(String path, byte data[], List<ACL> acl, CreateMode createMode);

其参数的含义如下:

  • path: 表示要创建znode的名称(绝对路径)
  • data: 表示节点保存的数据
  • acl: 表示节点的权限,在ZooDefs.Ids下有定义,例如OPEN_ACL_UNSAFE就表示所有人都可以对节点进行CRUD。
  • createMode: 表示创建的节点模式,在CreateMode下有如下选择:
    • PERSISTENT: 持久普通节点
    • EPHEMERAL: 临时普通节点
    • PERSISTENT_SEQUENTIAL: 持久序列节点
    • EPHEMERAL_SEQUENTIAL: 临时序列节点

创建节点的代码:

1
2
3
4
5
zk.create("/zk01/node1", "some data...".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

zk.create("/zk01/node2", "other data...".getBytes(),
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

此时就可以在客户端看到创建的节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
[zk: localhost:2181(CONNECTED) 9] ls /zk01
[node2, node1]
[zk: localhost:2181(CONNECTED) 10] get /zk01/node1
some data...
cZxid = 0x1f
ctime = Wed Jan 17 08:32:41 CST 2018
mZxid = 0x1f
mtime = Wed Jan 17 08:32:41 CST 2018
pZxid = 0x1f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 12
numChildren = 0
[zk: localhost:2181(CONNECTED) 11] get /zk01/node2
other data...
cZxid = 0x20
ctime = Wed Jan 17 08:32:41 CST 2018
mZxid = 0x20
mtime = Wed Jan 17 08:32:41 CST 2018
pZxid = 0x20
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 13
numChildren = 0

如果节点已经存在了,会抛出org.apache.zookeeper.KeeperException$NodeExistsException这个异常。

对于create()方法,还有一个重载版本:

  • void create(path, data, acl, mode, StringCallBack scb, Object ctx);

多的两个参数描述如下:

  • scb: 节点创建成功之后调用接口中的processResult(int rc, String path, Object ctx, String name)。方法参数描述如下:
    • rc: 0表示创建成功,-110表示创建失败
    • path: 创建后的节点路径,这个path是由create调用的时候传进来的。
    • ctx: 从外部传入的信息
    • name: 节点名称。创建成功,和path相同,失败则为null。
  • ctx: 传入的信息

这个重载create的使用示例如下:

1
2
3
4
5
6
7
8
9
zk.create("/zk03", "haha".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL, (rc, path, ctx, name) -> {

    if (rc == 0) {
        System.out.println("节点创建成功!");
    }


}, 110);

取得节点详细信息(get)API定义如下:

  • byte[] getData(String path, Watcher watcher, Stat stat);

它对应的参数描述如下:

  • path: 节点的全路径
  • watcher:观测者对象
  • stat: 表示get后节点的状态

stat实际上是除了节点数据以外的节点相关信息。可以通过各种get方法获取这些信息。

使用getData来获取/zk01/node1的示例如下:

1
2
3
4
5
// getData():
Stat stat = new Stat();
byte data[] = zk.getData("/zk01/node1", null, stat);
System.out.println("data: " + new String(data));
System.out.println("stat: " + stat);

对应于getData,还有setData函数:

  • Stat setData(String path, byte data[], int version);

参数描述:

  • path: 要修改节点的路径
  • data: 修改的数据
  • version: 表示节点的版本号

特别注意version。它必须和节点原来的version一样,否则会抛出KeeperException$BadVersionException。如果设置为-1则表示不管版本是什么都强制执行。

这个方法会返回节点的状态。

所以我们可以这么使用getData():

1
2
// setData():
zk.setData("/zk01/node1", "aaaaaa".getBytes(), -1);

ZK还提供了获取某个节点所有子节点的方法:

  • List<String> getChildren(String path, Watcher watcher, [Stat stat]);

它返回的是子节点的名称。注意stat保存的是父节点的信息。

例如通过Java查看/zk01的子节点:

1
2
[zk: localhost:2181(CONNECTED) 14] ls /zk01
[node2, node1]

则使用代码:

1
2
3
4
5
// getChildren():
List<String> children = zk.getChildren("/zk01", null);
for (String child : children) {
    System.out.println("子节点:" + child);
}

终端输出:

1
2
子节点:node2
子节点:node1

删除节点使用:

  • void delete(String path, int version);

同样,如果不确定版本,version就传-1。

监听节点变动

我们发现几乎所有改变节点的额方法都可以传一个Watcher对象。这实际上就是一个监听器,它可以实时监听节点的变动并产生特定的行为。

这个接口的process(WatcherEvent)就是在每次节点变动的时候会调用的方法。watcherEvent记录了事务的类型,我们可以通过getType()方法来取出事务类型。

例如,我们在使用get的时候可以这么写:

1
2
3
4
5
6
7
8
9
10
Stat stat = new Stat();
byte data[] = zk.getData("/zk01/node1", (watcherEvent) -> {

    if (watcherEvent.getType() ==
            Watcher.Event.EventType.NodeDataChanged) {
        System.out.println("节点被修改了!");
    }

}, stat);
while (true);

死循环用于一直保持main线程的状态从而一直监听节点的变动。

这样,在zkClient中输入命令:

1
[zk: localhost:2181(CONNECTED) 17] set /zk01/node1 new

我们会发现Java终端打印出了”节点被修改了!”。

那么我们再修改一次数据,会发现Java终端没有再打印了。这是watcher的特性,它只会监听一次节点的事件。当事件发生完成之后,它就会被销毁。

如果要实现永久监听,可以修改代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Stat stat = new Stat();
while (true) {
    final CountDownLatch cdl = new CountDownLatch(1);
    byte data[] = zk.getData("/zk01/node1", (watcherEvent) -> {

        if (watcherEvent.getType() ==
                Watcher.Event.EventType.NodeDataChanged) {
            System.out.println("节点被修改了!");
            cdl.countDown();
        }

    }, stat);
    cdl.await();
}

采用这种轮询+闭锁的方式,就可以实现对节点某个事件的持久化监听。

我们还可以对某个节点的所有子节点进行监听:

1
2
3
zk.getChildren("/zk01", (watcherEvent) -> {
    System.out.println("zk01的子节点被改变啦!");
});

这个子节点监听只会监听子节点的创建和删除,对于数据的修改则不会监听。

ZooKeeper集群

上面的操作都是在单机情况下进行的,下面演示如何真正地创建Zookeeper集群。

conf/zoo.cfg是ZK的核心配置文件,集群的配置就是在此处完成的。

首先来看zoo.cfg的配置说明:

  • ticktime: 心跳间隔周期,毫秒。
  • initLimit: 初始连接超时的预值。指的是follower初始连接leader时会等待initLimit * ticktime的时间,如果时间过了还没有连接上,则放弃连接。
  • syncLimit: 指的是follower和leader做数据交互的超时时间,也是syncLimit * ticktime。
  • dataDir: 指的是zookeeper的持久化目录。
  • clientPort: 指的是zookeeper服务使用的端口号。

在zoo.cfg中需要配置各个节点的信息:

  • server.#=ip:port1:port2

#表示这个节点的选举号。ip表示节点的ip地址,port1表示选举端口,port2表示原子广播的端口。

例如,有三个节点的集群可以在后面这样配置:

1
2
3
server.1=192.168.117.22:2888:3888
server.2=192.168.117.23:2888:3888
server.3=192.168.117.24:2888:3888

随后需要在dataDir指定的目录中创建一个myid文件,保存当前机器的选举号。

配置完以后就可以启动zookeeper服务了。

ZK集群中,有一个节点是作为leader的,它是用来管理整个分布式资源的。其余节点作为follower,它们是被leader管理的。

通过./zkServer status可以查看当前节点是leader还是follower。区别在于Mode上。例如,下面就是一个leader节点:

1
2
3
4
[root@localhost bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/soft/zookeeper-3.4.7/bin/../conf/zoo.cfg
Mode: leader

选举机制

znode节点的状态信息中包含czxid和mzxid,ZK状态的每一次改变,都对应一个递增的Transaction id,该id称为zxid。由于该zxid递增的性质,如果zxid1小于zxid2,那么zxid1肯定先于zxid2发生。创建节点、更新节点数据、删除节点,都会导致zxid加一。

在ZK选举新的leader的时候。会遵循以下的选举原则:

  • 首先比较zxid,最大的作为leader。
  • 如果zxid比较不出来,则比较myid(选举id),最大的当leader。
  • 选举的前提是满足过半同意

因为过半同意的机制,所以ZK集群做单数节点数,选举出leader的概率更大。

ZK的选举分为两个阶段:

  • 数据回复阶段:当ZK服务器启动时,会先从本地磁盘找到本机的最大事务id
  • 选举阶段
    • ZK服务器会把zxid和myid提交从而进行选举
    • ZK维护一个逻辑时钟值,确保每个节点在同一轮选举中
    • ZK服务器一共有四种状态:
      • Looking:选举阶段
      • Following:小弟
      • Leading:领导
      • Observering:观察者。这种节点不会参与选举,适合大集群情况。

在Leader选取出来之后,其身上肯定是有最新数据的(有最大的事务id),所以首先做的事就是原子广播(通过原子广播端口),让follower更新自己的数据。这是为了确保数据的一致性,防止因为leader的宕机产生的数据不一致。

对于每次事务更新,leader会通过原子广播征询其它follower,只有过半同意才会真正地执行事务。而且这样的事务执行是在所有节点上都会执行的。

一般来说,在投票的时候,节点都会投赞同票。除非网络异常或节点宕机才会让节点投反对票。

那么也就是说,在一个集群中如果宕机的机器太多(超过半数),那么这个集群就无法工作了(任何事务投票都不会通过),直到运维人员修复至少一半的节点。

所以我们在一个follower节点执行一个create操作,那么在其它所有节点执行ls /操作会发现这个数据已经全部被创建了。

在节点3(follower)操作:

1
2
[zk: localhost:2181(CONNECTED) 1] create /zk01 nihao
Created /zk01

在节点2(leader)查看:

1
2
[zk: localhost:2181(CONNECTED) 0] ls /
[zk01, zookeeper]

要是集群节点正常,理论上对一个znode的修改会导致集群所有节点的zxid递增。所以ZK为什么要在选举的时候优先选择zxid高的,就是为了保证即使在异常发生以后,数据也能尽可能保证最新化,一致化。