up:: 处理watcher事件

说明: 介绍另外一种操作zookeeper的方式

Java操作Zookeeper

该客户端是zookeeper官方自带的,编程不是那么方便:

会话超时异常时,不支持自动重连,需要手动重新连接,编程繁琐 watcher 是一次性的,注册一次后会失效 节点数据是二进制,对象数据都需要转换为二进制保存 前面的data.getBytes()就是把字符串转为二进制。。。 不支持递归创建节点,需要先创建父节点再创建子节点 不支持递归删除节点,需要先删除子节点再删除父节点 原生zookeeper客户端和服务器端会话的建立是一个异步的过程,也就是说在程序中,我们程序方法在处理完客户端初始化后,立即返回(程序往下执行代码,这样,大多数情况下我们并没有真正构建好一个可用会话,在会话的声明周期处于”CONNECTED”时才算真正建立完毕,所以我们需要使用多线程中的一个工具类CountDownLatch来控制,真正的连接上zk客户端后,才可以继续操作zNode节点)


Java客户端Curator的使用

Netflix Curator(已经捐献给Apache维护):https://github.com/Netflix/curator Apache curator官网:http://curator.apache.org

Curator是Netflix公司在原生zookeeper基础上开源的一个Zookeeper Java客户端,目前Curator捐献给了Apache,现在是Apache下的一个开源项目,与Zookeeper提供的原生客户端相比,Curator的进行了高度的抽象和封装,简化了Zookeeper客户端的开发操作

Curator与ZkClient相比,功能更加的强大,不仅除了解决原生的zk Api的遗留问题,还提供了很多常用的工具类,也提供了很多解决方案,比如分布式锁。Curator API的使用更简洁方便,提供了流式的操作,可以点.点.点.的方式进行方法的调用,所以Curator是目前比较主流的zk客户端。

通过查看官方文档,可以发现Curator主要解决了三类问题:

封装ZooKeeper client与ZooKeeper server之间的连接处理 提供了一套Fluent风格的操作API(点.点.点.) 提供ZooKeeper各种应用场景(recipe)的抽象封装,比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等

使用Curator需要依赖jar包,使用以下方式,添加maven依赖

添加以下依赖:

 
<!-- curator-framework -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
</dependency>
<!-- curator-recipes -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>
 

正式开发

package com.imooc.curator;
 
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
 
/**
 * 描述:     用Curator来操作ZK
 */public class CuratorTests {
 
    public static void main(String[] args) throws Exception {
        String connectString = "127.0.0.1:2181";
        String path = "/curator1";
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 10);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retry);
        client.start();
        client.getCuratorListenable().addListener((CuratorFramework c, CuratorEvent event) -> {
            switch (event.getType()) {
                case WATCHED:
                    WatchedEvent watchedEvent = event.getWatchedEvent();
                    if (watchedEvent.getType() == EventType.NodeDataChanged) {
                        System.out.println(new String(c.getData().forPath(path)));
                        System.out.println("触发事件");
                    }
            }
        });
        String data = "test";
        String data2 = "test2";
        client.create().withMode(CreateMode.EPHEMERAL).forPath(path, data.getBytes());
 
        byte[] bytes = client.getData().watched().forPath(path);
        System.out.println(new String(bytes));
        client.setData().forPath(path, data2.getBytes());
        client.setData().forPath(path, data2.getBytes());
        client.setData().forPath(path, data2.getBytes());
        Thread.sleep(2000);
        client.delete().forPath(path);
        Thread.sleep(2000);
 
 
    }
}

**注意: if (watchedEvent.getType() == EventType.NodeDataChanged)语句,创建时是不会触发watch的! **

说明:


测试

如何多次触发监听?

    //永久监听
    String pathNew = "/curatorNew";
    client.create().withMode(CreateMode.EPHEMERAL).forPath(pathNew, data.getBytes());
 
    NodeCache nodeCache = new NodeCache(client, pathNew);
    nodeCache.start();
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            ChildData currentData = nodeCache.getCurrentData();
            if (currentData != null) {
                System.out.println("触发了永久监听的回调,当前值为:" + new String(currentData.getData()));
            }
        }
    });
    client.setData().forPath(pathNew, data2.getBytes());
    Thread.sleep(2000);
    client.setData().forPath(pathNew, data2.getBytes());
    Thread.sleep(2000);
    client.setData().forPath(pathNew, data2.getBytes());
    Thread.sleep(2000);
    client.delete().forPath(pathNew);
}

说明: