val serializer = new JavaSerializer(conf)
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
property | Meaning |
spark.deploy.recoveryMode | FILESYSTEM |
spark.deploy.recoveryDirectory | 用来恢复状态的目录 |
property | Meaning |
spark.deploy.recoveryMode | ZOOKEEPER |
spark.deploy.zookeeper.url | e.g., 192.168.1.100:2181,192.168.1.101:2181 |
spark.deploy.zookeeper.dir | zookeeper保存恢复状态的目录 |
leaderElectionAgent = leaderElectionAgent_
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
new ZooKeeperLeaderElectionAgent(master, conf)
private def start() {
logInfo("Starting ZooKeeper LeaderElection agent")
zk = SparkCuratorUtil.newClient(conf)
leaderLatch = new LeaderLatch(zk, WORKING_DIR)
leaderLatch.addListener(this)
leaderLatch.start()
}
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
override def isLeader() {
synchronized {
// could have lost leadership by now.
if (!leaderLatch.hasLeadership) {
return
}
logInfo("We have gained leadership")
updateLeadershipStatus(true)
}
}
override def notLeader() {
synchronized {
// could have gained leadership by now.
if (leaderLatch.hasLeadership) {
return
}
logInfo("We have lost leadership")
updateLeadershipStatus(false)
}
}
欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) | 黑马程序员IT技术论坛 X3.2 |