Ⅵ:zookeeper的Watcher事件监听机制

网友投稿 934 2022-11-01

Ⅵ:zookeeper的Watcher事件监听机制

Ⅵ:zookeeper的Watcher事件监听机制

❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️

文章目录

​​前置:--》把握住Watcher流程《--​​​​1、watcher的连接状态判断​​​​2、watcher机制下的exists​​

​​Ⅰ、连接对象的-​​​​Ⅱ、自定义watcher​​​​Ⅲ、watcher的多次监听​​​​Ⅳ、多个watcher同时监听一个节点​​

​​3、watcher机制下的getData​​

​​Ⅰ、连接对象的-​​​​Ⅱ、自定义watcher-​​​​Ⅲ、多次watcher监听​​​​Ⅳ、多个watcher同时监听一个节点​​

​​4、watcher机制下的getChildren​​

​​Ⅰ、连接对象的监视器​​​​Ⅱ、自定义watcher-​​​​Ⅲ、多次watcher监听​​​​Ⅳ、多个watcher同时监听一个节点​​

xshell7连接云服务器演示结果,如果未知请看第一章

前置:–》把握住Watcher流程《–

1、连接zookeeper服务器 2、连接时必须使当前线程等待(等待其他线程创建连接zookeeper服务成功,使用计数器实现) 3、执行回调函数process 4、释放当前线程

1、watcher的连接状态判断

package com.zookeeper.watcher;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import java.io.IOException;import java.util.concurrent.CountDownLatch;/** * @author:抱着鱼睡觉的喵喵 * @date:2021/5/7 * @description: */public class WatcherConnection implements Watcher {//计数器,使当前线程等待其他线程完成 static CountDownLatch countDownLatch = new CountDownLatch(1); static ZooKeeper zooKeeper; public static void main(String[] args) { try { //连接zookeeper服务 zooKeeper = new ZooKeeper("8.140.37.103:2181", 5000, new WatcherConnection()); //使当前线程等待其他线程完成(其他线程也就是连接zookeeper服务的线程) countDownLatch.await(); Thread.sleep(1000); zooKeeper.close(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } //回调函数,进性状态的判断 @Override public void process(WatchedEvent watchedEvent) { try { if (watchedEvent.getType() == Event.EventType.None) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("连接成功!"); countDownLatch.countDown(); } else if (watchedEvent.getState() == Event.KeeperState.Disconnected) { System.out.println("断开连接"); } else if (watchedEvent.getState() == Event.KeeperState.Expired) { System.out.println("超时了"); } else if (watchedEvent.getState() == Event.KeeperState.AuthFailed) { System.out.println("认证失败!"); } } } catch (Exception e) { e.printStackTrace(); } }}

2、watcher机制下的exists

Ⅰ、连接对象的-

public class WatcherExistsTest { private String IP = "8.140.37.103:2181"; private ZooKeeper zookeeper; @Before public void connection() throws IOException, InterruptedException { //计数器对象,使当前线程等待其他线程的完成 final CountDownLatch downLatch = new CountDownLatch(1); zookeeper = new ZooKeeper(IP, 5000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //判断是否连接成功 if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { //使CountDownLatch减到0(初始为1),其他线程可以继续执行(该处应该是主线程可以继续执行了) downLatch.countDown(); System.out.println("连接成功!"); } System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); } }); //主线程进入等待态 downLatch.await(); } @Test public void watcherExists() throws KeeperException, InterruptedException { //第一个参数是节点路径 //第二个参数为Boolean类型,true代表监听path下的节点,false表示不进行监听 zookeeper.exists("/exists", true); Thread.sleep(10000); } @After public void close() { try { zookeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); } }}

此时在zookeeper客户端创建/exists节点

IDEA控制台就会出现NodeCreated​​在这里插入代码片​​

Ⅱ、自定义watcher

public class WatcherExistsTest { private String IP = "8.140.37.103:2181"; private ZooKeeper zookeeper; @Before public void connection() throws IOException, InterruptedException { //计数器对象,使当前线程等待其他线程的完成 final CountDownLatch downLatch = new CountDownLatch(1); zookeeper = new ZooKeeper(IP, 6000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { //使CountDownLatch减到0(初始为1),其他线程可以继续执行(该处应该是主线程可以继续执行了) downLatch.countDown(); } } }); //主线程进入等待态 downLatch.await(); } @Test public void watcherExists2() throws KeeperException, InterruptedException { zookeeper.exists("/exists2", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("自定义watcher!"); System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); } }); Thread.sleep(10000); System.out.println("--------------"); } @After public void close() { try { zookeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); } }}

执行@Test注解方法-》客户端创建/exists2节点-》IDEA控制台查看结果

当我修改/exists2节点的数据时,控制台出现了NodeDataChanged

Ⅲ、watcher的多次监听

本质上只能进性一次注册,一次监听;当然可以利用循环调用进行生命周期内的多次监听

@Test public void watcherExists2() throws KeeperException, InterruptedException { zookeeper.exists("/exists2", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { try { System.out.println("自定义watcher!"); System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); zookeeper.exists("/exists2", this); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread.sleep(10000); System.out.println("--------------"); }

Ⅳ、多个watcher同时监听一个节点

一般来说这种多个监听对象才比较符合发布-订阅模式,当节点中的数据发生变化时,会通知所有的监听对象。

@Test public void watcherExists3() throws KeeperException, InterruptedException { System.out.println("============================"); zookeeper.exists("/exists3", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听对象1"); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); } }); zookeeper.exists("/exists3", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听对象2"); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); } }); zookeeper.exists("/exists3", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听对象3"); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); } }); Thread.sleep(10000); System.out.println("=========================="); }

3、watcher机制下的getData

getData(String path, boolean b, Stat stat)连接对象的- getData(String path, watcher watcher, Stat stat) 自定义的-

Ⅰ、连接对象的-

public class WatcherGetDataTest { static CountDownLatch countDownLatch = new CountDownLatch(1); static ZooKeeper zooKeeper; final String IP = "8.140.37.103:2181"; @Before public void before() throws IOException, InterruptedException { zooKeeper = new ZooKeeper(IP, 5000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("================="); countDownLatch.countDown(); } System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); } }); countDownLatch.await(); } @Test public void test() throws KeeperException, InterruptedException { zooKeeper.getData("/data",true, null); Thread.sleep(10000); System.out.println("======================="); } @After public void after() throws InterruptedException { zooKeeper.close(); }}

启动测试-》修改data节点的数据-》查看idea控制台结果

Ⅱ、自定义watcher-

@Test public void test2() throws KeeperException, InterruptedException { System.out.println("========================"); zooKeeper.getData("/data", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); } }, null); Thread.sleep(10000); System.out.println("============================"); }

Ⅲ、多次watcher监听

@Test public void test3() throws KeeperException, InterruptedException { System.out.println("========================="); Watcher watcher = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { try { System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); zooKeeper.getData("/data", this, null); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }; zooKeeper.getData("/data", watcher, null); Thread.sleep(5000); System.out.println("======================="); }

Ⅳ、多个watcher同时监听一个节点

@Test public void test4() throws KeeperException, InterruptedException { System.out.println("======================="); zooKeeper.getData("/data", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听对象1"); System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); } }, null); zooKeeper.getData("/data", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听对象2"); System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); } }, null); zooKeeper.getData("/data", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听对象3"); System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); } }, null); Thread.sleep(5000); System.out.println("========================"); } @After public void after() throws InterruptedException { zooKeeper.close(); }

4、watcher机制下的getChildren

getChildren(String path, boolean b) //使用连接对象的监视器 getChildren(String path, watcher w) //自定义监视器 子节点的修改不会被监测到

Ⅰ、连接对象的监视器

public class WatcherGetChildrenTest { static CountDownLatch countDownLatch = new CountDownLatch(1); static ZooKeeper zooKeeper; final String IP = "8.140.37.103:2181"; @Before public void before() throws IOException, InterruptedException { zooKeeper = new ZooKeeper(IP, 5000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("================="); countDownLatch.countDown(); } System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); } }); countDownLatch.await(); } @Test public void test() throws KeeperException, InterruptedException { zooKeeper.getChildren("/data", true); Thread.sleep(5000); } @After public void after() throws InterruptedException { zooKeeper.close(); }}

Ⅱ、自定义watcher-

@Test public void test2() throws KeeperException, InterruptedException { zooKeeper.getChildren("/data", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("=================="); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); } }); Thread.sleep(10000); System.out.println("===================="); }

Ⅲ、多次watcher监听

@Test public void test3() throws KeeperException, InterruptedException { zooKeeper.getChildren("/data", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("================"); if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) { try { System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); zooKeeper.getChildren("/data", this); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } }); Thread.sleep(5000); }

Ⅳ、多个watcher同时监听一个节点

@Test public void test4() throws KeeperException, InterruptedException { System.out.println("=================================="); zooKeeper.getChildren("/data", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听对象1"); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getType()); } }); zooKeeper.getChildren("/data", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听对象2"); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); } }); zooKeeper.getChildren("/data", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听对象3"); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); } }); Thread.sleep(5000); System.out.println("================================"); } @After public void after() throws InterruptedException { zooKeeper.close(); }

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Vivid: 为 Apple Core Image 框架提供的一系列效果和工具
下一篇:CNStream是用于构建Cambricon机器学习管道的流式框架
相关文章

 发表评论

暂时没有评论,来抢沙发吧~