avatar

Zookeeper Curator客户端

简介

CuratorNetflix 公司开源的一套zk客户端框架,与ZKclient一样,其也封装了zk原生API,其目前已经称为Apache的顶级项目。同时Curator还提供了一套易用性、可读性 更强Fluent风格的客户端API框架。

API介绍

这里主要以Fluent风格客户端API为主进行介绍。

创建会话

普通API创建newClient()

在CuratorFrameworkFactory类中提供了两个静态方法用于完成会话的创建

  • newClient(String connectString,int sessionTimeoutMs,int connectionTimeoutMs,RetryPolicy retrypolicy)
  • newClient(Stirng connectString,RetryPolicy retryPolicy)
参数名称 意义
connectString 指定zk服务器 列表 ,由英文逗号分开的host:port字符串组成
sessionTimeoutMs 设置会话超时时间,单位毫秒,默认60
connectionTimeoutMs 设置连接超时时间,单位毫秒,默认15
retryPolicy 重试策略,内置了四种策略:ExponentialBackoffRetryRetryNTimesRetryOneTimeRetryUntilElapsed

Fluent风格创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class FluentTest{
public static void main(String[] args) throws Excepetion{
//创建重试策略对象;第一秒重试一次,最多重试3次
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000,3);

//创建客户端
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("zkOS:2180")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(retryPolicy)
.namespace("drunk")
.build();
//开启客户端
client.start();
}
}

创建节点

下面使用的client为前面所创建的Curator客户端实例

  • 创建一个持久节点,初始内容为空
    client.create().forPath(path)

  • 创建一个持久节点,附带初始内容;Curator在指定数据内容时,只能使用byte[]作为方法参数。
    client.create().forPath(path,"mydata".getBytes())

  • 创建一个临时节点,初始内容为空。 说明CreateMode为枚举类型
    client.create().withMode(CreateMode.EPHEMERAL).forPath(path)

  • 创建一个临时节点,并自动递归创建 父节点; 若指定的节点多级父节点均不存在,则会自动创建。
    client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path)

删除节点

delete()

  • 删除一个节点,只能删除叶子节点,其父节点 不会删除
    client.delete().forPath(path)

  • 删除一个节点,并递归删除所有子节点
    client.delete().deletingChildrenIfNeeded().forPath(path)

更新数据

setData()

  • 设置一个节点的数据内容,返回值为org.apache.zookeeper.data.Stat状态对象
    client.setData().forPath(path,newData)

检查节点是否存在

checkExists()

  • 检查节点是否存在,返回值为org.apache.zookeeper.data.Stat状态对象,若stat为null,说明该节点不存在,否则说明该节点是存在的
    Stat stat = client.checkExists().forPath(path)

获取节点数据内容

getData()

  • 读取一个节点的数据内容,返回值为byte[]数组
    byte[] data = client.getData().forPath(path)

获取子节点列表

getChildren()

  • 读取一个节点的所有子节点
    List<String> childrenNames = client.getChildren().forPath(path)

watcher注册

usingWatcher()

curator绑定watcher的操作有三个:checkExists()、getData()、getChildren()。这三个方法的共性是,它们都是用于获取的,这三个操作用于watcher注册的方法是相同的,都是usingWatcher()。
usingWatcher(CuratorWatcher watcher)
usingWacther(Watcher watcher)

这两个方法中的参数CuratorWatcher与Watcher都为接口,这两个接口均包含一个process()方法,它们的区别是CuratorWatcher中的process()方法能够抛出异常,这样就可以在日志中记录该异常。

  • 监听节点的存在性变化

    1
    2
    3
    Stat stat = client.checkExists().usingWatcher(
    (CuratorWatcher)event -> {System.out.println("节点存在性发生变化")}
    ).forPath(path)
  • 监听节点的内容变化

    1
    2
    3
    Byte[] data = client.getData().usingWatcher(
    (CuratorWatcher)event -> {System.out.println("节点数据内容发生变化")}
    ).forPath(path)
  • 监听节点子节点列表的变化

    1
    2
    3
    List<String> sons = client.getChildren().usingWatcher(
    (CuratorWatcher)event -> {System.out.println("节点的子节点列表发生变化")}
    ).forPath(path)

    代码演示

    1
    2
    3
    4
    5
    6
    <!-- curator依赖 -->
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
    </dependency>
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    public class FluentTest{
    public static void main(String[] args) throws Excepetion{
    //创建重试策略对象;第一秒重试一次,最多重试3次
    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000,3);

    //创建客户端
    CuratorFramework client = CuratorFrameworkFactory
    .builder()
    .connectString("zkOS:2180")
    .sessionTimeoutMs(5000)
    .connectionTimeoutMs(3000)
    .retryPolicy(retryPolicy)
    .namespace("drunk")
    .build();
    //开启客户端
    client.start();

    //指定要创建和操作的节点,注意,其是相对于/logs节点的
    String nodePath = “/host”;

    //创建节点
    String nodeName = client.create().forPath(nodePath,"drunk".getBytes());
    System.out.println("新创建的节点名称为: " + nodeName);

    //获取数据内容并注册watcher
    byte[] data = client.getData().usingWatcher((CuratorWatcher)event -> {
    System.out.println(event.getPath + "数据内容发生变化");
    }).forPath(nodePath);
    System.out.println("节点数据内容为: " + new String(data));

    //更新数据内容
    client.setData().forPath(nodePath,"new---Drunk".getBytes());
    //获取更新后的数据内容
    byte[] newData = client.getData().forPath(nodePath);
    System.out.println("更新过的数据内容为: " + new String(newData));

    //删除节点
    client.delete().forPath(nodePath);

    //判断节点是否存在
    Stat stat = client.checkExists().forPath(nodePath);
    boolean isExists = stat == null ? false : true;
    System.out.println(nodePath + "节点还存在吗?" + isExists)
    }
    }
文章作者:
文章链接: https://www.fundodoo.com/zh-CN/2020/04/07/12.html
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 醉探索戈壁
打赏
  • 微信
    微信
  • 支付寶
    支付寶

评论