A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

本帖最后由 彼岸话雨 于 2019-2-21 14:19 编辑

Spark源码之Standalone模式下master持久化引擎讲解
      
      Standalone 模式下Master为了保证故障恢复,会持久化一些重要的数据,来避免master故障导致集群不可用这种情况(也即单点故障)。目前,有四种持久化策略:
1、基于zookeeper的持久化引擎;
2、基于文件的持久化引擎;
3、用户自定义持久化引擎;
4、不使用持久化引擎。

一、在master的OnStart方法中,对应的源码如下:
[Scala] 纯文本查看 复制代码
val serializer = new JavaSerializer(conf)
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
      默认情况下是无持久化引擎,也就是没有ha策略。Spark提供的可用的ha策略:基于文件系统的和基于zookeeper。配置方法如下:
基于文件系统:
property
Meaning
spark.deploy.recoveryMode
FILESYSTEM
spark.deploy.recoveryDirectory
用来恢复状态的目录
基于zookeeper:
property
Meaning
spark.deploy.recoveryMode
ZOOKEEPER
spark.deploy.zookeeper.url
e.g., 192.168.1.100:2181,192.168.1.101:2181
spark.deploy.zookeeper.dir
zookeeper保存恢复状态的目录
      生产环境中可用的是基于zookeeper的持久化引擎。基于zookeeper持久化策略,会允许我们同时运行多个master,然后支持leader选举,最终是一个leader,其余是standby。

二、Spark的Master的leader选举实现
      Spark源码里面使用的是CuratorFramework,跟zookeeper交流。该框架有以下特点:
      1、自动连接管理:自动处理zookeeper的连接和重试存在一些潜在的问题;可以watch NodeDataChanged event和获取updateServerList;Watches可以自动被Cruator recipes删除;
      2、更加简洁的API:简化raw zookeeper方法,事件等,提供现代流式API接口;
      3、Recipe实现:leader选举,分布式锁,path缓存,和watcher,分布式队列,Barriers等。

三、Spark源码里面使用了LeaderLatch实现选举功能。
     这个实现实际是基于zookeeper的节点类型来做,zookeeper有四种节点类型:
     1、持久节点(PERSISTENT)
节点创建后,会一直存在,不会因客户端会话失效而删除;
     2、持久顺序节点(PERSISTENT_SEQUENTIAL)
基本特性与持久节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名;
     3、临时节点(EPHEMERAL)
客户端会话失效或连接关闭后,该节点会被自动删除,且不能再临时节点下面创建子节点;
     4、临时顺序节点(EPHEMERAL_SEQUENTIAL)
基本特性与临时节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名;LeaderLatch实现leader选举实际上基于临时顺序节点来做的。

四、Spark源码里面基于zookeeper的leader选举具体实现过程源码如下:
      在master的OnStart方法里面
[Scala] 纯文本查看 复制代码
leaderElectionAgent = leaderElectionAgent_
      实际是在构建zookeeper的持久化引擎的时候构建的:
[Scala] 纯文本查看 复制代码
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      在createLeaderElectionAgent方法里面构建了
[Scala] 纯文本查看 复制代码
new ZooKeeperLeaderElectionAgent(master, conf)
      该对象继承了LeaderLatchListener,并且覆盖了notLeader和isLeader两个重要的方法具体。
在ZooKeeperLeaderElectionAgent构建的时候调用了自己的start方法,该方法构建了LeaderLatch,并添加ZooKeeperLeaderElectionAgent作为其listener。
[Scala] 纯文本查看 复制代码
private def start() {
  logInfo("Starting ZooKeeper LeaderElection agent")
  zk = SparkCuratorUtil.newClient(conf)
  leaderLatch = new LeaderLatch(zk, WORKING_DIR)
  leaderLatch.addListener(this)
  leaderLatch.start()
}
      Leader选举在zookeeper的临时节点的路径为:
[Scala] 纯文本查看 复制代码
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
      执行ZooKeeperLeaderElectionAgent对象的start方法之后,每当该对象所在的master由standby变为Leader的时候,会调用isLeader()方法。由Leader变为StandBy的时候会调用notLeader()。我们就可以在这两个方法里实现自己要的状态切换的相关操作。
[Scala] 纯文本查看 复制代码
override def isLeader() {
  synchronized {
    // could have lost leadership by now.
    if (!leaderLatch.hasLeadership) {
      return
    }

    logInfo("We have gained leadership")
    updateLeadershipStatus(true)
  }
}

override def notLeader() {
  synchronized {
    // could have gained leadership by now.
    if (leaderLatch.hasLeadership) {
      return
    }

    logInfo("We have lost leadership")
    updateLeadershipStatus(false)
  }
}
要实现我们自己应用,也可基于此方法。

1 个回复

正序浏览
感谢分享
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马