AM向RM提交app: submitapp:
private void submitApp(ReservationId reservationId)
throws YarnException, InterruptedException, IOException {
// ask for new application
GetNewApplicationRequest newAppRequest =
Records.newRecord(GetNewApplicationRequest.class);
GetNewApplicationResponse newAppResponse =
rm.getClientRMService().getNewApplication(newAppRequest);
appId = newAppResponse.getApplicationId();
// submit the application
final SubmitApplicationRequest subAppRequest =
Records.newRecord(SubmitApplicationRequest.class);
ApplicationSubmissionContext appSubContext =
Records.newRecord(ApplicationSubmissionContext.class);
appSubContext.setApplicationId(appId);
appSubContext.setMaxAppAttempts(1);
appSubContext.setQueue(queue);
appSubContext.setPriority(Priority.newInstance(0));
ContainerLaunchContext conLauContext =
Records.newRecord(ContainerLaunchContext.class);
conLauContext.setApplicationACLs(new HashMap<>());
conLauContext.setCommands(new ArrayList<>());
conLauContext.setEnvironment(new HashMap<>());
conLauContext.setLocalResources(new HashMap<>());
conLauContext.setServiceData(new HashMap<>());
appSubContext.setAMContainerSpec(conLauContext);
appSubContext.setResource(amContainerResource);
if(reservationId != null) {
appSubContext.setReservationID(reservationId);
}
subAppRequest.setApplicationSubmissionContext(appSubContext);
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws YarnException, IOException {
rm.getClientRMService().submitApplication(subAppRequest);
return null;
}
});
LOG.info("Submit a new application {}", appId);
}
final SubmitApplicationRequest subAppRequest中的subAppRequest就是传递给 getClientRMService().submitApplication(subAppRequest)的最终标准格式。
注意这个: ApplicationSubmissionContext appSubContext这就是ApplicationSubmissionContext协议类的对象。 该类在hadoop-yarn-api/src/main/proto/yarn_protos.proto中实现:
////////////////////////////////////////////////////////////////////////
////// From client_RM_Protocol /////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
message ApplicationSubmissionContextProto {
optional ApplicationIdProto application_id = 1;
optional string application_name = 2 [default = "N/A"];
optional string queue = 3 [default = "default"];
optional PriorityProto priority = 4;
optional ContainerLaunchContextProto am_container_spec = 5;
optional bool cancel_tokens_when_complete = 6 [default = true];
optional bool unmanaged_am = 7 [default = false];
optional int32 maxAppAttempts = 8 [default = 0];
optional ResourceProto resource = 9;
optional string applicationType = 10 [default = "YARN"];
optional bool keep_containers_across_application_attempts = 11 [default = false];
repeated string applicationTags = 12;
optional int64 attempt_failures_validity_interval = 13 [default = -1];
optional LogAggregationContextProto log_aggregation_context = 14;
optional ReservationIdProto reservation_id = 15;
optional string node_label_expression = 16;
repeated ResourceRequestProto am_container_resource_request = 17;
repeated ApplicationTimeoutMapProto application_timeouts = 18;
repeated StringStringMapProto application_scheduling_properties = 19;
}
而在submitapp函数中,对appSubContext对象也初始化了:
ApplicationSubmissionContext appSubContext =
Records.newRecord(ApplicationSubmissionContext.class);
appSubContext.setApplicationId(appId);
appSubContext.setMaxAppAttempts(1);
appSubContext.setQueue(queue);
appSubContext.setPriority(Priority.newInstance(0));
ContainerLaunchContext conLauContext =
Records.newRecord(ContainerLaunchContext.class);
conLauContext.setApplicationACLs(new HashMap<>());
conLauContext.setCommands(new ArrayList<>());
conLauContext.setEnvironment(new HashMap<>());
conLauContext.setLocalResources(new HashMap<>());
conLauContext.setServiceData(new HashMap<>());
appSubContext.setAMContainerSpec(conLauContext);
appSubContext.setResource(amContainerResource);
初始化好后,通过setApplicationSubmissionContext(appSubContext)传递给subAppRequest,然后调用: rm.getClientRMService().submitApplication(subAppRequest);方法传递给RM。
AM向RM请求资源: 在middleStep中的MRAMSimulator.java的sendContainerRequest中调用的packageRequests方法,该方法就是将请求打包成ResourceRequest协议格式的:
protected List<ResourceRequest> packageRequests(
List<ContainerSimulator> csList, int priority) {
// create requests
Map<String, ResourceRequest> rackLocalRequestMap = new HashMap<String, ResourceRequest>();
Map<String, ResourceRequest> nodeLocalRequestMap = new HashMap<String, ResourceRequest>();
ResourceRequest anyRequest = null;
for (ContainerSimulator cs : csList) {
if (cs.getHostname() != null) {
String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname());
// check rack local
String rackname = "/" + rackHostNames[0];
if (rackLocalRequestMap.containsKey(rackname)) {
rackLocalRequestMap.get(rackname).setNumContainers(
rackLocalRequestMap.get(rackname).getNumContainers() + 1);
} else {
ResourceRequest request = createResourceRequest(cs.getResource(),
cs.getExecutionType(), rackname, priority, 1);
rackLocalRequestMap.put(rackname, request);
}
// check node local
String hostname = rackHostNames[1];
if (nodeLocalRequestMap.containsKey(hostname)) {
nodeLocalRequestMap.get(hostname).setNumContainers(
nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
} else {
ResourceRequest request = createResourceRequest(cs.getResource(),
cs.getExecutionType(), hostname, priority, 1);
nodeLocalRequestMap.put(hostname, request);
}
}
// any
if (anyRequest == null) {
anyRequest = createResourceRequest(cs.getResource(),
cs.getExecutionType(), ResourceRequest.ANY, priority, 1);
} else {
anyRequest.setNumContainers(anyRequest.getNumContainers() + 1);
}
}
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ask.addAll(nodeLocalRequestMap.values());
ask.addAll(rackLocalRequestMap.values());
if (anyRequest != null) {
ask.add(anyRequest);
}
return ask;
}
首先创建request:
Map<String, ResourceRequest> rackLocalRequestMap = new HashMap<String, ResourceRequest>();
Map<String, ResourceRequest> nodeLocalRequestMap = new HashMap<String, ResourceRequest>();
然后开始对协议内容进行填充:比如添加rackname,numcontainer等
for (ContainerSimulator cs : csList) {
if (cs.getHostname() != null) {
String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname());
// check rack local
String rackname = "/" + rackHostNames[0];
if (rackLocalRequestMap.containsKey(rackname)) {
rackLocalRequestMap.get(rackname).setNumContainers(
rackLocalRequestMap.get(rackname).getNumContainers() + 1);
} else {
ResourceRequest request = createResourceRequest(cs.getResource(),
cs.getExecutionType(), rackname, priority, 1);
rackLocalRequestMap.put(rackname, request);
}
// check node local
String hostname = rackHostNames[1];
if (nodeLocalRequestMap.containsKey(hostname)) {
nodeLocalRequestMap.get(hostname).setNumContainers(
nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
} else {
ResourceRequest request = createResourceRequest(cs.getResource(),
cs.getExecutionType(), hostname, priority, 1);
nodeLocalRequestMap.put(hostname, request);
}
}
// any
if (anyRequest == null) {
anyRequest = createResourceRequest(cs.getResource(),
cs.getExecutionType(), ResourceRequest.ANY, priority, 1);
} else {
anyRequest.setNumContainers(anyRequest.getNumContainers() + 1);
}
}
ResourceRequest格式的协议如下:
该类在hadoop-yarn-api/src/main/proto/yarn_protos.proto中实现:
////////////////////////////////////////////////////////////////////////
////// From AM_RM_Protocol /////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
message ResourceRequestProto {
optional PriorityProto priority = 1;
optional string resource_name = 2;
optional ResourceProto capability = 3;
optional int32 num_containers = 4;
optional bool relax_locality = 5 [default = true];
optional string node_label_expression = 6;
optional ExecutionTypeRequestProto execution_type_request = 7;
optional int64 allocation_request_id = 8 [default = -1];
}
然后回到sendContainerRequest函数: ask = packageRequests(mergeLists(pendingMaps, scheduledMaps),
PRIORITY_MAP); ask就是List<ResourceRequest> 格式。sendContainerRequest接着往下走:
if (ask == null) {
ask = new ArrayList<>();
}
final AllocateRequest request = createAllocateRequest(ask);
if (totalContainers == 0) {
request.setProgress(1.0f);
} else {
request.setProgress((float) finishedContainers / totalContainers);
}
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(appAttemptId.toString());
Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
.get(appAttemptId.getApplicationId())
.getRMAppAttempt(appAttemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier());
AllocateResponse response = ugi.doAs(
new PrivilegedExceptionAction<AllocateResponse>() {
@Override
public AllocateResponse run() throws Exception {
return rm.getApplicationMasterService().allocate(request);
}
});
if (response != null) {
responseQueue.put(response);
}
可以看到,通过createAllocateRequest(ask)函数将ask(List<ResourceRequest> )转换为request(AllocateRequest)。
protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask,
List<ContainerId> toRelease) {
AllocateRequest allocateRequest =
recordFactory.newRecordInstance(AllocateRequest.class);
allocateRequest.setResponseId(responseId++);
allocateRequest.setAskList(ask);
allocateRequest.setReleaseList(toRelease);
return allocateRequest;
}
ask被最终包装成AllocateRequest类型的request,AllocateRequest就是AM向RM申请资源最终的格式。在真正的YARN中,AM与RM通信也是AllocateRequest格式。 这就是RM与AM获取资源通信的协议。包括container的个数等。
|