Yarn的启动 1. 对于任务的yarn服务的启动当然要从它的启动脚本start-yarn.sh中进行分析,可以看到分别执行了yarn-daemon.sh、yarn-daemons.sh
# start resourceManager
"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR start resourcemanager
# start nodeManager
"$bin"/yarn-daemons.sh --config $YARN_CONF_DIR start nodemanager
2. 仔细阅读脚本(本文就不对脚本的细节部分做过多的讲解了),不难找到在脚本中有这么一句话,可知实际上执行的是bin目录下yarn这一个文件 nohup nice -n $YARN_NICENESS "$HADOOP_YARN_HOME"/bin/yarn --config $YARN_CONF_DIR $command "$@" > "$log" 2>&1 < /dev/null &3. 打开这个文件,去找传进去的resourcemanager参数和nodemanager参数对应的执行类,能够清楚的观察到分别执行了NodeManager和ResourceManager
elif [ "$COMMAND" = "nodemanager" ] ; then
CLASSPATH=${CLASSPATH}:$YARN_CONF_DIR/nm-config/log4j.properties
CLASS='org.apache.hadoop.yarn.server.nodemanager.NodeManager'
YARN_OPTS="$YARN_OPTS -server $YARN_NODEMANAGER_OPTS"
elif [ "$COMMAND" = "resourcemanager" ] ; then
CLASSPATH=${CLASSPATH}:$YARN_CONF_DIR/rm-config/log4j.properties
CLASS='org.apache.hadoop.yarn.server.resourcemanager.ResourceManager'
YARN_OPTS="$YARN_OPTS $YARN_RESOURCEMANAGER_OPTS"
ResourceManager那我们直接从ResourceManager(后文简称RM)的mian方法来分析RM的启动,main方法中很容易就能观察到两个重要的方法
//会实现本类中的serviceInit方法
resourceManager.init(conf);
resourceManager.start();
实际上此处是调用父类AbstractService的init()方法,再内部实现serviceInit方法,再有匹配的实现类来达到初始化的目的。AbstractService的init()方法非常重要,基本所有重要服务的初始化方法都是调用该方法实现的。
那我们就来看一下这个方法:
public void init(Configuration conf) {
//服务配置是否为空
if (conf == null) {
throw new ServiceStateException("Cannot initialize service "
+ getName() + ": null configuration");
}
//服务是否已经初始化
if (isInState(STATE.INITED)) {
return;
}
synchronized (stateChangeLock) {
/*相当于对服务做一次校验,确保服务初始化成功*/
if (enterState(STATE.INITED) != STATE.INITED) {
setConfig(conf);
try {
//服务初始化,会进入子类的同名函数
serviceInit(config);
if (isInState(STATE.INITED)) {
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}
再来看一下start方法它同init()方法一样,都是去调用父类AbstractService的start()方法,然后调用内部的serviceStart()方法由对应的实现类来实现的,与init()方法一样,许多重要服务的start方法都是采用这样的模式实现的。
public void start() {
if (isInState(STATE.STARTED)) {
return;
}
synchronized (stateChangeLock) {
if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
try {
startTime = System.currentTimeMillis();
if (isInState(STATE.STARTED)) {
serviceStart();
if (LOG.isDebugEnabled()) {
LOG.debug("Service " + getName() + " is started");
}
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}
那我们去看一下具体实现的方法先从ResourceManager.serviceInit方法看起,充分理解服务启动时各个成员变量以及相关服务的初始化对于我们分析整篇yarn相关的内容是很有帮助的。在初始化方法中有一个很重要的方法createAndInitActiveServices(),具体的作用在下面的代码块中做了注释。该方法内部实现了许多重要服务的初始化,在这里我们就不做这些服务一一介绍了,在后文分析中遇到使用该类的地方时,会顺带介绍初始化的相关方法。其实通过研究RM的初始化代码不难发现RM就像一个由许多子服务类组成的大服务类,所以RM就通过这些子服务来管理整个Yarn。
// load core-site.xml
// load yarn-site.xml
//前面的代码关键就做了初始化配置文件,读取配置文件,设置配置文件上下文这类的事情,就不多做分析了。
// 注册了一个调度器,用于内部事件调度处理
rmDispatcher = setupDispatcher();
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);
//这个方法内部实现了许多重要的服务初始化的过程,其实真正需要分析的就是这个方法。
//这是由RM的一个内部类RMActiveServices实现的。
createAndInitActiveServices();
super.serviceInit(this.conf);
在本文最后一节会介绍一个关键类的初始化,这是NM与RM保持心跳的关键类,它的初始化方法也是一些配置参数的初始化
resourceTracker = createResourceTrackerService();
addService(resourceTracker);
rmContext.setResourceTrackerService(resourceTracker);
接下来看一下ResourceManager.serviceStart()
protected void serviceStart() throws Exception {
if (this.rmContext.isHAEnabled()) {
transitionToStandby(true);
} else {
transitionToActive();
}
startWepApp();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,false)) {
int port = webApp.port();
WebAppUtils.setRMWebAppPort(conf, port);
}
super.serviceStart();
}
本文不分析HA环境,所以我们直接看transitionToActive(),顺着该方法往里走,最终调用的是AsyncDispatcher.serviceStart(),内部新建了一个死循环线程,用来处理阻塞队列中的各个事件,最终会调用createThread()
Runnable createThread(){
return new Runnable(){
@Override
public void run(){
while(!stopped&&!Thread.currentThread().isInterrupted()){
drained=eventQueue.isEmpty();
if(blockNewEvents){
synchronized(waitForDrained){
if(drained){
waitForDrained.notify();
}
}
}
Event event;
try{
event=eventQueue.take();
}catch(InterruptedException ie){
if(!stopped){
LOG.warn("AsyncDispatcher thread interrupted",ie);
}
return;
}
if(event!=null){
dispatch(event);
}
}
}
}
}
最终看super.serviceStart(),内部就是将每一个service启动,至此RM的启动就完毕了。(每一个具体服务的启动在后文介绍的时候会在使用到的地方再做解释)
protected void serviceStart() throws Exception {
List<Service> services = getServices();
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": starting services, size=" + services.size());
}
for (Service service : services) {
service.start();
}
super.serviceStart();
}
NodeManager先来看一下main()方法
public static void main(String[] args) throws IOException {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
NodeManager nodeManager = new NodeManager();
Configuration conf = new YarnConfiguration();
new GenericOptionsParser(conf, args);
nodeManager.initAndStartNodeManager(conf, false);
}
调用了initAndStartNodeManager(conf, false),其实这个方法就是执行了init(),以及start(),所以我们先来看serviceInit()方法,同RM一样,做了许多重要方法的初始化,此篇文章要着重介绍NodeStatusUpdaterImpl,可以看到该初始化方法中new了一个NodeStatusUpdaterImpl,一起在RM章节中我们已经介绍了,后文的start方法会逐一启动这些初始化过的服务,所以不再赘述,我们直接去看NodeStatusUpdaterImpl的初始化方法和启动方法
nodeStatusUpdater =createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,metrics);
}
大致浏览一下初始化方法,大多都是配置文件的读取(都是关于节点资源信息的配置文件),而后去关注重点的start方法
this.nodeId = this.context.getNodeId();
this.httpPort = this.context.getHttpPort();
this.nodeManagerVersionId = YarnVersionInfo.getVersion();
try {
//注册启动以保证ContainerManager可以获得每一台NM的标记
this.resourceTracker = getRMClient();
//配置必要配置信息,和安全认证操作利用Hadoop RPC远程调用RM端ResourcesTrackerService下的registerNodeManager()方法
// 详细见后面ResourcesTrackerService下的registerNodeManager()代码分析
registerWithRM();
super.serviceStart();
// 创建一个线程,然后启动,所有操作都在运行while的循环中
//设置、获取和输出必要配置信息,其中比较重要的调用getNodeStatus()方法,
// 获取本地Container和本地Node的状态,以供后面的nodeHeartbeat()方法使用
//通过Hadoop RPC远程调用RM端ResourcesTrackerService下的nodeHeartbeat()函数,
// 用while循环以一定时间间隔向RM发送心跳信息,心跳操作见下面ResourcesTrackerService下的nodeHeartbeat()函数
//nodeHeartbeat()将返回给NM信息,根据返回的response,
// 根据response返回的信息标记不需要的Container和Application发送相应的
// FINISH_CONTAINERS和 FINISH_APPS给ContainerManager,进行清理操作----详细见后面的代码分析
startStatusUpdater();
} catch (Exception e) {
String errorMessage = "Unexpected error starting NodeStatusUpdater";
LOG.error(errorMessage, e);
throw new YarnRuntimeException(e);
}
可归纳出两个重要的方法registerWithRM(),startStatusUpdater(),这两个方法会通过RPC远程调用上文说过的等待被调用ResourceTracker中的两个接口 registerNodeManager()和nodeHeartbeat()
NM与RM间的心跳通信从上文可知registerWithRM()会通过rpc通信调用ResourceTracker中的接口 registerNodeManager(),所以我们直接去看这个 registerNodeManager(),内部做了一些节点健康信息的检查,资源信息的检查,从下列代码中可以看到还会检查该node是否是新的node,如果是就会触发STARTRED时间,是,如果不是在触发RECONNECTED事件
RMNode oldNode=this.rmContext.getRMNodes().putIfAbsent(nodeId,rmNode);
if(oldNode==null) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStartedEvent(nodeId,request.getNMContainerStatuses(),
request.getRunningApplications()));
} else {
LOG.info("Reconnect from the node at: "+host);
this.nmLivelinessMonitor.unregister(nodeId);
oldNode.resetLastNodeHeartBeatResponse();
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
new RMNodeReconnectEvent(nodeId,rmNode,request
.getRunningApplications(),request.getNMContainerStatuses()));
}
那我们此次分析的是NM的启动,自然去分析STARTED事件,状态发生了转移触发AddNodeTransition函数,该函数内显示的添加了active nodes,并向调度器发送了一个事件NODE_ADDED,告诉调度器有节点增加,用于计算集群中总共可调用的资源,到此节点就已经注册成功了。
//资源判断
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
int numNodes = numNodeManagers.incrementAndGet();
updateMaximumAllocation(schedulerNode, true);
然后代码往后走,调用了ResourceTracker中的接 口nodeHeartbeat(),心跳方法中最终要的就是提交了一个STATUS_UPDATE事件
// 4. Send status to RMNode, saving the latest response.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
remoteNodeStatus.getContainersStatuses(),
remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
从下面代码中可以看出该事件对应的方法
.addTransition(NodeState.RUNNING,
EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
在没有任务提交时,心跳方法内部就是汇报了自己节点资源的可用信息和已用信息以及汇报健康状态,最后返回RUNNING状态,表示NM正常运行中,整个心跳方法是卸载while循环里的,没有异常就不会退出
//TODO:对container进行分组,分为a)刚申请到还未使用的 b)运行完毕需要回收的
rmNode.handleContainerStatus(statusEvent.getContainers());
if(rmNode.nextHeartBeat) {
rmNode.nextHeartBeat = false;
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeUpdateSchedulerEvent(rmNode));
}
// Update DTRenewer in secure mode to keep these apps alive. Today this is
// needed for log-aggregation to finish long after the apps are gone.
if (UserGroupInformation.isSecurityEnabled()) {
rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
statusEvent.getKeepAliveAppIds());
}
return NodeState.RUNNING;
|