2022-11-01
Ⅵ: ZooKeeper's Watcher Event-Listening Mechanism
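Contents
Prerequisite: --> Grasp the Watcher flow <--
1. Checking the connection state in a watcher
2. exists under the watcher mechanism
    Ⅰ. The connection object's watcher
    Ⅱ. A custom watcher
    Ⅲ. Listening multiple times with one watcher
    Ⅳ. Multiple watchers listening on the same node
3. getData under the watcher mechanism
    Ⅰ. The connection object's watcher
    Ⅱ. A custom watcher
    Ⅲ. Listening multiple times with one watcher
    Ⅳ. Multiple watchers listening on the same node
4. getChildren under the watcher mechanism
    Ⅰ. The connection object's watcher
    Ⅱ. A custom watcher
    Ⅲ. Listening multiple times with one watcher
    Ⅳ. Multiple watchers listening on the same node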
The results are demonstrated over an Xshell 7 connection to a cloud server; if that setup is unfamiliar, see Chapter 1.
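Prerequisite: --> Grasp the Watcher flow <--
1. Connect to the ZooKeeper server.
2. While connecting, make the current thread wait until another thread has finished establishing the ZooKeeper connection (implemented with a countdown latch).
3. The callback function process is executed.
4. Release the current thread.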
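The four steps above can be sketched without a ZooKeeper server at all. The class below is a hypothetical stand-in (WatcherFlowSketch and its step labels are not part of any ZooKeeper API): a background thread plays the role of the client's event thread, and a CountDownLatch holds the main thread until the "connected" callback has run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the flow; no real ZooKeeper connection is made.
class WatcherFlowSketch {

    /** Runs the four steps and returns them in the order they happened. */
    static List<String> run() {
        List<String> steps = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch connected = new CountDownLatch(1);

        // Stand-in for the client's event thread that invokes process().
        Thread eventThread = new Thread(() -> {
            steps.add("callback: connected"); // step 3: the callback runs
            connected.countDown();            // step 4: release the waiter
        });

        steps.add("connect requested");       // step 1: start connecting
        eventThread.start();
        try {
            // Step 2: block until the callback has counted the latch down.
            boolean ok = connected.await(5, TimeUnit.SECONDS);
            steps.add(ok ? "main thread released" : "timed out");
            eventThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return steps;
    }

    public static void main(String[] args) {
        // prints [connect requested, callback: connected, main thread released]
        System.out.println(run());
    }
}
```

Using await with a timeout rather than a bare await() is a choice made for this sketch, so that a connection that never succeeds cannot hang the caller forever.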
1. Checking the connection state in a watcher
package com.zookeeper.watcher;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @author: 抱着鱼睡觉的喵喵
 * @date: 2021/5/7
 * @description:
 */
public class WatcherConnection implements Watcher {
    // Latch that makes the current thread wait for another thread to finish
    static CountDownLatch countDownLatch = new CountDownLatch(1);
    static ZooKeeper zooKeeper;

    public static void main(String[] args) {
        try {
            // Connect to the ZooKeeper service
            zooKeeper = new ZooKeeper("8.140.37.103:2181", 5000, new WatcherConnection());
            // Wait until the other thread (the one connecting to ZooKeeper) is done
            countDownLatch.await();
            Thread.sleep(1000);
            zooKeeper.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // Callback that inspects the connection state
    @Override
    public void process(WatchedEvent watchedEvent) {
        try {
            if (watchedEvent.getType() == Event.EventType.None) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("Connected!");
                    countDownLatch.countDown();
                } else if (watchedEvent.getState() == Event.KeeperState.Disconnected) {
                    System.out.println("Disconnected");
                } else if (watchedEvent.getState() == Event.KeeperState.Expired) {
                    System.out.println("Session expired");
                } else if (watchedEvent.getState() == Event.KeeperState.AuthFailed) {
                    System.out.println("Authentication failed!");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
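2. exists under the watcher mechanism
Ⅰ. The connection object's watcher
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class WatcherExistsTest {
    private String IP = "8.140.37.103:2181";
    private ZooKeeper zookeeper;

    @Before
    public void connection() throws IOException, InterruptedException {
        // Latch that makes the current thread wait for another thread to finish
        final CountDownLatch downLatch = new CountDownLatch(1);
        zookeeper = new ZooKeeper(IP, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // Check whether the connection succeeded
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    // Count the latch down to 0 (it starts at 1) so the main thread can continue
                    downLatch.countDown();
                    System.out.println("Connected!");
                }
                System.out.println(watchedEvent.getPath());
                System.out.println(watchedEvent.getType());
            }
        });
        // The main thread waits here
        downLatch.await();
    }

    @Test
    public void watcherExists() throws KeeperException, InterruptedException {
        // First argument: the node path
        // Second argument: true registers the connection object's watcher
        // (the one passed to the constructor) on the path; false registers none
        zookeeper.exists("/exists", true);
        Thread.sleep(10000);
    }

    @After
    public void close() {
        try {
            zookeeper.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}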
Now create the /exists node from a ZooKeeper client.
The IDEA console then prints NodeCreated.
Ⅱ. A custom watcher
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class WatcherExistsTest {
    private String IP = "8.140.37.103:2181";
    private ZooKeeper zookeeper;

    @Before
    public void connection() throws IOException, InterruptedException {
        // Latch that makes the current thread wait for another thread to finish
        final CountDownLatch downLatch = new CountDownLatch(1);
        zookeeper = new ZooKeeper(IP, 6000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    // Count the latch down to 0 (it starts at 1) so the main thread can continue
                    downLatch.countDown();
                }
            }
        });
        // The main thread waits here
        downLatch.await();
    }

    @Test
    public void watcherExists2() throws KeeperException, InterruptedException {
        // Pass a Watcher instance instead of a boolean to use a custom watcher
        zookeeper.exists("/exists2", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("Custom watcher!");
                System.out.println(watchedEvent.getPath());
                System.out.println(watchedEvent.getType());
            }
        });
        Thread.sleep(10000);
        System.out.println("--------------");
    }

    @After
    public void close() {
        try {
            zookeeper.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
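Run the @Test method -> create the /exists2 node from the client -> check the result in the IDEA console.
When I modified the data of the /exists2 node instead, the console printed NodeDataChanged.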
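Ⅲ. Listening multiple times with one watcher
A watcher is fundamentally one-shot: one registration yields one notification. To keep listening for the lifetime of the session, re-register the watcher from inside its own callback.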
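The one-shot rule can be illustrated with a tiny in-memory model. This is only a sketch of the semantics, not ZooKeeper's implementation: OneShotWatchModel, watch and change are hypothetical names. A watch is removed before it fires, so a watcher that does not re-register sees only the first change, while one that re-registers inside its callback sees every change.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory model of one-shot watches (not the ZooKeeper client).
class OneShotWatchModel {
    // path -> watchers waiting for the next change on that path
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    void watch(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, k -> new ArrayList<>()).add(watcher);
    }

    void change(String path) {
        // Remove the watches first, then fire them: each registration fires once.
        List<Consumer<String>> fired = watches.remove(path);
        if (fired != null) {
            for (Consumer<String> w : fired) {
                w.accept(path);
            }
        }
    }

    /** Returns {notifications seen without re-registering, seen with re-registering}. */
    static int[] demo() {
        OneShotWatchModel zk = new OneShotWatchModel();
        int[] counts = new int[2];

        // Registered once, never re-registered.
        zk.watch("/exists2", p -> counts[0]++);

        // Re-registers itself in the callback, like passing `this` to exists().
        zk.watch("/exists2", new Consumer<String>() {
            @Override
            public void accept(String p) {
                counts[1]++;
                zk.watch(p, this);
            }
        });

        for (int i = 0; i < 3; i++) {
            zk.change("/exists2");
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [1, 3]
    }
}
```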
@Test
public void watcherExists2() throws KeeperException, InterruptedException {
    zookeeper.exists("/exists2", new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            try {
                System.out.println("Custom watcher!");
                System.out.println(watchedEvent.getPath());
                System.out.println(watchedEvent.getType());
                // Re-register this same watcher so the next change is also reported
                zookeeper.exists("/exists2", this);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    Thread.sleep(10000);
    System.out.println("--------------");
}
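Ⅳ. Multiple watchers listening on the same node
Registering several watcher objects on one node is the arrangement that best matches the publish-subscribe pattern: when the node changes, every registered watcher is notified.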
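The fan-out can be modelled in memory as well. Again this is a sketch under assumptions: FanOutWatchModel and its nested Watcher interface are hypothetical, and the notification order shown is an artifact of the LinkedHashSet used here, not something to rely on with a real server. The set mirrors one documented ZooKeeper behaviour: a given watcher object is invoked only once per notification, so registering the same object twice on a path does not produce two callbacks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of several watchers on one path.
class FanOutWatchModel {
    interface Watcher {
        void process(String eventType, String path);
    }

    // A set per path: the same watcher object is stored only once.
    private final Map<String, Set<Watcher>> watches = new HashMap<>();

    void exists(String path, Watcher watcher) {
        watches.computeIfAbsent(path, k -> new LinkedHashSet<>()).add(watcher);
    }

    void create(String path) {
        Set<Watcher> fired = watches.remove(path);
        if (fired != null) {
            for (Watcher w : fired) {
                w.process("NodeCreated", path);
            }
        }
    }

    static List<String> demo() {
        FanOutWatchModel zk = new FanOutWatchModel();
        List<String> log = new ArrayList<>();

        // Three distinct watcher objects on the same path.
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            zk.exists("/exists3", (type, path) -> log.add("watcher" + id + ":" + type));
        }

        // The same object registered twice is still notified once.
        Watcher shared = (type, path) -> log.add("shared:" + type);
        zk.exists("/exists3", shared);
        zk.exists("/exists3", shared);

        zk.create("/exists3");
        return log;
    }

    public static void main(String[] args) {
        // prints [watcher1:NodeCreated, watcher2:NodeCreated, watcher3:NodeCreated, shared:NodeCreated]
        System.out.println(demo());
    }
}
```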
@Test
public void watcherExists3() throws KeeperException, InterruptedException {
    System.out.println("============================");
    zookeeper.exists("/exists3", new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("Watcher 1");
            System.out.println(watchedEvent.getType());
            System.out.println(watchedEvent.getPath());
        }
    });
    zookeeper.exists("/exists3", new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("Watcher 2");
            System.out.println(watchedEvent.getType());
            System.out.println(watchedEvent.getPath());
        }
    });
    zookeeper.exists("/exists3", new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("Watcher 3");
            System.out.println(watchedEvent.getType());
            System.out.println(watchedEvent.getPath());
        }
    });
    Thread.sleep(10000);
    System.out.println("==========================");
}
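3. getData under the watcher mechanism
getData(String path, boolean b, Stat stat)        // uses the connection object's watcher
getData(String path, Watcher watcher, Stat stat)  // uses a custom watcher
Ⅰ. The connection object's watcher
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class WatcherGetDataTest {
    // Instance fields: JUnit 4 creates a new test instance per @Test method,
    // so every test waits on a fresh latch for its own connection
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private ZooKeeper zooKeeper;
    final String IP = "8.140.37.103:2181";

    @Before
    public void before() throws IOException, InterruptedException {
        zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("=================");
                    countDownLatch.countDown();
                }
                System.out.println(watchedEvent.getPath());
                System.out.println(watchedEvent.getType());
            }
        });
        countDownLatch.await();
    }

    @Test
    public void test() throws KeeperException, InterruptedException {
        // true: watch /data with the connection object's watcher
        zooKeeper.getData("/data", true, null);
        Thread.sleep(10000);
        System.out.println("=======================");
    }

    @After
    public void after() throws InterruptedException {
        zooKeeper.close();
    }
}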
Start the test -> modify the data of the /data node -> check the result in the IDEA console.
Ⅱ. A custom watcher
@Test
public void test2() throws KeeperException, InterruptedException {
    System.out.println("========================");
    zooKeeper.getData("/data", new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println(watchedEvent.getType());
            System.out.println(watchedEvent.getPath());
        }
    }, null);
    Thread.sleep(10000);
    System.out.println("============================");
}
Ⅲ. Listening multiple times with one watcher
@Test
public void test3() throws KeeperException, InterruptedException {
    System.out.println("=========================");
    Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            try {
                System.out.println(watchedEvent.getType());
                System.out.println(watchedEvent.getPath());
                // Re-register this watcher so the next change is also reported
                zooKeeper.getData("/data", this, null);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
    zooKeeper.getData("/data", watcher, null);
    Thread.sleep(5000);
    System.out.println("=======================");
}
Ⅳ. Multiple watchers listening on the same node
@Test
public void test4() throws KeeperException, InterruptedException {
    System.out.println("=======================");
    zooKeeper.getData("/data", new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("Watcher 1");
            System.out.println(watchedEvent.getPath());
            System.out.println(watchedEvent.getType());
        }
    }, null);
    zooKeeper.getData("/data", new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("Watcher 2");
            System.out.println(watchedEvent.getPath());
            System.out.println(watchedEvent.getType());
        }
    }, null);
    zooKeeper.getData("/data", new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("Watcher 3");
            System.out.println(watchedEvent.getPath());
            System.out.println(watchedEvent.getType());
        }
    }, null);
    Thread.sleep(5000);
    System.out.println("========================");
}

@After
public void after() throws InterruptedException {
    zooKeeper.close();
}
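4. getChildren under the watcher mechanism
getChildren(String path, boolean b)   // uses the connection object's watcher
getChildren(String path, Watcher w)   // uses a custom watcher
Note: a modification of a child node's data is not detected by this watch; it reports children being created or deleted.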
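The three registration calls differ in which changes they report. The lookup below is a sketch written for this article: WatchTriggerTable and its enums are hypothetical names, while the event names are the EventType values that appear in the console output above. It shows in particular that changing a child's data reports nothing to a getChildren watch on the parent.

```java
import java.util.Optional;

// Hypothetical lookup table: which event a watch on a node reports for a change.
class WatchTriggerTable {
    enum Watch { EXISTS, GET_DATA, GET_CHILDREN }

    enum Change {
        NODE_CREATED, NODE_DELETED, NODE_DATA_CHANGED,
        CHILD_CREATED, CHILD_DELETED, CHILD_DATA_CHANGED
    }

    /** Event reported to a watch on the node itself, or empty if the watch stays silent. */
    static Optional<String> eventFor(Watch watch, Change change) {
        switch (change) {
            case NODE_CREATED:
                // Only exists() can set a watch on a node that does not exist yet.
                return watch == Watch.EXISTS ? Optional.of("NodeCreated") : Optional.empty();
            case NODE_DELETED:
                // Deleting the watched node is reported to all three kinds of watch.
                return Optional.of("NodeDeleted");
            case NODE_DATA_CHANGED:
                return watch != Watch.GET_CHILDREN ? Optional.of("NodeDataChanged") : Optional.empty();
            case CHILD_CREATED:
            case CHILD_DELETED:
                return watch == Watch.GET_CHILDREN ? Optional.of("NodeChildrenChanged") : Optional.empty();
            default:
                // CHILD_DATA_CHANGED: a watch on the parent reports nothing.
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // prints Optional[NodeChildrenChanged]
        System.out.println(eventFor(Watch.GET_CHILDREN, Change.CHILD_CREATED));
        // prints Optional.empty
        System.out.println(eventFor(Watch.GET_CHILDREN, Change.CHILD_DATA_CHANGED));
    }
}
```

To observe a child's data changes, the child itself needs its own exists or getData watch.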
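Ⅰ. The connection object's watcher
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class WatcherGetChildrenTest {
    // Instance fields: JUnit 4 creates a new test instance per @Test method,
    // so every test waits on a fresh latch for its own connection
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private ZooKeeper zooKeeper;
    final String IP = "8.140.37.103:2181";

    @Before
    public void before() throws IOException, InterruptedException {
        zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("=================");
                    countDownLatch.countDown();
                }
                System.out.println(watchedEvent.getPath());
                System.out.println(watchedEvent.getType());
            }
        });
        countDownLatch.await();
    }

    @Test
    public void test() throws KeeperException, InterruptedException {
        // true: watch the children of /data with the connection object's watcher
        zooKeeper.getChildren("/data", true);
        Thread.sleep(5000);
    }

    @After
    public void after() throws InterruptedException {
        zooKeeper.close();
    }
}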
Ⅱ. A custom watcher
@Test
public void test2() throws KeeperException, InterruptedException {
    zooKeeper.getChildren("/data", new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("==================");
            System.out.println(watchedEvent.getType());
            System.out.println(watchedEvent.getPath());
        }
    });
    Thread.sleep(10000);
    System.out.println("====================");
}
Ⅲ. Listening multiple times with one watcher
@Test
public void test3() throws KeeperException, InterruptedException {
    zooKeeper.getChildren("/data", new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("================");
            if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
                try {
                    System.out.println(watchedEvent.getType());
                    System.out.println(watchedEvent.getPath());
                    // Re-register this watcher so the next child change is also reported
                    zooKeeper.getChildren("/data", this);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    });
    Thread.sleep(5000);
}
Ⅳ. Multiple watchers listening on the same node
@Test
public void test4() throws KeeperException, InterruptedException {
    System.out.println("==================================");
    zooKeeper.getChildren("/data", new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("Watcher 1");
            System.out.println(watchedEvent.getType());
            System.out.println(watchedEvent.getPath());
        }
    });
    zooKeeper.getChildren("/data", new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("Watcher 2");
            System.out.println(watchedEvent.getType());
            System.out.println(watchedEvent.getPath());
        }
    });
    zooKeeper.getChildren("/data", new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("Watcher 3");
            System.out.println(watchedEvent.getType());
            System.out.println(watchedEvent.getPath());
        }
    });
    Thread.sleep(5000);
    System.out.println("================================");
}

@After
public void after() throws InterruptedException {
    zooKeeper.close();
}