黑马程序员技术交流社区

标题: 【成都校区*精品*zookeeper一文精通】 [打印本页]

作者: 小蜀哥哥    时间: 2019-3-1 15:05
标题: 【成都校区*精品*zookeeper一文精通】
本帖最后由 小蜀哥哥 于 2019-3-1 15:10 编辑

zookeeper一文精通

一、zookeeper简介
二、Zookeeper的数据模型
Zookeeper的数据模型是什么样子呢?它很像数据结构当中的树,也很像文件系统的目录。

树是由节点所组成,Zookeeper的数据存储也同样是基于节点,这种节点叫做Znode。
但是,不同于树的节点,Znode的引用方式是路径引用,类似于文件路径:
       / 动物 / 仓鼠
       / 植物 / 荷花
      这样的层级结构,让每一个Znode节点拥有唯一的路径,就像命名空间一样对不同信息作出清晰的隔离。

Znode节点中包含以下数据:
这里需要注意一点,Zookeeper是为读多写少的场景所设计。Znode并不是用来存储大规模业务数据,而是用于存储少量的状态和配置信息,每个节点的数据最大不能超过1MB。
三、Zookeeper的基本操作和事件通知
基本操作:  
这其中,exists,getData,getChildren属于读操作。Zookeeper客户端在请求读操作的时候,可以选择是否设置Watch。我们可以理解成是注册在特定Znode上的触发器。当这个Znode发生改变,也就是调用了create,delete,setData方法的时候,将会触发Znode上注册的对应事件,请求Watch的客户端会接收到异步通知。
具体交互过程如下:
1.客户端调用getData方法,watch参数是true。服务端接到请求,返回节点数据,并且在对应的哈希表里插入被Watch的Znode路径,以及Watcher列表。

2.当被Watch的Znode已删除,服务端会查找哈希表,找到该Znode对应的所有Watcher,异步通知客户端,并且删除哈希表中对应的Key-Value。
四、java客户端操作Zookeeper
[XML] 纯文本查看 复制代码
<dependency>
        <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
          <version>3.4.6</version>
</dependency>



[Java] 纯文本查看 复制代码
public class ZookeeperBase {
                    static final String CONNECT_ADDR = "192.168.2.2:2181";
                    /** session超时时间 */
                    static final int SESSION_OUTTIME = 2000;// ms
                    /** 信号量,阻塞程序执行,用于等待zookeeper连接成功,发送成功信号 */
                    static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
                    
                    public static void main(String[] args) throws Exception {
         
                        ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME,
                                new Watcher() {
                                    @Override
                                    public void process(WatchedEvent event) {
                                        // 获取事件的状态
                                        KeeperState keeperState = event.getState();
                                        EventType eventType = event.getType();
                                        // 如果是建立连接
                                        if (KeeperState.SyncConnected == keeperState) {
                                            if (EventType.None == eventType) {
                                                // 如果建立连接成功,则发送信号量,让后续阻塞程序向下执行
                                                System.out.println("zk 建立连接");
                                                connectedSemaphore.countDown();
                                            }
                                        }
                                    }
                                });
                    
                        // 进行阻塞
                        connectedSemaphore.await();
                    
                        System.out.println("..");
                        // 创建父节点
                        // zk.create("/testRoot", "testRoot".getBytes(), Ids.OPEN_ACL_UNSAFE,
                        // CreateMode.PERSISTENT);
                    
                        // 创建子节点
                        // zk.create("/testRoot/children", "children data".getBytes(),
                        // Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    
                        // 获取节点洗信息
                        // byte[] data = zk.getData("/testRoot", false, null);
                        // System.out.println(new String(data));
                        // System.out.println(zk.getChildren("/testRoot", false));
                    
                        // 修改节点的值
                        // zk.setData("/testRoot", "modify data root".getBytes(), -1);
                        // byte[] data = zk.getData("/testRoot", false, null);
                        // System.out.println(new String(data));
                    
                        // 判断节点是否存在
                        // System.out.println(zk.exists("/testRoot/children", false));
                        // 删除节点
                        // zk.delete("/testRoot/children", -1);
                        // System.out.println(zk.exists("/testRoot/children", false));
                    
                        zk.close();
                    
                            }
                    
                  }


五、zookeeper核心原理-Watcher、ZK状态、事件类型、权限
zookeeper有watch事件,是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher
watcher的特性:一次性、客户端串行执行、轻量
操作代码如下
[Java] 纯文本查看 复制代码
public class ZooKeeperWatcher implements Watcher {
                   /** 定义原子变量 */
                    AtomicInteger seq = new AtomicInteger();
                    /** 定义session失效时间 */
                    private static final int SESSION_TIMEOUT = 10000;
                    /** zookeeper服务器地址 */
                    private static final String CONNECTION_ADDR = "192.168.80.88:2181";
                    /** zk父路径设置 */
                    private static final String PARENT_PATH = "/testWatch";
                    /** zk子路径设置 */
                    private static final String CHILDREN_PATH = "/testWatch/children";
                    /** 进入标识 */
                    private static final String LOG_PREFIX_OF_MAIN = "【Main】";
                    /** zk变量 */
                    private ZooKeeper zk = null;
                    /** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */
                    private CountDownLatch connectedSemaphore = new CountDownLatch(1);
               
                    /**
                     * 创建ZK连接
                     * @param connectAddr ZK服务器地址列表
                     * @param sessionTimeout Session超时时间
                     */
                    public void createConnection(String connectAddr, int sessionTimeout) {
                        this.releaseConnection();
                        try {
                            zk = new ZooKeeper(connectAddr, sessionTimeout, this);
                            System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");
                            connectedSemaphore.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
               
                    /**
                     * 关闭ZK连接
                     */
                    public void releaseConnection() {
                        if (this.zk != null) {
                            try {
                                this.zk.close();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
               
                    /**
                     * 创建节点
                     * @param path 节点路径
                     * @param data 数据内容
                     * @return
                     */
                    public boolean createPath(String path, String data) {
                        try {
                            //设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)
                            this.zk.exists(path, true);
                            System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " +
                                               this.zk.create(    /**路径*/
                                                                   path,
                                                                   /**数据*/
                                                                   data.getBytes(),
                                                                   /**所有可见*/
                                                                   Ids.OPEN_ACL_UNSAFE,
                                                                   /**永久存储*/
                                                                   CreateMode.PERSISTENT ) +     
                                               ", content: " + data);
                        } catch (Exception e) {
                            e.printStackTrace();
                            return false;
                        }
                        return true;
                    }
               
                    /**
                     * 读取指定节点数据内容
                     * @param path 节点路径
                     * @return
                     */
                    public String readData(String path, boolean needWatch) {
                        try {
                            return new String(this.zk.getData(path, needWatch, null));
                        } catch (Exception e) {
                            e.printStackTrace();
                            return "";
                        }
                    }
               
                    /**
                     * 更新指定节点数据内容
                     * @param path 节点路径
                     * @param data 数据内容
                     * @return
                     */
                    public boolean writeData(String path, String data) {
                        try {
                            System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " +
                                                this.zk.setData(path, data.getBytes(), -1));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return false;
                    }
               
                    /**
                     * 删除指定节点
                     *
                     * @param path
                     *            节点path
                     */
                    public void deleteNode(String path) {
                        try {
                            this.zk.delete(path, -1);
                            System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
               
                    /**
                     * 判断指定节点是否存在
                     * @param path 节点路径
                     */
                    public Stat exists(String path, boolean needWatch) {
                        try {
                            return this.zk.exists(path, needWatch);
                        } catch (Exception e) {
                            e.printStackTrace();
                            return null;
                        }
                    }
               
                    /**
                     * 获取子节点
                     * @param path 节点路径
                     */
                    private List<String> getChildren(String path, boolean needWatch) {
                        try {
                            return this.zk.getChildren(path, needWatch);
                        } catch (Exception e) {
                            e.printStackTrace();
                            return null;
                        }
                    }
               
                    /**
                     * 删除所有节点
                     */
                    public void deleteAllTestPath() {
                        if(this.exists(CHILDREN_PATH, false) != null){
                            this.deleteNode(CHILDREN_PATH);
                        }
                        if(this.exists(PARENT_PATH, false) != null){
                            this.deleteNode(PARENT_PATH);
                        }        
                    }
                    
                    /**
                     * 收到来自Server的Watcher通知后的处理。
                     */
                    @Override
                    public void process(WatchedEvent event) {
                        
                        System.out.println("进入 process 。。。。。event = " + event);
                        
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        
                        if (event == null) {
                            return;
                        }
                        
                        // 连接状态
                        KeeperState keeperState = event.getState();
                        // 事件类型
                        EventType eventType = event.getType();
                        // 受影响的path
                        String path = event.getPath();
                        
                        String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
               
                        System.out.println(logPrefix + "收到Watcher通知");
                        System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());
                        System.out.println(logPrefix + "事件类型:\t" + eventType.toString());
               
                        if (KeeperState.SyncConnected == keeperState) {
                            // 成功连接上ZK服务器
                            if (EventType.None == eventType) {
                                System.out.println(logPrefix + "成功连接上ZK服务器");
                                connectedSemaphore.countDown();
                            }
                            //创建节点
                            else if (EventType.NodeCreated == eventType) {
                                System.out.println(logPrefix + "节点创建");
                                try {
                                    Thread.sleep(100);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                this.exists(path, true);
                            }
                            //更新节点
                            else if (EventType.NodeDataChanged == eventType) {
                                System.out.println(logPrefix + "节点数据更新");
                                System.out.println("我看看走不走这里........");
                                try {
                                    Thread.sleep(100);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                System.out.println(logPrefix + "数据内容: " + this.readData(PARENT_PATH, true));
                            }
                            //更新子节点
                            else if (EventType.NodeChildrenChanged == eventType) {
                                System.out.println(logPrefix + "子节点变更");
                                try {
                                    Thread.sleep(3000);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                System.out.println(logPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true));
                            }
                            //删除节点
                            else if (EventType.NodeDeleted == eventType) {
                                System.out.println(logPrefix + "节点 " + path + " 被删除");
                            }
                            else ;
                        }
                        else if (KeeperState.Disconnected == keeperState) {
                            System.out.println(logPrefix + "与ZK服务器断开连接");
                        }
                        else if (KeeperState.AuthFailed == keeperState) {
                            System.out.println(logPrefix + "权限检查失败");
                        }
                        else if (KeeperState.Expired == keeperState) {
                            System.out.println(logPrefix + "会话失效");
                        }
                        else ;
               
                        System.out.println("--------------------------------------------");
               
                    }
               
                    /**
                     * <B>方法名称:</B>测试zookeeper监控<BR>
                     * <B>概要说明:</B>主要测试watch功能<BR>
                     * @param args
                     * @throws Exception
                     */
                    public static void main(String[] args) throws Exception {
               
                        //建立watcher
                        ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();
                        //创建连接
                        zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
                        //System.out.println(zkWatch.zk.toString());
                        
                        Thread.sleep(1000);
                        
                        // 清理节点
                        zkWatch.deleteAllTestPath();
                        
                        if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {
                           
                            Thread.sleep(1000);


                            // 读取数据
                            System.out.println("---------------------- read parent ----------------------------");
                            //zkWatch.readData(PARENT_PATH, true);
                           
                            // 读取子节点
                            System.out.println("---------------------- read children path ----------------------------");
                            zkWatch.getChildren(PARENT_PATH, true);
               
                            // 更新数据
                            zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");
                           
                            Thread.sleep(1000);
                           
                            // 创建子节点
                            zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");
                           
                            Thread.sleep(1000);
                           
                            zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
                        }
                        
                        Thread.sleep(50000);
                        // 清理节点
                        zkWatch.deleteAllTestPath();
                        Thread.sleep(1000);
                        zkWatch.releaseConnection();
                    }
        
                }

zookeeper的ACL(AUTH):
ACL(Access Control ListZookeeper)提供一套完善的ACL权限控制机制来保障数据的安全ZK提供了三种模式。权限模式,授权对象,权限。
  CREATE,DELLETE,READ,WRITE,ADMIN
操作代码如下:
[AppleScript] 纯文本查看 复制代码
public class ZookeeperAuth implements Watcher {
                /** 连接地址 */
                final static String CONNECT_ADDR = "192.168.80.88:2181";
                /** 测试路径 */
                final static String PATH = "/testAuth";
                final static String PATH_DEL = "/testAuth/delNode";
                /** 认证类型 */
                final static String authentication_type = "digest";
                /** 认证正确方法 */
                final static String correctAuthentication = "123456";
                /** 认证错误方法 */
                final static String badAuthentication = "654321";
               
                static ZooKeeper zk = null;
                /** 计时器 */
                AtomicInteger seq = new AtomicInteger();
                /** 标识 */
                private static final String LOG_PREFIX_OF_MAIN = "【Main】";
               
                private CountDownLatch connectedSemaphore = new CountDownLatch(1);
               
                @Override
                public void process(WatchedEvent event) {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (event==null) {
                        return;
                    }
                    // 连接状态
                    KeeperState keeperState = event.getState();
                    // 事件类型
                    EventType eventType = event.getType();
                    // 受影响的path
                    String path = event.getPath();
                    
                    String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
            
                    System.out.println(logPrefix + "收到Watcher通知");
                    System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());
                    System.out.println(logPrefix + "事件类型:\t" + eventType.toString());
                    if (KeeperState.SyncConnected == keeperState) {
                        // 成功连接上ZK服务器
                        if (EventType.None == eventType) {
                            System.out.println(logPrefix + "成功连接上ZK服务器");
                            connectedSemaphore.countDown();
                        }
                    } else if (KeeperState.Disconnected == keeperState) {
                        System.out.println(logPrefix + "与ZK服务器断开连接");
                    } else if (KeeperState.AuthFailed == keeperState) {
                        System.out.println(logPrefix + "权限检查失败");
                    } else if (KeeperState.Expired == keeperState) {
                        System.out.println(logPrefix + "会话失效");
                    }
                    System.out.println("--------------------------------------------");
                }
                /**
                 * 创建ZK连接
                 *
                 * @param connectString
                 *            ZK服务器地址列表
                 * @param sessionTimeout
                 *            Session超时时间
                 */
                public void createConnection(String connectString, int sessionTimeout) {
                    this.releaseConnection();
                    try {
                        zk = new ZooKeeper(connectString, sessionTimeout, this);
                        //添加节点授权
                        zk.addAuthInfo(authentication_type,correctAuthentication.getBytes());
                        System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");
                        //倒数等待
                        connectedSemaphore.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
               
                /**
                 * 关闭ZK连接
                 */
                public void releaseConnection() {
                    if (this.zk!=null) {
                        try {
                            this.zk.close();
                        } catch (InterruptedException e) {
                        }
                    }
                }
               
                /**
                 *
                 * <B>方法名称:</B>测试函数<BR>
                 * <B>概要说明:</B>测试认证<BR>
                 * @param args
                 * @throws Exception
                 */
                public static void main(String[] args) throws Exception {
                    
                    ZookeeperAuth testAuth = new ZookeeperAuth();
                    testAuth.createConnection(CONNECT_ADDR,2000);
                    List<ACL> acls = new ArrayList<ACL>(1);
                    for (ACL ids_acl : Ids.CREATOR_ALL_ACL) {
                        acls.add(ids_acl);
                    }
            
                    try {
                        zk.create(PATH, "init content".getBytes(), acls, CreateMode.PERSISTENT);
                        System.out.println("使用授权key:" + correctAuthentication + "创建节点:"+ PATH + ", 初始内容是: init content");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    try {
                        zk.create(PATH_DEL, "will be deleted! ".getBytes(), acls, CreateMode.PERSISTENT);
                        System.out.println("使用授权key:" + correctAuthentication + "创建节点:"+ PATH_DEL + ", 初始内容是: init content");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
            
                    // 获取数据
                    getDataByNoAuthentication();
                    getDataByBadAuthentication();
                    getDataByCorrectAuthentication();
            
                    // 更新数据
                    updateDataByNoAuthentication();
                    updateDataByBadAuthentication();
                    updateDataByCorrectAuthentication();
            
                    // 删除数据
                    deleteNodeByBadAuthentication();
                    deleteNodeByNoAuthentication();
                    deleteNodeByCorrectAuthentication();
                    //
                    Thread.sleep(1000);
                    
                    deleteParent();
                    //释放连接
                    testAuth.releaseConnection();
                }
                /** 获取数据:采用错误的密码 */
                static void getDataByBadAuthentication() {
                    String prefix = "[使用错误的授权信息]";
                    try {
                        ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);
                        //授权
                        badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());
                        Thread.sleep(2000);
                        System.out.println(prefix + "获取数据:" + PATH);
                        System.out.println(prefix + "成功获取数据:" + badzk.getData(PATH, false, null));
                    } catch (Exception e) {
                        System.err.println(prefix + "获取数据失败,原因:" + e.getMessage());
                    }
                }
            
                /** 获取数据:不采用密码 */
                static void getDataByNoAuthentication() {
                    String prefix = "[不使用任何授权信息]";
                    try {
                        System.out.println(prefix + "获取数据:" + PATH);
                        ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);
                        Thread.sleep(2000);
                        System.out.println(prefix + "成功获取数据:" + nozk.getData(PATH, false, null));
                    } catch (Exception e) {
                        System.err.println(prefix + "获取数据失败,原因:" + e.getMessage());
                    }
                }
            
                /** 采用正确的密码 */
                static void getDataByCorrectAuthentication() {
                    String prefix = "[使用正确的授权信息]";
                    try {
                        System.out.println(prefix + "获取数据:" + PATH);
                        
                        System.out.println(prefix + "成功获取数据:" + zk.getData(PATH, false, null));
                    } catch (Exception e) {
                        System.out.println(prefix + "获取数据失败,原因:" + e.getMessage());
                    }
                }
            
                /**
                 * 更新数据:不采用密码
                 */
                static void updateDataByNoAuthentication() {
            
                    String prefix = "[不使用任何授权信息]";
            
                    System.out.println(prefix + "更新数据: " + PATH);
                    try {
                        ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);
                        Thread.sleep(2000);
                        Stat stat = nozk.exists(PATH, false);
                        if (stat!=null) {
                            nozk.setData(PATH, prefix.getBytes(), -1);
                            System.out.println(prefix + "更新成功");
                        }
                    } catch (Exception e) {
                        System.err.println(prefix + "更新失败,原因是:" + e.getMessage());
                    }
                }
            
                /**
                 * 更新数据:采用错误的密码
                 */
                static void updateDataByBadAuthentication() {
            
                    String prefix = "[使用错误的授权信息]";
            
                    System.out.println(prefix + "更新数据:" + PATH);
                    try {
                        ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);
                        //授权
                        badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());
                        Thread.sleep(2000);
                        Stat stat = badzk.exists(PATH, false);
                        if (stat!=null) {
                            badzk.setData(PATH, prefix.getBytes(), -1);
                            System.out.println(prefix + "更新成功");
                        }
                    } catch (Exception e) {
                        System.err.println(prefix + "更新失败,原因是:" + e.getMessage());
                    }
                }
            
                /**
                 * 更新数据:采用正确的密码
                 */
                static void updateDataByCorrectAuthentication() {
            
                    String prefix = "[使用正确的授权信息]";
            
                    System.out.println(prefix + "更新数据:" + PATH);
                    try {
                        Stat stat = zk.exists(PATH, false);
                        if (stat!=null) {
                            zk.setData(PATH, prefix.getBytes(), -1);
                            System.out.println(prefix + "更新成功");
                        }
                    } catch (Exception e) {
                        System.err.println(prefix + "更新失败,原因是:" + e.getMessage());
                    }
                }
            
                /**
                 * 不使用密码 删除节点
                 */
                static void deleteNodeByNoAuthentication() throws Exception {
            
                    String prefix = "[不使用任何授权信息]";
            
                    try {
                        System.out.println(prefix + "删除节点:" + PATH_DEL);
                        ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);
                        Thread.sleep(2000);
                        Stat stat = nozk.exists(PATH_DEL, false);
                        if (stat!=null) {
                            nozk.delete(PATH_DEL,-1);
                            System.out.println(prefix + "删除成功");
                        }
                    } catch (Exception e) {
                        System.err.println(prefix + "删除失败,原因是:" + e.getMessage());
                    }
                }
            
                /**
                 * 采用错误的密码删除节点
                 */
                static void deleteNodeByBadAuthentication() throws Exception {
            
                    String prefix = "[使用错误的授权信息]";
            
                    try {
                        System.out.println(prefix + "删除节点:" + PATH_DEL);
                        ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);
                        //授权
                        badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());
                        Thread.sleep(2000);
                        Stat stat = badzk.exists(PATH_DEL, false);
                        if (stat!=null) {
                            badzk.delete(PATH_DEL, -1);
                            System.out.println(prefix + "删除成功");
                        }
                    } catch (Exception e) {
                        System.err.println(prefix + "删除失败,原因是:" + e.getMessage());
                    }
                }
            
                /**
                 * 使用正确的密码删除节点
                 */
                static void deleteNodeByCorrectAuthentication() throws Exception {
            
                    String prefix = "[使用正确的授权信息]";
            
                    try {
                        System.out.println(prefix + "删除节点:" + PATH_DEL);
                        Stat stat = zk.exists(PATH_DEL, false);
                        if (stat!=null) {
                            zk.delete(PATH_DEL, -1);
                            System.out.println(prefix + "删除成功");
                        }
                    } catch (Exception e) {
                        System.out.println(prefix + "删除失败,原因是:" + e.getMessage());
                    }
                }
            
                /**
                 * 使用正确的密码删除节点
                 */
                static void deleteParent() throws Exception {
                    try {
                        Stat stat = zk.exists(PATH_DEL, false);
                        if (stat == null) {
                            zk.delete(PATH, -1);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            
            }

六、Zookeeper选举机制以及ZAB协议


由上图可见,通过Leader进行写操作,主要分为五步:
1. 客户端向Leader发起写请求
2. Leader将写请求以(广播)Proposal的形式发给所有Follower并等待(回应)ACK
3. Follower收到Leader的Proposal后返回ACK
4. Leader得到过半数的ACK(Leader对自己默认有一个ACK)后向所有的Follower和Observer发送Commmit
5. Leader将处理结果返回给客户端
这里要注意
1. Leader并不需要得到Observer的ACK,即Observer无投票权
2. Leader不需要得到所有Follower的ACK,只要收到过半的ACK即可,同时Leader本身对自己有一个ACK。上图中有4个Follower,只需其中两个返回ACK即可,因为(2+1) / (4+1) > 1/2
3. Observer虽然无投票权,但仍须同步Leader的数据从而在处理读请求时可以返回尽可能新的数据
从上图可见
1. Follower/Observer均可接受写请求,但不能直接处理,而需要将写请求转发给Leader处理
2. 除了多了一步请求转发,其它流程与直接写Leader无任何区别
注意:Leader/Follower/Observer都可直接处理读请求,由于处理读请求不需要服务器之间的交互,Follower/Observer越多,整体可处理的读请求量越大,也即读性能越好。
选举机制
      几种领导选举场景

1. 集群启动领导选举
初始投票给自己  集群刚启动时,所有服务器的logicClock都为1,zxid都为0。
各服务器初始化后,都投票给自己,并将自己的一票存入自己的票箱,如下图所示。
在上图中,(1, 1, 0)第一位数代表投出该选票的服务器的logicClock,第二位数代表被推荐的服务器的myid,第三位代表被推荐的服务器的最大的zxid。由于该步骤中所有选票都投给自己,所以第二位的myid即是自己的myid,第三位的zxid即是自己的zxid。  
此时各自的票箱中只有自己投给自己的一票。
更新选票  服务器收到外部投票后,进行选票PK,相应更新自己的选票并广播出去,并将合适的选票存入自己的票箱,如下图所示。


服务器1收到服务器2的选票(1, 2, 0)和服务器3的选票(1, 3, 0)后,由于所有的logicClock都相等,所有的zxid都相等,因此根据myid判断应该将自己的选票按照服务器3的选票更新为(1, 3, 0),并将自己的票箱全部清空,再将服务器3的选票与自己的选票存入自己的票箱,接着将自己更新后的选票广播出去。此时服务器1票箱内的选票为(1, 3),(3, 3)。
同理,服务器2收到服务器3的选票后也将自己的选票更新为(1, 3, 0)并存入票箱然后广播。此时服务器2票箱内的选票为(2, 3),(3, ,3)。
服务器3根据上述规则,无须更新选票,自身的票箱内选票仍为(3, 3)。
服务器1与服务器2更新后的选票广播出去后,由于三个服务器最新选票都相同,最后三者的票箱内都包含三张投给服务器3的选票。
根据选票确定角色   根据上述选票,三个服务器一致认为此时服务器3应该是Leader。因此服务器1和2都进入FOLLOWING状   态,而服务器3进入LEADING状态。之后Leader发起并维护与Follower间的心跳。

2. Follower重启
Follower重启,或者发生网络分区后找不到Leader,会进入LOOKING状态并发起新的一轮投票。首先Follower重启投票给自己

发现已有Leader后成为Follower,服务器3收到服务器1的投票后,将自己的状态LEADING以及选票返回给服务器1。服务器2收到服务器1的投票后,将自己的状态FOLLOWING及选票返回给服务器1。此时服务器1知道服务器3是Leader,并且通过服务器2与服务器3的选票可以确定服务器3确实得到了超过半数的选票。因此服务器1进入FOLLOWING状态

3. Leader重启
Leader(服务器3)宕机后,Follower(服务器1和2)发现Leader不工作了,因此进入LOOKING状态并发起新的一轮投票,并且都将票投给自己。


使用ZAB协议广播更新选票  
服务器1和2根据外部投票确定是否要更新自身的选票。这里有两种情况
1. 服务器1和2的zxid相同。例如在服务器3宕机前服务器1与2完全与之同步。此时选票的更新主要取决于myid的大小
2. 服务器1和2的zxid不同。在旧Leader宕机之前,其所主导的写操作,只需过半服务器确认即可,而不需所有服务器确认。换句话说,服务器1和2可能一个与旧Leader同步(即zxid与之相同)另一个不同步(即zxid比之小)。此时选票的更新主要取决于谁的zxid较大
在上图中,服务器1的zxid为11,而服务器2的zxid为10,因此服务器2将自身选票更新为(3, 1, 11)。  
经过上一步选票更新后,服务器1与服务器2均将选票投给服务器1,因此服务器2成为Follower,而服务器1成为新的Leader并维护与服务器2的心跳


旧Leader恢复后发起选举   
旧的Leader恢复后,进入LOOKING状态并发起新一轮领导选举,并将选票投给自己。此时服务器1会将自己的LEADING状态及选票(3, 1, 11)返回给服务器3,而服务器2将自己的FOLLOWING状态及选票(3, 1, 11)返回给服务器3。如下图所示

旧Leader成为Follower  
服务器3了解到Leader为服务器1,且根据选票了解到服务器1确实得到过半服务器的选票,因此自己进入FOLLOWING状态




ZAB协议一致性保证
ZAB协议保证了在Leader选举的过程中,已经被Commit的数据不会丢失,未被Commit的数据对客户端不可见。
Commit过的数据不丢失  
Failover前状态:为更好演示Leader Failover过程,本例中共使用5个Zookeeper服务器。A作为Leader,共收到P1、P2、P3三条消息,并且Commit了1和2,且总体顺序为P1、P2、C1、P3、C2。根据顺序性原则,其它Follower收到的消息的顺序肯定与之相同。其中B与A完全同步,C收到P1、P2、C1,D收到P1、P2,E收到P1,如下图所示。


这里要注意
1. 由于A没有C3,意味着收到P3的服务器的总个数不会超过一半,也即包含A在内最多只有两台服务器收到P3。在这里A和B收到P3,其它服务器均未收到P3  
2. 由于A已写入C1、C2,说明它已经Commit了P1、P2,因此整个集群有超过一半的服务器,即最少三个服务器收到P1、P2。在这里所有服务器都收到了P1,除E外其它服务器也都收到了P2
选出新Leader  
旧Leader也即A宕机后,其它服务器根据上述FastLeaderElection算法选出B作为新的Leader。C、D和E成为Follower且以B为Leader后,会主动将自己最大的zxid发送给B,B会将Follower的zxid与自身zxid间的所有被Commit过的消息同步给Follower,如下图所示。

在上图中
1. P1和P2都被A Commit,因此B会通过同步保证P1、P2、C1与C2都存在于C、D和E中
2. P3由于未被A Commit,同时幸存的所有服务器中P3未存在于大多数据服务器中,因此它不会被同步到其它Follower
未Commit过的消息对客户端不可见  
在上例中,P3未被A Commit过,同时因为没有过半的服务器收到P3,因此B也未Commit P3(如果有过半服务器收到P3,即使A未Commit P3,B会主动Commit P3,即C3),所以它不会将P3广播出去。
具体做法是,B在成为Leader后,先判断自身未Commit的消息(本例中即P3)是否存在于大多数服务器中从而决定是否要将其Commit。然后B可得出自身所包含的被Commit过的消息中的最小zxid(记为min_zxid)与最大zxid(记为max_zxid)。C、D和E向B发送自身Commit过的最大消息zxid(记为max_zxid)以及未被Commit过的所有消息(记为zxid_set)。B根据这些信息作出如下操作
1. 如果Follower的max_zxid与Leader的max_zxid相等,说明该Follower与Leader完全同步,无须同步任何数据
2. 如果Follower的max_zxid在Leader的(min_zxid,max_zxid)范围内,Leader会通过TRUNC命令通知Follower将其zxid_set中大于Follower的max_zxid(如果有)的所有消息全部删除
上述操作保证了未被Commit过的消息不会被Commit从而对外不可见。


七、Zookeeper的应用场景
分布式协调  
这个其实是zk很经典的一个用法,简单来说,就好比,你A系统发送个请求到mq,然后B消息消费之后处理了。那A系统如何知道B系统的处理结果?用zk就可以实现分布式系统之间的协调工作。A系统发送请求之后可以在zk上对某个节点的值注册个监听器,一旦B系统处理完了就修改zk那个节点的值,A立马就可以收到通知,完美解决。

分布式锁
对某一个数据连续发出两个修改操作,两台机器同时收到了请求,但是只能一台机器先执行另外一个机器再执行。那么此时就可以使用zk分布式锁,一个机器接收到了请求之后先获取zk上的一把分布式锁,就是可以去创建一个znode,接着执行操作;然后另外一个机器也尝试去创建那个znode,结果发现自己创建不了,因为被别人创建了。。。。那只能等着,等第一个机器执行完了自己再执行
元数据/配置信息管理
zk可以用作很多系统的配置信息的管理,比如kafka、storm,solr集群等等很多分布式系统都会选用zk来做一些元数据、配置信息的管理,包括dubbo注册中心不也支持zk么
HA高可用性
这个应该是很常见的,比如hadoop、hdfs、yarn等很多大数据系统,都选择基于zk来开发HA高可用机制,就是一个重要进程一般会做主备两个,主进程挂了立马通过zk感知到切换到备用进程
八、总结
本文从概念讲解了zookeeper的理论知识,核心知识部分-数据模型、节点操作、watch机制以及ACL权限问题,同时将zookeeper集群ZAB协议如何选举领导进行了深入分析,如果读者能够将本文的内容梳理清楚并有了理解,那么zookeeper就搞定啦。最后也为大家讲解了zookeeper在企业开发中的应用场景,希望本文能够让读者对zookeeper有一定认识,让我们不只是停留在zookeeper的使用层面,大家一起加油,相互交流学习!!!!






欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2