zookeeper框架入门介绍与应用
宋标 Lv5

Zookeeper框架入门介绍与应用

1.简介

ZooKeeper 是一个开源的分布式协调服务,它的设计目标是将那些复杂且容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的接口提供给用户使用。

特点:

  • 顺序一致性: 从同一客户端发起的事务请求,最终将会严格地按照顺序被应用到 ZooKeeper 中去。
  • 原子性: 所有事务请求的处理结果在整个集群中所有机器上的应用情况是一致的,也就是说,要么整个集群中所有的机器都成功应用了某一个事务,要么都没有应用。
  • 单一系统映像 : 无论客户端连到哪一个 ZooKeeper 服务器上,其看到的服务端数据模型都是一致的。
  • 可靠性: 一旦一次更改请求被应用,更改的结果就会被持久化,直到被下一次更改覆盖。

简单来说,分布式面临的就是数据一致性的问题,而Zookeeper框架解决这些问题。

2.数据结构

Zookeeper的数据结构类似于linux的文件系统结构,且保证每个路径唯一。

2.1 数据节点

每个数据节点在 ZooKeeper 中被称为 znode,它是 ZooKeeper 中数据的最小单元。你要存放的数据就放在上面。

每个 znode 由 2 部分组成:

  • stat :状态信息(具体不展开)
  • data : 节点存放的数据的具体内容

znode 分为 4 大类:

  • 持久(PERSISTENT)节点 :一旦创建就一直存在即使 ZooKeeper 集群宕机,直到将其删除。
  • 临时(EPHEMERAL)节点 :临时节点的生命周期是与 客户端会话(session) 绑定的,会话消失则节点消失 。并且,临时节点只能做叶子节点 ,不能创建子节点。
  • 持久顺序(PERSISTENT_SEQUENTIAL)节点 :除了具有持久(PERSISTENT)节点的特性之外, 子节点的名称还具有顺序性。比如 /node1/app0000000001 、/node1/app0000000002 。
  • 临时顺序(EPHEMERAL_SEQUENTIAL)节点 :除了具备临时(EPHEMERAL)节点的特性之外,子节点的名称还具有顺序性

3.基本命令

1.创建数据节点

1
2
[zk: 127.0.0.1:2181(CONNECTED) 0] create /codeRevolt hello
Created /codeRevolt

2.查看数据节点,查看根目录”/“下的

1
2
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /
[codeRevolt, zookeeper]

3.查看数据节点信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[zk: 127.0.0.1:2181(CONNECTED) 2] get -s /codeRevolt
# 数据
hello
# 状态信息
cZxid = 0x35a
ctime = Sun Apr 09 06:43:17 UTC 2023
mZxid = 0x35a
mtime = Sun Apr 09 06:43:17 UTC 2023
pZxid = 0x35a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

4.修改数据节点数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[zk: 127.0.0.1:2181(CONNECTED) 3] set /codeRevolt world
[zk: 127.0.0.1:2181(CONNECTED) 4] get -s /codeRevolt
world
cZxid = 0x35a
ctime = Sun Apr 09 06:43:17 UTC 2023
mZxid = 0x35d
mtime = Sun Apr 09 06:48:53 UTC 2023
pZxid = 0x35a
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

5.删除数据节点
删除数据节点及所有子节点用deleteall

1
2
3
[zk: 127.0.0.1:2181(CONNECTED) 5] delete /codeRevolt
[zk: 127.0.0.1:2181(CONNECTED) 6] ls /
[zookeeper]

4.实战

用Zookeeper框架设计和实现分布式可重入锁,具体代码如下。

  1. maven引入Zookeeper客户端的jar包,这里用的是curator
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    <!--Zookeeper客户端-->
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.2.0</version>
    </dependency>
  2. 封装Zookeeper分布式锁实现方法。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    /**
    * @author CodeRevolt
    */
    public class ZkContributeLock {

    public static final CuratorFramework ZK_CLIENT;

    static {
    ZK_CLIENT = CuratorFrameworkFactory.builder()
    .connectString("127.0.0.1:2181")
    // 重试策略
    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
    .build();
    ZK_CLIENT.start();
    }

    public static boolean isLock(String path) {
    try {
    return ZK_CLIENT.checkExists().forPath(path) != null;
    } catch (Exception e) {
    e.printStackTrace();
    }
    return true;
    }

    /**
    * 添加锁
    * @param mode 数据节点类型
    * @param path 路径
    * @param values 值
    * @return
    */
    public static boolean addLock(CreateMode mode, String path, byte[] values) {
    try {
    if (isLock(path) && Arrays.toString(values).equals(Arrays.toString(ZK_CLIENT.getData().forPath(path)))) {
    return true;
    }
    return ZK_CLIENT.create().creatingParentsIfNeeded().withMode(mode).forPath(path, values) != null;
    } catch (Exception e) {
    System.out.println(Thread.currentThread().getName() + " 抢锁失败...");
    }
    return false;
    }

    public static void releaseLock(String path) {
    try {
    ZK_CLIENT.delete().deletingChildrenIfNeeded().forPath(path);
    } catch (Exception e) {
    e.printStackTrace();
    }
    }


    }
  3. 测试代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    @Test
    void test() throws InterruptedException {
    String orderLock = "/locks/payLock";
    for (int i = 0; i < 10; i++) {
    new Thread(new Runnable() {
    @Override
    public void run() {
    while (true) {
    if (ZkContributeLock.addLock(CreateMode.PERSISTENT, orderLock, (Thread.currentThread().getId() + "").getBytes(StandardCharsets.UTF_8))) {
    System.out.println(Thread.currentThread().getName() + "获得锁...");
    // do something...
    if (Math.random() < 0.5) {
    ZkContributeLock.addLock(CreateMode.PERSISTENT, orderLock, (Thread.currentThread().getId() + "").getBytes(StandardCharsets.UTF_8));
    System.out.println(Thread.currentThread().getName() + "重入锁...");
    // do something...
    }
    ZkContributeLock.releaseLock(orderLock);
    System.out.println(Thread.currentThread().getName() + "释放锁...");
    return;
    }
    System.out.println(Thread.currentThread().getName() + "争抢锁中...");
    try {
    TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }).start();
    }
    Thread.currentThread().join();
    }

    5.总结

    zookeeper生于解决分布式协调、数据一致性问题,基于zookeeper实现的分布式锁有着更高的稳定性和可靠性。zookeeper还有一个特别实用的watcher事件监听机制,篇幅有限,以后的文章将会对这个机制给出详细的应用场景和案例实现代码。

参考:

[1]https://javaguide.cn/distributed-system/distributed-process-coordination/zookeeper/zookeeper-intro.html
[2]https://blog.csdn.net/trntaken/article/details/108949114
[3]https://zookeeper.apache.org/doc/r3.5.8/zookeeperProgrammers.html#ch_zkGuarantees

 评论