3、Zeekeeper客户端、Curator框架
文章标签
Zookeeper
一、Java客户端
zookeeper官方的客户端没有和服务端代码分离,他们为同一个jar 文件,所以我们直接引入zookeeper的maven即可, 这里版本请保持与服务端版本一致。
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.8</version> </dependency>
初始化
@Slf4j
public class ZookeeperClientTest {
private static final String ZK_ADDRESS="192.168.109.200:2181";
private static final int SESSION_TIMEOUT = 5000;
private static ZooKeeper zooKeeper;
private static final String ZK_NODE="/zk-node";
@Before
public void init() throws IOException, InterruptedException {
final CountDownLatch countDownLatch=new CountDownLatch(1);
zooKeeper=new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, event -> {
if (event.getState()== Watcher.Event.KeeperState.SyncConnected &&
event.getType()== Watcher.Event.EventType.None){
countDownLatch.countDown();
log.info("连接成功!");
}
});
log.info("连接中....");
countDownLatch.await();
}
}创建Zookeeper实例的方法列表:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, ZKClientConfig) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider, ZKClientConfig) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byte[]) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byte[], boolean, HostProvider) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byte[], boolean, HostProvider, ZKClientConfig) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byte[], boolean)
参数含义
connectString:ZooKeeper服务器列表,由英文逗号分开的host:port字符串组成,每一个都代表一台ZooKeeper机器。
如:host1:port1,host2:port2,host3:port3。
另外,也可以在connectString中设置客户端连接上ZooKeeper后的根目录,方法是在host:port字符串之后添加上这个根目录。、
例如:host1:port1,host2:port2,host3:port3/zk-base。
这样就指定了该客户端连接上ZooKeeper服务器之后,所有对ZooKeeper的操作,都会基于这个根目录。
sessionTimeout:会话的超时时间,是一个以“毫秒”为单位的整型值。
在ZooKeeper中有会话的概念,在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。
watcher:允许客户端在构造方法中传入一个接口watcher(org.apache. zookeeper.Watcher)的实现类对象来作为默认的 Watcher事件通知处理器。
该参数可以设置为null 以表明不需要设置默认的Watcher处理器。
canBeReadOnly:这是一个boolean类型的参数,用于标识当前会话是否支持“read-only(只读)”模式。
在ZooKeeper集群中,一个机器如果和集群中过半及以上机器失去了网络连接,那么这个机器将不再处理客户端请求(包括读写请求)。
但是在某些使用场景下,当ZooKeeper服务器发生此类故障的时候,我们还是希望ZooKeeper服务器能够提供读服务(当然写服务肯定无法提供)——这就是 ZooKeeper的“read-only”模式。
sessionId、sessionPasswd:分别代表会话ID和会话秘钥。
这两个参数能够唯一确定一个会话,同时客户端使用这两个参数可以实现客户端会话复用,从而达到恢复会话的效果。
具体使用方法是,第一次连接上ZooKeeper服务器时,通过调用ZooKeeper对象实例的以下两个接口,即可获得当前会话的ID和秘钥:
long getSessionId();
byte[] getSessionPasswd( );
荻取到这两个参数值之后,就可以在下次创建ZooKeeper对象实例的时候传入构造方法了。
1.1、创建同步节点
@Test
public void createTest() throws KeeperException, InterruptedException {
String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
log.info("created path: {}",path);
}1.2、创建异步节点
@Test
public void createAsycTest() throws InterruptedException {
zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
(rc, path, ctx, name) -> log.info("rc {},path {},ctx {},name {}",rc,path,ctx,name),"context");
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}1.3、修改节点数据
@Test
public void setTest() throws KeeperException, InterruptedException {
Stat stat = new Stat();
byte[] data = zooKeeper.getData(ZK_NODE, false, stat);
log.info("修改前: {}",new String(data));
zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());
byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);
log.info("修改后: {}",new String(dataAfter));
}二、Curator框架
Curator是一套由netflix公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使用。
Curator把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、分布式计数器、分布式锁。
这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工作。在会话重新连接、Watch 反复注册、多种异常处理等使用场景中,用原生的 ZooKeeper处理比较复杂。而在使用 Curator时,由于其对这些功能都做了高度的封装,使用起来更加简单,不但减少了开发时间,而且增强了程序的可靠性。
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
如下面的代码所示,我们通过将 Curator 相关的引用包配置到Maven 工程的 pom 文件中,将 Curaotr 框架引用到工程项目里,在配置文件中分别引用了两个 Curator 相关的包,第一个是 curator-framework 包,该包是对 ZooKeeper 底层 API 的一些封装。另一个是 curator-recipes 包,该包封装了一些 ZooKeeper 服务的高级特性,如:Cache 事件监听、选举、分布式锁、分布式 Barrier。
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.0.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.8</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.0.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.8</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
2.1、会话创建
Curator 提供了多种方式创建会话,比如用静态工厂方式创建
// 重试策略,第一个参数代表两次连接的等待时间,第二个参数表示最大的尝试连接次数。 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3) CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); client.start();
fluent风格创建
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.128.129:2181")
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(5000) // 连接超时时间
.retryPolicy(retryPolicy)
.namespace("base") // 包含隔离名称
.build();
client.start();connectionString:服务器地址列表。
在指定服务器地址列表的时候可以是一个地址,也可以是多个地址。如果是多个地址,那么每个服务器地址列表用逗号分隔。
retryPolicy:重试策略。
当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。
而Curator提供了一次重试、多次重试等不同种类的实现方式。在Curator内部,可以通过判断服务器返回的keeperException的状态代码来判断是否进行重试处理,如果返回的是OK 表示一切操作都没有问题,而SYSTEMERROR表示系统或服务端错误。
sessionTimeoutMs、connectionTimeoutMs:超时时间。
Curator客户端创建过程中,有两个超时时间的设置。
sessionTimeoutMs会话超时时间,用来设置该条会话在ZooKeeper服务端的失效时间。
connectionTimeoutMs客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收ZooKeeper服务端应答的时间。
sessionTimeoutMs作用在服务端,而connectionTimeoutMs 作用在客户端。
2.2、创建节点
@Test
public void testCreate() throws Exception {
String path = curatorFramework.create().forPath("/curator-node");
// curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node","some-data".getBytes())
log.info("curator create node :{} successfully.",path);
}在Curator中,可以使用 create 函数创建数据节点,并通过withMode函数指定节点类型(持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 函数来指定节点的路径和数据信息。
一次性创建带层级结构的节点
@Test
public void testCreateWithParent() throws Exception {
String pathWithParent="/node-parent/sub-node-1";
String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);
log.info("curator create node :{} successfully.",path);
}2.3、获取数据
@Test
public void testGetData() throws Exception {
byte[] bytes = curatorFramework.getData().forPath("/curator-node");
log.info("get data from node :{} successfully.",new String(bytes));
}2.4、更新节点
@Test
public void testSetData() throws Exception {
curatorFramework.setData().forPath("/curator-node","changed!".getBytes());
byte[] bytes = curatorFramework.getData().forPath("/curator-node");
log.info("get data from node /curator-node :{} successfully.",new String(bytes));
}通过客户端实例的setData()方法更新ZooKeeper服务上的数据节点,在setData方法的后边,通过forPath函数来指定更新的数据节点路径以及要更新的数据。
2.5、删除节点
@Test
public void testDelete() throws Exception {
String pathWithParent="/node-parent";
curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
}guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除成功的作用,其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。
deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。
2.6、异步接口
Curator引入了BackgroundCallback接口,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在EventThread中调用,也可以自定义线程池。
public interface BackgroundCallback{
/**
* Called when the async background operation completes
*
* @param client the client client 客户端
* @param event operation result details 服务端事件event
* @throws Exception errors
*/
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}例如:
@Test
public void test() throws Exception {
curatorFramework.getData().inBackground((item1, item2) -> {
log.info(" background: {}", item2);
}).forPath(ZK_NODE);
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}inBackground异步处理默认在EventThread中执行。
指定线程池来异步处理
@Test
public void test() throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor();
curatorFramework.getData().inBackground((item1, item2) -> {
log.info(" background: {}", item2);
},executorService).forPath(ZK_NODE);
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}2.7、监听器
/**
* Receives notifications about errors and background events
*/
public interface CuratorListener{
/**
* Called when a background task has completed or a watch has triggered
*
* @param client client
* @param event the event
* @throws Exception any errors
*/
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}针对background通知和错误通知。使用此监听器之后,调用inBackground方法会异步获得监听。
Curator Caches
Curator引入了Cache来实现对Zookeeper服务端事件监听。
Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能。Cache分为两类注册类型:节点监听和子节点监听。
node cache
NodeCache对某一个节点进行监听。
public NodeCache(CuratorFramework client, String path); //可以通过注册监听器来实现,对当前节点数据变化的处理。 public void addListener(NodeCacheListener listener);
@Slf4j
public class NodeCacheTest extends AbstractCuratorTest{
public static final String NODE_CACHE="/node-cache";
@Test
public void testNodeCacheTest() throws Exception {
createIfNeed(NODE_CACHE);
NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);
//添加监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
log.info("{} path nodeChanged: ",NODE_CACHE);
//输出结果
byte[] bytes = curatorFramework.getData().forPath(NODE_CACHE);
log.info("data: {}",new String(bytes));
}
});
nodeCache.start();
}
}path cache
PathChildrenCache会对子节点进行监听,但是不会对二级子节点进行监听。
public PathChildrenCache(CuratorFramework client, String path,boolean cacheData);
@Slf4j
public class PathCacheTest extends AbstractCuratorTest{
public static final String PATH="/path-cache";
@Test
public void testPathCache() throws Exception {
createIfNeed(PATH);
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, PATH, true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
log.info("event: {}",event);
}
});
// 如果设置为true则在首次启动时就会缓存节点内容到Cache中
pathChildrenCache.start(true);
}
}tree cache
TreeCache使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点进行了映射。所以TreeCache可以监听当前节点下所有节点的事件。
public TreeCache(CuratorFramework client, String path, boolean cacheData); //也可以通过注册监听器来实现,对当前节点的子节点,及递归子节点数据变化的处理。 public void addListener(TreeCacheListener listener);
@Slf4j
public class TreeCacheTest extends AbstractCuratorTest{
public static final String TREE_CACHE="/tree-path";
@Test
public void testTreeCache() throws Exception {
createIfNeed(TREE_CACHE);
TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
log.info(" tree cache: {}",event);
}
});
treeCache.start();
}
}版权声明
非特殊说明,本文由Zender原创或收集发布,欢迎转载。
上一篇:2、Zeekeeper基础 下一篇:4、Zookeeper集群模式
ZENDER
发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。