Spark源码之Standalone模式下master持久化引擎讲解
Standalone 模式下Master为了保证故障恢复,会持久化一些重要的数据,来避免master故障导致集群不可用这种情况(也即单点故障)。目前,有四种持久化策略:
1、基于zookeeper的持久化引擎;
2、基于文件的持久化引擎;
3、用户自定义持久化引擎;
4、不使用持久化引擎。
一、在master的OnStart方法中,对应的源码如下:
[Scala] 纯文本查看 复制代码
val serializer = new JavaSerializer(conf)
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
默认情况下是无持久化引擎,也就是没有ha策略。Spark提供的可用的ha策略:基于文件系统的和基于zookeeper。配置方法如下:
基于文件系统:
property | |
spark.deploy.recoveryMode | |
spark.deploy.recoveryDirectory | |
基于zookeeper:
property | |
spark.deploy.recoveryMode | |
spark.deploy.zookeeper.url | e.g., 192.168.1.100:2181,192.168.1.101:2181 |
spark.deploy.zookeeper.dir | |
生产环境中可用的是基于zookeeper的持久化引擎。基于zookeeper持久化策略,会允许我们同时运行多个master,然后支持leader选举,最终是一个leader,其余是standby。
二、Spark的Master的leader选举实现
Spark源码里面使用的是CuratorFramework,跟zookeeper交流。该框架有以下特点:
1、自动连接管理:自动处理zookeeper的连接和重试存在一些潜在的问题;可以watch NodeDataChanged event和获取updateServerList;Watches可以自动被Cruator recipes删除;
2、更加简洁的API:简化raw zookeeper方法,事件等,提供现代流式API接口;
3、Recipe实现:leader选举,分布式锁,path缓存,和watcher,分布式队列,Barriers等。
三、Spark源码里面使用了LeaderLatch实现选举功能。
这个实现实际是基于zookeeper的节点类型来做,zookeeper有四种节点类型:
1、持久节点(PERSISTENT)
节点创建后,会一直存在,不会因客户端会话失效而删除;
2、持久顺序节点(PERSISTENT_SEQUENTIAL)
基本特性与持久节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名;
3、临时节点(EPHEMERAL)
客户端会话失效或连接关闭后,该节点会被自动删除,且不能再临时节点下面创建子节点;
4、临时顺序节点(EPHEMERAL_SEQUENTIAL)
基本特性与临时节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名;LeaderLatch实现leader选举实际上基于临时顺序节点来做的。
四、Spark源码里面基于zookeeper的leader选举具体实现过程源码如下:
在master的OnStart方法里面
[Scala] 纯文本查看 复制代码
leaderElectionAgent = leaderElectionAgent_
实际是在构建zookeeper的持久化引擎的时候构建的:
[Scala] 纯文本查看 复制代码
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
在createLeaderElectionAgent方法里面构建了
[Scala] 纯文本查看 复制代码
new ZooKeeperLeaderElectionAgent(master, conf)
该对象继承了LeaderLatchListener,并且覆盖了notLeader和isLeader两个重要的方法具体。
在ZooKeeperLeaderElectionAgent构建的时候调用了自己的start方法,该方法构建了LeaderLatch,并添加ZooKeeperLeaderElectionAgent作为其listener。
[Scala] 纯文本查看 复制代码
private def start() {
logInfo("Starting ZooKeeper LeaderElection agent")
zk = SparkCuratorUtil.newClient(conf)
leaderLatch = new LeaderLatch(zk, WORKING_DIR)
leaderLatch.addListener(this)
leaderLatch.start()
}
Leader选举在zookeeper的临时节点的路径为:
[Scala] 纯文本查看 复制代码
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
执行ZooKeeperLeaderElectionAgent对象的start方法之后,每当该对象所在的master由standby变为Leader的时候,会调用isLeader()方法。由Leader变为StandBy的时候会调用notLeader()。我们就可以在这两个方法里实现自己要的状态切换的相关操作。
[Scala] 纯文本查看 复制代码
override def isLeader() {
synchronized {
// could have lost leadership by now.
if (!leaderLatch.hasLeadership) {
return
}
logInfo("We have gained leadership")
updateLeadershipStatus(true)
}
}
override def notLeader() {
synchronized {
// could have gained leadership by now.
if (leaderLatch.hasLeadership) {
return
}
logInfo("We have lost leadership")
updateLeadershipStatus(false)
}
}
要实现我们自己应用,也可基于此方法。