A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

本帖最后由 小蜀哥哥 于 2019-3-1 15:10 编辑

zookeeper一文精通

一、zookeeper简介
  • Zookeeper是一个高效的分布式协调服务,由雅虎创建,是 Google Chubby 的开源实现。 它暴露了一些公用服务,比如命名服务/配置管理/同步控制/群组服务等。我们可以使用ZK来实现比如达成共识/集群管理/leader选举等。 利用zookeeper的ZAB算法(原子消费广播协议)能够很好地保证分布式环境中数据的一致性,也正是基于这样的特性,使得Zookeeper成为了解决分布式一致性问题的利器

二、Zookeeper的数据模型
Zookeeper的数据模型是什么样子呢?它很像数据结构当中的树,也很像文件系统的目录。

树是由节点所组成,Zookeeper的数据存储也同样是基于节点,这种节点叫做Znode。
但是,不同于树的节点,Znode的引用方式是路径引用,类似于文件路径:
       / 动物 / 仓鼠
       / 植物 / 荷花
      这样的层级结构,让每一个Znode节点拥有唯一的路径,就像命名空间一样对不同信息作出清晰的隔离。

Znode节点中包含以下数据:
  • data:Znode存储的数据信息。
  • ACL:记录Znode的访问权限,即哪些人或哪些IP可以访问本节点。  
  • stat:包含Znode的各种元数据,比如事务ID、版本号(version)、时间戳、大小等等。  
  • child:当前节点的子节点引用,类似于二叉树的左孩子右孩子。  

这里需要注意一点,Zookeeper是为读多写少的场景所设计。Znode并不是用来存储大规模业务数据,而是用于存储少量的状态和配置信息,每个节点的数据最大不能超过1MB。
三、Zookeeper的基本操作和事件通知
基本操作:  
  • create : 创建节点
  • delete : 删除节点
  • exists : 判断节点是否存在
  • getData() : 获得一个节点的数据
  • setData : 设置一个节点的数据
  • getChildren : 获取节点下的所有子节点

这其中,exists,getData,getChildren属于读操作。Zookeeper客户端在请求读操作的时候,可以选择是否设置Watch。我们可以理解成是注册在特定Znode上的触发器。当这个Znode发生改变,也就是调用了create,delete,setData方法的时候,将会触发Znode上注册的对应事件,请求Watch的客户端会接收到异步通知。
具体交互过程如下:
1.客户端调用getData方法,watch参数是true。服务端接到请求,返回节点数据,并且在对应的哈希表里插入被Watch的Znode路径,以及Watcher列表。

2.当被Watch的Znode已删除,服务端会查找哈希表,找到该Znode对应的所有Watcher,异步通知客户端,并且删除哈希表中对应的Key-Value。
四、java客户端操作Zookeeper
  • 引入依赖

[XML] 纯文本查看 复制代码
<dependency>
        <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
          <version>3.4.6</version>
</dependency>



  • 操作代码

[Java] 纯文本查看 复制代码
public class ZookeeperBase {
                    static final String CONNECT_ADDR = "192.168.2.2:2181";
                    /** session超时时间 */
                    static final int SESSION_OUTTIME = 2000;// ms
                    /** 信号量,阻塞程序执行,用于等待zookeeper连接成功,发送成功信号 */
                    static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
                    
                    public static void main(String[] args) throws Exception {
          
                        ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME,
                                new Watcher() {
                                    @Override
                                    public void process(WatchedEvent event) {
                                        // 获取事件的状态
                                        KeeperState keeperState = event.getState();
                                        EventType eventType = event.getType();
                                        // 如果是建立连接
                                        if (KeeperState.SyncConnected == keeperState) {
                                            if (EventType.None == eventType) {
                                                // 如果建立连接成功,则发送信号量,让后续阻塞程序向下执行
                                                System.out.println("zk 建立连接");
                                                connectedSemaphore.countDown();
                                            }
                                        }
                                    }
                                });
                    
                        // 进行阻塞
                        connectedSemaphore.await();
                    
                        System.out.println("..");
                        // 创建父节点
                        // zk.create("/testRoot", "testRoot".getBytes(), Ids.OPEN_ACL_UNSAFE,
                        // CreateMode.PERSISTENT);
                    
                        // 创建子节点
                        // zk.create("/testRoot/children", "children data".getBytes(),
                        // Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    
                        // 获取节点洗信息
                        // byte[] data = zk.getData("/testRoot", false, null);
                        // System.out.println(new String(data));
                        // System.out.println(zk.getChildren("/testRoot", false));
                    
                        // 修改节点的值
                        // zk.setData("/testRoot", "modify data root".getBytes(), -1);
                        // byte[] data = zk.getData("/testRoot", false, null);
                        // System.out.println(new String(data));
                    
                        // 判断节点是否存在
                        // System.out.println(zk.exists("/testRoot/children", false));
                        // 删除节点
                        // zk.delete("/testRoot/children", -1);
                        // System.out.println(zk.exists("/testRoot/children", false));
                    
                        zk.close();
                    
                            }
                    
                  }


五、zookeeper核心原理-Watcher、ZK状态、事件类型、权限
zookeeper有watch事件,是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher
  • 事件类型:(znode节点相关的)  

    • EventType.NodeCreated  
    • EventType.NodeDataChanged  
    • EventType.NodeChildrenChanged
    • EventType.NodeDeleted

  • 状态类型:(跟客户端实例相关的)  

    • keeperState.Disconnected
    • keeperState.SyncConnected
    • keeperState.AuthFailed
    • keeperState.Expired


watcher的特性:一次性、客户端串行执行、轻量
  • 一次性:对于ZK的watcher,只需要记住一点:zookeeper有watch事件,是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher,由于zookeeper的监控都是一次性的,所以每次必须设置监控。
  • 客户端串行执行:客户端Watcher回调的过程是一个串行同步的过程,这为我们保证了顺序,同事需要开发人员注意一点,千万不要因为一个Watcher的处理逻辑影响了整个客户端的Watcher回调。
  • 轻量:WatchedEvent 是Zookeeper整个Watcher通知机制的最小通知单元,整个结构只包含三部分:通知状态、事件类型和节点路径。也就是说Watcher通知非常的简单,只会告诉客户端发生了事件,而不会告知其具体内容,需要客户端自己去进行获取,比如NodeDataChanged事件,Zookeeper只会通知客户端指定节点的数据发生了变更,而不会直接提供具体的数据内容。

操作代码如下
[Java] 纯文本查看 复制代码
public class ZooKeeperWatcher implements Watcher {
                   /** 定义原子变量 */
                    AtomicInteger seq = new AtomicInteger();
                    /** 定义session失效时间 */
                    private static final int SESSION_TIMEOUT = 10000;
                    /** zookeeper服务器地址 */
                    private static final String CONNECTION_ADDR = "192.168.80.88:2181";
                    /** zk父路径设置 */
                    private static final String PARENT_PATH = "/testWatch";
                    /** zk子路径设置 */
                    private static final String CHILDREN_PATH = "/testWatch/children";
                    /** 进入标识 */
                    private static final String LOG_PREFIX_OF_MAIN = "【Main】";
                    /** zk变量 */
                    private ZooKeeper zk = null;
                    /** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */
                    private CountDownLatch connectedSemaphore = new CountDownLatch(1);
                
                    /**
                     * 创建ZK连接
                     * @param connectAddr ZK服务器地址列表
                     * @param sessionTimeout Session超时时间
                     */
                    public void createConnection(String connectAddr, int sessionTimeout) {
                        this.releaseConnection();
                        try {
                            zk = new ZooKeeper(connectAddr, sessionTimeout, this);
                            System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");
                            connectedSemaphore.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                
                    /**
                     * 关闭ZK连接
                     */
                    public void releaseConnection() {
                        if (this.zk != null) {
                            try {
                                this.zk.close();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                
                    /**
                     * 创建节点
                     * @param path 节点路径
                     * @param data 数据内容
                     * @return 
                     */
                    public boolean createPath(String path, String data) {
                        try {
                            //设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)
                            this.zk.exists(path, true);
                            System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " + 
                                               this.zk.create(    /**路径*/ 
                                                                   path, 
                                                                   /**数据*/
                                                                   data.getBytes(), 
                                                                   /**所有可见*/
                                                                   Ids.OPEN_ACL_UNSAFE, 
                                                                   /**永久存储*/
                                                                   CreateMode.PERSISTENT ) +     
                                               ", content: " + data);
                        } catch (Exception e) {
                            e.printStackTrace();
                            return false;
                        }
                        return true;
                    }
                
                    /**
                     * 读取指定节点数据内容
                     * @param path 节点路径
                     * @return
                     */
                    public String readData(String path, boolean needWatch) {
                        try {
                            return new String(this.zk.getData(path, needWatch, null));
                        } catch (Exception e) {
                            e.printStackTrace();
                            return "";
                        }
                    }
                
                    /**
                     * 更新指定节点数据内容
                     * @param path 节点路径
                     * @param data 数据内容
                     * @return
                     */
                    public boolean writeData(String path, String data) {
                        try {
                            System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " +
                                                this.zk.setData(path, data.getBytes(), -1));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return false;
                    }
                
                    /**
                     * 删除指定节点
                     * 
                     * @param path
                     *            节点path
                     */
                    public void deleteNode(String path) {
                        try {
                            this.zk.delete(path, -1);
                            System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                
                    /**
                     * 判断指定节点是否存在
                     * @param path 节点路径
                     */
                    public Stat exists(String path, boolean needWatch) {
                        try {
                            return this.zk.exists(path, needWatch);
                        } catch (Exception e) {
                            e.printStackTrace();
                            return null;
                        }
                    }
                
                    /**
                     * 获取子节点
                     * @param path 节点路径
                     */
                    private List<String> getChildren(String path, boolean needWatch) {
                        try {
                            return this.zk.getChildren(path, needWatch);
                        } catch (Exception e) {
                            e.printStackTrace();
                            return null;
                        }
                    }
                
                    /**
                     * 删除所有节点
                     */
                    public void deleteAllTestPath() {
                        if(this.exists(CHILDREN_PATH, false) != null){
                            this.deleteNode(CHILDREN_PATH);
                        }
                        if(this.exists(PARENT_PATH, false) != null){
                            this.deleteNode(PARENT_PATH);
                        }        
                    }
                    
                    /**
                     * 收到来自Server的Watcher通知后的处理。
                     */
                    @Override
                    public void process(WatchedEvent event) {
                        
                        System.out.println("进入 process 。。。。。event = " + event);
                        
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        
                        if (event == null) {
                            return;
                        }
                        
                        // 连接状态
                        KeeperState keeperState = event.getState();
                        // 事件类型
                        EventType eventType = event.getType();
                        // 受影响的path
                        String path = event.getPath();
                        
                        String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
                
                        System.out.println(logPrefix + "收到Watcher通知");
                        System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());
                        System.out.println(logPrefix + "事件类型:\t" + eventType.toString());
                
                        if (KeeperState.SyncConnected == keeperState) {
                            // 成功连接上ZK服务器
                            if (EventType.None == eventType) {
                                System.out.println(logPrefix + "成功连接上ZK服务器");
                                connectedSemaphore.countDown();
                            } 
                            //创建节点
                            else if (EventType.NodeCreated == eventType) {
                                System.out.println(logPrefix + "节点创建");
                                try {
                                    Thread.sleep(100);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                this.exists(path, true);
                            } 
                            //更新节点
                            else if (EventType.NodeDataChanged == eventType) {
                                System.out.println(logPrefix + "节点数据更新");
                                System.out.println("我看看走不走这里........");
                                try {
                                    Thread.sleep(100);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                System.out.println(logPrefix + "数据内容: " + this.readData(PARENT_PATH, true));
                            } 
                            //更新子节点
                            else if (EventType.NodeChildrenChanged == eventType) {
                                System.out.println(logPrefix + "子节点变更");
                                try {
                                    Thread.sleep(3000);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                System.out.println(logPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true));
                            } 
                            //删除节点
                            else if (EventType.NodeDeleted == eventType) {
                                System.out.println(logPrefix + "节点 " + path + " 被删除");
                            }
                            else ;
                        } 
                        else if (KeeperState.Disconnected == keeperState) {
                            System.out.println(logPrefix + "与ZK服务器断开连接");
                        } 
                        else if (KeeperState.AuthFailed == keeperState) {
                            System.out.println(logPrefix + "权限检查失败");
                        } 
                        else if (KeeperState.Expired == keeperState) {
                            System.out.println(logPrefix + "会话失效");
                        }
                        else ;
                
                        System.out.println("--------------------------------------------");
                
                    }
                
                    /**
                     * <B>方法名称:</B>测试zookeeper监控<BR>
                     * <B>概要说明:</B>主要测试watch功能<BR>
                     * @param args
                     * @throws Exception
                     */
                    public static void main(String[] args) throws Exception {
                
                        //建立watcher
                        ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();
                        //创建连接
                        zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
                        //System.out.println(zkWatch.zk.toString());
                        
                        Thread.sleep(1000);
                        
                        // 清理节点
                        zkWatch.deleteAllTestPath();
                        
                        if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {
                            
                            Thread.sleep(1000);


                            // 读取数据
                            System.out.println("---------------------- read parent ----------------------------");
                            //zkWatch.readData(PARENT_PATH, true);
                            
                            // 读取子节点
                            System.out.println("---------------------- read children path ----------------------------");
                            zkWatch.getChildren(PARENT_PATH, true);
                
                            // 更新数据
                            zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");
                            
                            Thread.sleep(1000);
                            
                            // 创建子节点
                            zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");
                            
                            Thread.sleep(1000);
                            
                            zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
                        }
                        
                        Thread.sleep(50000);
                        // 清理节点
                        zkWatch.deleteAllTestPath();
                        Thread.sleep(1000);
                        zkWatch.releaseConnection();
                    }
        
                }

zookeeper的ACL(AUTH):
ACL(Access Control ListZookeeper)提供一套完善的ACL权限控制机制来保障数据的安全ZK提供了三种模式。权限模式,授权对象,权限。
  • 权限模式:Scheme,开发人员最多使用一下四种权限模式:  

    • IP:ip模式通过ip地址粒度来进行控制权限,例如配置了:ip:192.168.1.109即表示权限控制都是针对这个ip地址的,同时也支持按网段分配,比如192.168.1.*
    • Digest:digest是最常用的权限控制模式,也更符合我们对权限控制的认识,其类似于“username:password” 形式的权限标识进行权限配置,ZK会对形成的权限标识先后进行两次编码处理,分别是SHA-1加密算法、BASE64编码。
    • World:World是一直最开放的全校性控制模式,这种模式可以看作为特殊的Digest,他仅仅是一个标识而已。
    • Super:超级用户模式,在超级用户模式下可以对ZK任意进行操作

  • 权限对象:指的是权限赋予的用户或者一个指定的实体,例如ip地址或机器等,在不同的模式下,授权对象是不同的,这种模式和权限对象一一对应。
  • 权限:权限就是指那些通过权限检测后可以被允许执行的操作,在ZK中,对数据的操作权限分为以下五大类:

  CREATE,DELLETE,READ,WRITE,ADMIN
操作代码如下:
[AppleScript] 纯文本查看 复制代码
public class ZookeeperAuth implements Watcher {
                /** 连接地址 */
                final static String CONNECT_ADDR = "192.168.80.88:2181";
                /** 测试路径 */
                final static String PATH = "/testAuth";
                final static String PATH_DEL = "/testAuth/delNode";
                /** 认证类型 */
                final static String authentication_type = "digest";
                /** 认证正确方法 */
                final static String correctAuthentication = "123456";
                /** 认证错误方法 */
                final static String badAuthentication = "654321";
                
                static ZooKeeper zk = null;
                /** 计时器 */
                AtomicInteger seq = new AtomicInteger();
                /** 标识 */
                private static final String LOG_PREFIX_OF_MAIN = "【Main】";
                
                private CountDownLatch connectedSemaphore = new CountDownLatch(1);
                
                @Override
                public void process(WatchedEvent event) {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (event==null) {
                        return;
                    }
                    // 连接状态
                    KeeperState keeperState = event.getState();
                    // 事件类型
                    EventType eventType = event.getType();
                    // 受影响的path
                    String path = event.getPath();
                    
                    String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
            
                    System.out.println(logPrefix + "收到Watcher通知");
                    System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());
                    System.out.println(logPrefix + "事件类型:\t" + eventType.toString());
                    if (KeeperState.SyncConnected == keeperState) {
                        // 成功连接上ZK服务器
                        if (EventType.None == eventType) {
                            System.out.println(logPrefix + "成功连接上ZK服务器");
                            connectedSemaphore.countDown();
                        } 
                    } else if (KeeperState.Disconnected == keeperState) {
                        System.out.println(logPrefix + "与ZK服务器断开连接");
                    } else if (KeeperState.AuthFailed == keeperState) {
                        System.out.println(logPrefix + "权限检查失败");
                    } else if (KeeperState.Expired == keeperState) {
                        System.out.println(logPrefix + "会话失效");
                    }
                    System.out.println("--------------------------------------------");
                }
                /**
                 * 创建ZK连接
                 * 
                 * @param connectString
                 *            ZK服务器地址列表
                 * @param sessionTimeout
                 *            Session超时时间
                 */
                public void createConnection(String connectString, int sessionTimeout) {
                    this.releaseConnection();
                    try {
                        zk = new ZooKeeper(connectString, sessionTimeout, this);
                        //添加节点授权
                        zk.addAuthInfo(authentication_type,correctAuthentication.getBytes());
                        System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");
                        //倒数等待
                        connectedSemaphore.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                
                /**
                 * 关闭ZK连接
                 */
                public void releaseConnection() {
                    if (this.zk!=null) {
                        try {
                            this.zk.close();
                        } catch (InterruptedException e) {
                        }
                    }
                }
                
                /**
                 * 
                 * <B>方法名称:</B>测试函数<BR>
                 * <B>概要说明:</B>测试认证<BR>
                 * @param args
                 * @throws Exception
                 */
                public static void main(String[] args) throws Exception {
                    
                    ZookeeperAuth testAuth = new ZookeeperAuth();
                    testAuth.createConnection(CONNECT_ADDR,2000);
                    List<ACL> acls = new ArrayList<ACL>(1);
                    for (ACL ids_acl : Ids.CREATOR_ALL_ACL) {
                        acls.add(ids_acl);
                    }
            
                    try {
                        zk.create(PATH, "init content".getBytes(), acls, CreateMode.PERSISTENT);
                        System.out.println("使用授权key:" + correctAuthentication + "创建节点:"+ PATH + ", 初始内容是: init content");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    try {
                        zk.create(PATH_DEL, "will be deleted! ".getBytes(), acls, CreateMode.PERSISTENT);
                        System.out.println("使用授权key:" + correctAuthentication + "创建节点:"+ PATH_DEL + ", 初始内容是: init content");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
            
                    // 获取数据
                    getDataByNoAuthentication();
                    getDataByBadAuthentication();
                    getDataByCorrectAuthentication();
            
                    // 更新数据
                    updateDataByNoAuthentication();
                    updateDataByBadAuthentication();
                    updateDataByCorrectAuthentication();
            
                    // 删除数据
                    deleteNodeByBadAuthentication();
                    deleteNodeByNoAuthentication();
                    deleteNodeByCorrectAuthentication();
                    //
                    Thread.sleep(1000);
                    
                    deleteParent();
                    //释放连接
                    testAuth.releaseConnection();
                }
                /** 获取数据:采用错误的密码 */
                static void getDataByBadAuthentication() {
                    String prefix = "[使用错误的授权信息]";
                    try {
                        ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);
                        //授权
                        badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());
                        Thread.sleep(2000);
                        System.out.println(prefix + "获取数据:" + PATH);
                        System.out.println(prefix + "成功获取数据:" + badzk.getData(PATH, false, null));
                    } catch (Exception e) {
                        System.err.println(prefix + "获取数据失败,原因:" + e.getMessage());
                    }
                }
            
                /** 获取数据:不采用密码 */
                static void getDataByNoAuthentication() {
                    String prefix = "[不使用任何授权信息]";
                    try {
                        System.out.println(prefix + "获取数据:" + PATH);
                        ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);
                        Thread.sleep(2000);
                        System.out.println(prefix + "成功获取数据:" + nozk.getData(PATH, false, null));
                    } catch (Exception e) {
                        System.err.println(prefix + "获取数据失败,原因:" + e.getMessage());
                    }
                }
            
                /** 采用正确的密码 */
                static void getDataByCorrectAuthentication() {
                    String prefix = "[使用正确的授权信息]";
                    try {
                        System.out.println(prefix + "获取数据:" + PATH);
                        
                        System.out.println(prefix + "成功获取数据:" + zk.getData(PATH, false, null));
                    } catch (Exception e) {
                        System.out.println(prefix + "获取数据失败,原因:" + e.getMessage());
                    }
                }
            
                /**
                 * 更新数据:不采用密码
                 */
                static void updateDataByNoAuthentication() {
            
                    String prefix = "[不使用任何授权信息]";
            
                    System.out.println(prefix + "更新数据: " + PATH);
                    try {
                        ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);
                        Thread.sleep(2000);
                        Stat stat = nozk.exists(PATH, false);
                        if (stat!=null) {
                            nozk.setData(PATH, prefix.getBytes(), -1);
                            System.out.println(prefix + "更新成功");
                        }
                    } catch (Exception e) {
                        System.err.println(prefix + "更新失败,原因是:" + e.getMessage());
                    }
                }
            
                /**
                 * 更新数据:采用错误的密码
                 */
                static void updateDataByBadAuthentication() {
            
                    String prefix = "[使用错误的授权信息]";
            
                    System.out.println(prefix + "更新数据:" + PATH);
                    try {
                        ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);
                        //授权
                        badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());
                        Thread.sleep(2000);
                        Stat stat = badzk.exists(PATH, false);
                        if (stat!=null) {
                            badzk.setData(PATH, prefix.getBytes(), -1);
                            System.out.println(prefix + "更新成功");
                        }
                    } catch (Exception e) {
                        System.err.println(prefix + "更新失败,原因是:" + e.getMessage());
                    }
                }
            
                /**
                 * 更新数据:采用正确的密码
                 */
                static void updateDataByCorrectAuthentication() {
            
                    String prefix = "[使用正确的授权信息]";
            
                    System.out.println(prefix + "更新数据:" + PATH);
                    try {
                        Stat stat = zk.exists(PATH, false);
                        if (stat!=null) {
                            zk.setData(PATH, prefix.getBytes(), -1);
                            System.out.println(prefix + "更新成功");
                        }
                    } catch (Exception e) {
                        System.err.println(prefix + "更新失败,原因是:" + e.getMessage());
                    }
                }
            
                /**
                 * 不使用密码 删除节点
                 */
                static void deleteNodeByNoAuthentication() throws Exception {
            
                    String prefix = "[不使用任何授权信息]";
            
                    try {
                        System.out.println(prefix + "删除节点:" + PATH_DEL);
                        ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);
                        Thread.sleep(2000);
                        Stat stat = nozk.exists(PATH_DEL, false);
                        if (stat!=null) {
                            nozk.delete(PATH_DEL,-1);
                            System.out.println(prefix + "删除成功");
                        }
                    } catch (Exception e) {
                        System.err.println(prefix + "删除失败,原因是:" + e.getMessage());
                    }
                }
            
                /**
                 * 采用错误的密码删除节点
                 */
                static void deleteNodeByBadAuthentication() throws Exception {
            
                    String prefix = "[使用错误的授权信息]";
            
                    try {
                        System.out.println(prefix + "删除节点:" + PATH_DEL);
                        ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);
                        //授权
                        badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());
                        Thread.sleep(2000);
                        Stat stat = badzk.exists(PATH_DEL, false);
                        if (stat!=null) {
                            badzk.delete(PATH_DEL, -1);
                            System.out.println(prefix + "删除成功");
                        }
                    } catch (Exception e) {
                        System.err.println(prefix + "删除失败,原因是:" + e.getMessage());
                    }
                }
            
                /**
                 * 使用正确的密码删除节点
                 */
                static void deleteNodeByCorrectAuthentication() throws Exception {
            
                    String prefix = "[使用正确的授权信息]";
            
                    try {
                        System.out.println(prefix + "删除节点:" + PATH_DEL);
                        Stat stat = zk.exists(PATH_DEL, false);
                        if (stat!=null) {
                            zk.delete(PATH_DEL, -1);
                            System.out.println(prefix + "删除成功");
                        }
                    } catch (Exception e) {
                        System.out.println(prefix + "删除失败,原因是:" + e.getMessage());
                    }
                }
            
                /**
                 * 使用正确的密码删除节点
                 */
                static void deleteParent() throws Exception {
                    try {
                        Stat stat = zk.exists(PATH_DEL, false);
                        if (stat == null) {
                            zk.delete(PATH, -1);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            
            }

六、Zookeeper选举机制以及ZAB协议
  • Zookeeper架构中的角色           

    • leader:一个Zookeeper集群同一时间只会有一个实际工作的Leader,它会发起并维护与各Follwer及Observer间的心跳。所有的写操作必须要通过Leader完成再由Leader将写操作广播给其它服务器。
    • Follower:一个Zookeeper集群可能同时存在多个Follower,它会响应Leader的心跳。Follower可直接处理并返回客户端的读请求,同时会将写请求转发给Leader处理,并且负责在Leader处理写请求时对请求进行投票。
    • Observer:角色与Follower类似,但是无投票权。



  • 原子广播(ZAB)
    为了保证写操作的一致性与可用性,Zookeeper专门设计了一种名为原子广播(ZAB)的支持崩溃恢复的一致性协议。基于该协议,Zookeeper实现了一种主从模式的系统架构来保持集群中各个副本之间的数据一致性。
    根据ZAB协议,所有的写操作都必须通过Leader完成,Leader写入本地日志后再复制到所有的Follower节点,Observer节点。
    一旦Leader节点无法工作,ZAB协议能够自动从Follower节点中重新选出一个合适的替代者,即新的Leader,该过程即为领导选举。该领导选举过程,是ZAB协议中最为重要和复杂的过程。

由上图可见,通过Leader进行写操作,主要分为五步:
1. 客户端向Leader发起写请求
2. Leader将写请求以(广播)Proposal的形式发给所有Follower并等待(回应)ACK
3. Follower收到Leader的Proposal后返回ACK
4. Leader得到过半数的ACK(Leader对自己默认有一个ACK)后向所有的Follower和Observer发送Commmit
5. Leader将处理结果返回给客户端
这里要注意
1. Leader并不需要得到Observer的ACK,即Observer无投票权
2. Leader不需要得到所有Follower的ACK,只要收到过半的ACK即可,同时Leader本身对自己有一个ACK。上图中有4个Follower,只需其中两个返回ACK即可,因为(2+1) / (4+1) > 1/2
3. Observer虽然无投票权,但仍须同步Leader的数据从而在处理读请求时可以返回尽可能新的数据
从上图可见
1. Follower/Observer均可接受写请求,但不能直接处理,而需要将写请求转发给Leader处理
2. 除了多了一步请求转发,其它流程与直接写Leader无任何区别
注意:Leader/Follower/Observer都可直接处理读请求,由于处理读请求不需要服务器之间的交互,Follower/Observer越多,整体可处理的读请求量越大,也即读性能越好。
选举机制
  • 术语解释

    • myid:每个Zookeeper服务器,都需要在数据文件夹下创建一个名为myid的文件,该文件包含整个Zookeeper集群唯一的ID(整数)。例如某Zookeeper集群包含三台服务器,hostname分别为zoo1、zoo2和zoo3,其myid分别为1、2和3,则在配置文件中其ID与hostname必须一一对应,如下所示。在该配置文件中,server.后面的数据即为myid
    • 1. server.1=zoo1:2888:3888
    • 2. server.2=zoo2:2888:3888
    • 3. server.3=zoo3:2888:3888
    • zxid:类似于RDBMS中的事务ID,用于标识一次更新操作的Proposal ID。为了保证顺序性,该zkid必须单调递增。因此Zookeeper使用一个64位的数来表示,高32位是Leader的epoch,从1开始,每次选出新的Leader,epoch加一。低32位为该epoch内的序号,每次epoch变化,都将低32位的序号重置。这样保证了zkid的全局递增性。

  • 领导选举算法  到3.4.10版本为止,可选项有
  • 基于UDP的LeaderElection
  • 基于UDP的FastLeaderElection
  • 基于UDP和认证的FastLeaderElection
  •   基于TCP的FastLeaderElection  
    在3.4.10版本中,默认值为3,也即基于TCP的FastLeaderElection。另外三种算法已经被弃用,并且有计划在之后的版本中将它们彻底删除而不再支持。  
  • 服务器状态

    • LOOKING: 不确定Leader状态。该状态下的服务器认为当前集群中没有Leader,会发起Leader选举
    • FOLLOWING: 跟随者状态。表明当前服务器角色是Follower,并且它知道Leader是谁
    • LEADING: 领导者状态。表明当前服务器角色是Leader,它会维护与Follower间的心跳
    • OBSERVING: 观察者状态。表明当前服务器角色是Observer,与Folower唯一的不同在于不参与选举,也不参与集群写操作时的投票   

  • 选票数据结构  每个服务器在进行领导选举时,会发送如下关键信息
               logicClock 每个服务器会维护一个自增的整数,名为logicClock,它表示这是该服务器  发起的第多少轮投票           state 当前服务器的状态           self_id 当前服务器的myid           self_zxid 当前服务器上所保存的数据的最大zxid           vote_id 被推举的服务器的myid           vote_zxid 被推举的服务器上所保存的数据的最大zxid
  • 投票流程

    • 自增选举轮次

    Zookeeper规定所有有效的投票都必须在同一轮次中。每个服务器在开始新一轮投票时,会先对自己维护的logicClock进行自增操作。
    • 初始化选票

    每个服务器在广播自己的选票前,会将自己的投票箱清空。该投票箱记录了所收到的选票。例:服务器2投票给服务器3,服务器3投票给服务器1,则服务器1的投票箱为(2, 3), (3, 1), (1, 1)。票箱中只会记录每一投票者的最后一票,如投票者更新自己的选票,则其它服务器收到该新选票后会在自己票箱中更新该服务器的选票。
    • 发送初始化选票

    每个服务器最开始都是通过广播把票投给自己。
    • 接收外部投票

    服务器会尝试从其它服务器获取投票,并记入自己的投票箱内。如果无法获取任何外部投票,则会确认自己是否与集群中其它服务器保持着有效连接。如果是,则再次发送自己的投票;如果否,则马上与之建立连接。
    • 判断选举轮次收到外部投票后,首先会根据投票信息中所包含的logicClock来进行不同处理

    • 外部投票的logicClock大于自己的logicClock。说明该服务器的选举轮次落后于其它服务器的选举轮次,立即清空自己的投票箱并将自己的logicClock更新为收到的logicClock,然后再对比自己之前的投票与收到的投票以确定是否需要变更自己的投票,最终再次将自己的投票广播出去。  
    • 外部投票的logicClock小于自己的logicClock。当前服务器直接忽略该投票,继续处理下一个投票。  
    • 外部投票的logickClock与自己的相等。当时进行选票PK。

    • 选票PK选票PK是基于(self_id, self_zxid)与(vote_id, vote_zxid)的对比

    • 外部投票的logicClock大于自己的logicClock,则将自己的logicClock及自己的选票的logicClock变更为收到的logicClock  
    • 若logicClock一致,则对比二者的vote_zxid,若外部投票的vote_zxid比较大,则将自己的票中的vote_zxid与vote_myid更新为收到的票中的vote_zxid与vote_myid并广播出去,另外将收到的票及自己更新后的票放入自己的票箱。如果票箱内已存在(self_myid, self_zxid)相同的选票,则直接覆盖  
    • 若二者vote_zxid一致,则比较二者的vote_myid,若外部投票的vote_myid比较大,则将自己的票中的vote_myid更新为收到的票中的vote_myid并广播出去,另外将收到的票及自己更新后的票放入自己的票箱

    • 统计选票

    如果已经确定有过半服务器认可了自己的投票(可能是更新后的投票),则终止投票。否则继续接收其它服务器的投票。
    • 更新服务器状态

    投票终止后,服务器开始更新自身状态。若过半的票投给了自己,则将自己的服务器状态更新为LEADING,否则将自己的状态更新为FOLLOWING

      几种领导选举场景

1. 集群启动领导选举
初始投票给自己  集群刚启动时,所有服务器的logicClock都为1,zxid都为0。
各服务器初始化后,都投票给自己,并将自己的一票存入自己的票箱,如下图所示。
在上图中,(1, 1, 0)第一位数代表投出该选票的服务器的logicClock,第二位数代表被推荐的服务器的myid,第三位代表被推荐的服务器的最大的zxid。由于该步骤中所有选票都投给自己,所以第二位的myid即是自己的myid,第三位的zxid即是自己的zxid。  
此时各自的票箱中只有自己投给自己的一票。
更新选票  服务器收到外部投票后,进行选票PK,相应更新自己的选票并广播出去,并将合适的选票存入自己的票箱,如下图所示。


服务器1收到服务器2的选票(1, 2, 0)和服务器3的选票(1, 3, 0)后,由于所有的logicClock都相等,所有的zxid都相等,因此根据myid判断应该将自己的选票按照服务器3的选票更新为(1, 3, 0),并将自己的票箱全部清空,再将服务器3的选票与自己的选票存入自己的票箱,接着将自己更新后的选票广播出去。此时服务器1票箱内的选票为(1, 3),(3, 3)。
同理,服务器2收到服务器3的选票后也将自己的选票更新为(1, 3, 0)并存入票箱然后广播。此时服务器2票箱内的选票为(2, 3),(3, ,3)。
服务器3根据上述规则,无须更新选票,自身的票箱内选票仍为(3, 3)。
服务器1与服务器2更新后的选票广播出去后,由于三个服务器最新选票都相同,最后三者的票箱内都包含三张投给服务器3的选票。
根据选票确定角色   根据上述选票,三个服务器一致认为此时服务器3应该是Leader。因此服务器1和2都进入FOLLOWING状   态,而服务器3进入LEADING状态。之后Leader发起并维护与Follower间的心跳。

2. Follower重启
Follower重启,或者发生网络分区后找不到Leader,会进入LOOKING状态并发起新的一轮投票。首先Follower重启投票给自己

发现已有Leader后成为Follower,服务器3收到服务器1的投票后,将自己的状态LEADING以及选票返回给服务器1。服务器2收到服务器1的投票后,将自己的状态FOLLOWING及选票返回给服务器1。此时服务器1知道服务器3是Leader,并且通过服务器2与服务器3的选票可以确定服务器3确实得到了超过半数的选票。因此服务器1进入FOLLOWING状态

3. Leader重启
Leader(服务器3)宕机后,Follower(服务器1和2)发现Leader不工作了,因此进入LOOKING状态并发起新的一轮投票,并且都将票投给自己。


使用ZAB协议广播更新选票  
服务器1和2根据外部投票确定是否要更新自身的选票。这里有两种情况
1. 服务器1和2的zxid相同。例如在服务器3宕机前服务器1与2完全与之同步。此时选票的更新主要取决于myid的大小
2. 服务器1和2的zxid不同。在旧Leader宕机之前,其所主导的写操作,只需过半服务器确认即可,而不需所有服务器确认。换句话说,服务器1和2可能一个与旧Leader同步(即zxid与之相同)另一个不同步(即zxid比之小)。此时选票的更新主要取决于谁的zxid较大
在上图中,服务器1的zxid为11,而服务器2的zxid为10,因此服务器2将自身选票更新为(3, 1, 11)。  
经过上一步选票更新后,服务器1与服务器2均将选票投给服务器1,因此服务器2成为Follower,而服务器1成为新的Leader并维护与服务器2的心跳


旧Leader恢复后发起选举   
旧的Leader恢复后,进入LOOKING状态并发起新一轮领导选举,并将选票投给自己。此时服务器1会将自己的LEADING状态及选票(3, 1, 11)返回给服务器3,而服务器2将自己的FOLLOWING状态及选票(3, 1, 11)返回给服务器3。如下图所示

旧Leader成为Follower  
服务器3了解到Leader为服务器1,且根据选票了解到服务器1确实得到过半服务器的选票,因此自己进入FOLLOWING状态




ZAB协议一致性保证
ZAB协议保证了在Leader选举的过程中,已经被Commit的数据不会丢失,未被Commit的数据对客户端不可见。
Commit过的数据不丢失  
Failover前状态:为更好演示Leader Failover过程,本例中共使用5个Zookeeper服务器。A作为Leader,共收到P1、P2、P3三条消息,并且Commit了1和2,且总体顺序为P1、P2、C1、P3、C2。根据顺序性原则,其它Follower收到的消息的顺序肯定与之相同。其中B与A完全同步,C收到P1、P2、C1,D收到P1、P2,E收到P1,如下图所示。


这里要注意
1. 由于A没有C3,意味着收到P3的服务器的总个数不会超过一半,也即包含A在内最多只有两台服务器收到P3。在这里A和B收到P3,其它服务器均未收到P3  
2. 由于A已写入C1、C2,说明它已经Commit了P1、P2,因此整个集群有超过一半的服务器,即最少三个服务器收到P1、P2。在这里所有服务器都收到了P1,除E外其它服务器也都收到了P2
选出新Leader  
旧Leader也即A宕机后,其它服务器根据上述FastLeaderElection算法选出B作为新的Leader。C、D和E成为Follower且以B为Leader后,会主动将自己最大的zxid发送给B,B会将Follower的zxid与自身zxid间的所有被Commit过的消息同步给Follower,如下图所示。

在上图中
1. P1和P2都被A Commit,因此B会通过同步保证P1、P2、C1与C2都存在于C、D和E中
2. P3由于未被A Commit,同时幸存的所有服务器中P3未存在于大多数据服务器中,因此它不会被同步到其它Follower
未Commit过的消息对客户端不可见  
在上例中,P3未被A Commit过,同时因为没有过半的服务器收到P3,因此B也未Commit P3(如果有过半服务器收到P3,即使A未Commit P3,B会主动Commit P3,即C3),所以它不会将P3广播出去。
具体做法是,B在成为Leader后,先判断自身未Commit的消息(本例中即P3)是否存在于大多数服务器中从而决定是否要将其Commit。然后B可得出自身所包含的被Commit过的消息中的最小zxid(记为min_zxid)与最大zxid(记为max_zxid)。C、D和E向B发送自身Commit过的最大消息zxid(记为max_zxid)以及未被Commit过的所有消息(记为zxid_set)。B根据这些信息作出如下操作
1. 如果Follower的max_zxid与Leader的max_zxid相等,说明该Follower与Leader完全同步,无须同步任何数据
2. 如果Follower的max_zxid在Leader的(min_zxid,max_zxid)范围内,Leader会通过TRUNC命令通知Follower将其zxid_set中大于Follower的max_zxid(如果有)的所有消息全部删除
上述操作保证了未被Commit过的消息不会被Commit从而对外不可见。


七、Zookeeper的应用场景
分布式协调  
这个其实是zk很经典的一个用法,简单来说,就好比,你A系统发送个请求到mq,然后B消息消费之后处理了。那A系统如何知道B系统的处理结果?用zk就可以实现分布式系统之间的协调工作。A系统发送请求之后可以在zk上对某个节点的值注册个监听器,一旦B系统处理完了就修改zk那个节点的值,A立马就可以收到通知,完美解决。

分布式锁
对某一个数据连续发出两个修改操作,两台机器同时收到了请求,但是只能一台机器先执行另外一个机器再执行。那么此时就可以使用zk分布式锁,一个机器接收到了请求之后先获取zk上的一把分布式锁,就是可以去创建一个znode,接着执行操作;然后另外一个机器也尝试去创建那个znode,结果发现自己创建不了,因为被别人创建了。。。。那只能等着,等第一个机器执行完了自己再执行
元数据/配置信息管理
zk可以用作很多系统的配置信息的管理,比如kafka、storm,solr集群等等很多分布式系统都会选用zk来做一些元数据、配置信息的管理,包括dubbo注册中心不也支持zk么
HA高可用性
这个应该是很常见的,比如hadoop、hdfs、yarn等很多大数据系统,都选择基于zk来开发HA高可用机制,就是一个重要进程一般会做主备两个,主进程挂了立马通过zk感知到切换到备用进程
八、总结
本文从概念讲解了zookeeper的理论知识,核心知识部分-数据模型、节点操作、watch机制以及ACL权限问题,同时将zookeeper集群ZAB协议如何选举领导进行了深入分析,如果读者能够将本文的内容梳理清楚并有了理解,那么zookeeper就搞定啦。最后也为大家讲解了zookeeper在企业开发中的应用场景,希望本文能够让读者对zookeeper有一定认识,让我们不只是停留在zookeeper的使用层面,大家一起加油,相互交流学习!!!!

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马