up:: 处理watcher事件
说明: 介绍另外一种操作zookeeper的方式
Java操作Zookeeper
该客户端是zookeeper官方自带的,编程不是那么方便:
会话超时异常时,不支持自动重连,需要手动重新连接,编程繁琐 watcher 是一次性的,注册一次后会失效 节点数据是二进制,对象数据都需要转换为二进制保存 前面的data.getBytes()就是把字符串转为二进制。。。 不支持递归创建节点,需要先创建父节点再创建子节点 不支持递归删除节点,需要先删除子节点再删除父节点 原生zookeeper客户端和服务器端会话的建立是一个异步的过程,也就是说在程序中,我们程序方法在处理完客户端初始化后,立即返回(程序往下执行代码,这样,大多数情况下我们并没有真正构建好一个可用会话,在会话的声明周期处于”CONNECTED”时才算真正建立完毕,所以我们需要使用多线程中的一个工具类CountDownLatch来控制,真正的连接上zk客户端后,才可以继续操作zNode节点)
Java客户端Curator的使用
Netflix Curator(已经捐献给Apache维护):https://github.com/Netflix/curator Apache curator官网:http://curator.apache.org
Curator是Netflix公司在原生zookeeper基础上开源的一个Zookeeper Java客户端,目前Curator捐献给了Apache,现在是Apache下的一个开源项目,与Zookeeper提供的原生客户端相比,Curator的进行了高度的抽象和封装,简化了Zookeeper客户端的开发操作。
Curator与ZkClient相比,功能更加的强大,不仅除了解决原生的zk Api的遗留问题,还提供了很多常用的工具类,也提供了很多解决方案,比如分布式锁。Curator API的使用更简洁方便,提供了流式的操作,可以点.点.点.的方式进行方法的调用,所以Curator是目前比较主流的zk客户端。
通过查看官方文档,可以发现Curator主要解决了三类问题:
封装ZooKeeper client与ZooKeeper server之间的连接处理 提供了一套Fluent风格的操作API(点.点.点.) 提供ZooKeeper各种应用场景(recipe)的抽象封装,比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等
使用Curator需要依赖jar包,使用以下方式,添加maven依赖
添加以下依赖:
<!-- curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<!-- curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
正式开发
package com.imooc.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
/**
* 描述: 用Curator来操作ZK
*/public class CuratorTests {
public static void main(String[] args) throws Exception {
String connectString = "127.0.0.1:2181";
String path = "/curator1";
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 10);
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retry);
client.start();
client.getCuratorListenable().addListener((CuratorFramework c, CuratorEvent event) -> {
switch (event.getType()) {
case WATCHED:
WatchedEvent watchedEvent = event.getWatchedEvent();
if (watchedEvent.getType() == EventType.NodeDataChanged) {
System.out.println(new String(c.getData().forPath(path)));
System.out.println("触发事件");
}
}
});
String data = "test";
String data2 = "test2";
client.create().withMode(CreateMode.EPHEMERAL).forPath(path, data.getBytes());
byte[] bytes = client.getData().watched().forPath(path);
System.out.println(new String(bytes));
client.setData().forPath(path, data2.getBytes());
client.setData().forPath(path, data2.getBytes());
client.setData().forPath(path, data2.getBytes());
Thread.sleep(2000);
client.delete().forPath(path);
Thread.sleep(2000);
}
}
**注意: if (watchedEvent.getType() == EventType.NodeDataChanged)语句,创建时是不会触发watch的! **
说明:
测试
如何多次触发监听?
//永久监听
String pathNew = "/curatorNew";
client.create().withMode(CreateMode.EPHEMERAL).forPath(pathNew, data.getBytes());
NodeCache nodeCache = new NodeCache(client, pathNew);
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData currentData = nodeCache.getCurrentData();
if (currentData != null) {
System.out.println("触发了永久监听的回调,当前值为:" + new String(currentData.getData()));
}
}
});
client.setData().forPath(pathNew, data2.getBytes());
Thread.sleep(2000);
client.setData().forPath(pathNew, data2.getBytes());
Thread.sleep(2000);
client.setData().forPath(pathNew, data2.getBytes());
Thread.sleep(2000);
client.delete().forPath(pathNew);
}
说明: