The AM submits the app to the RM:

submitApp:



    private void submitApp(ReservationId reservationId)
            throws YarnException, InterruptedException, IOException {
      // ask for new application
      GetNewApplicationRequest newAppRequest =
          Records.newRecord(GetNewApplicationRequest.class);
      GetNewApplicationResponse newAppResponse =
          rm.getClientRMService().getNewApplication(newAppRequest);
      appId = newAppResponse.getApplicationId();

      // submit the application
      final SubmitApplicationRequest subAppRequest =
          Records.newRecord(SubmitApplicationRequest.class);
      ApplicationSubmissionContext appSubContext =
          Records.newRecord(ApplicationSubmissionContext.class);
      appSubContext.setApplicationId(appId);
      appSubContext.setMaxAppAttempts(1);
      appSubContext.setQueue(queue);
      appSubContext.setPriority(Priority.newInstance(0));
      ContainerLaunchContext conLauContext =
          Records.newRecord(ContainerLaunchContext.class);
      conLauContext.setApplicationACLs(new HashMap<>());
      conLauContext.setCommands(new ArrayList<>());
      conLauContext.setEnvironment(new HashMap<>());
      conLauContext.setLocalResources(new HashMap<>());
      conLauContext.setServiceData(new HashMap<>());
      appSubContext.setAMContainerSpec(conLauContext);
      appSubContext.setResource(amContainerResource);

      if(reservationId != null) {
        appSubContext.setReservationID(reservationId);
      }

      subAppRequest.setApplicationSubmissionContext(appSubContext);
      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws YarnException, IOException {
          rm.getClientRMService().submitApplication(subAppRequest);
          return null;
        }
      });
      LOG.info("Submit a new application {}", appId);
    }



The subAppRequest object (declared as final SubmitApplicationRequest subAppRequest) is the final, standard-format request passed to getClientRMService().submitApplication(subAppRequest).


Note this declaration:

ApplicationSubmissionContext appSubContext

This is an object of the ApplicationSubmissionContext protocol record class.

Its message format is defined in hadoop-yarn-api/src/main/proto/yarn_protos.proto:



    ////////////////////////////////////////////////////////////////////////
    ////// From client_RM_Protocol /////////////////////////////////////////
    ////////////////////////////////////////////////////////////////////////
    message ApplicationSubmissionContextProto {
      optional ApplicationIdProto application_id = 1;
      optional string application_name = 2 [default = "N/A"];
      optional string queue = 3 [default = "default"];
      optional PriorityProto priority = 4;
      optional ContainerLaunchContextProto am_container_spec = 5;
      optional bool cancel_tokens_when_complete = 6 [default = true];
      optional bool unmanaged_am = 7 [default = false];
      optional int32 maxAppAttempts = 8 [default = 0];
      optional ResourceProto resource = 9;
      optional string applicationType = 10 [default = "YARN"];
      optional bool keep_containers_across_application_attempts = 11 [default = false];
      repeated string applicationTags = 12;
      optional int64 attempt_failures_validity_interval = 13 [default = -1];
      optional LogAggregationContextProto log_aggregation_context = 14;
      optional ReservationIdProto reservation_id = 15;
      optional string node_label_expression = 16;
      repeated ResourceRequestProto am_container_resource_request = 17;
      repeated ApplicationTimeoutMapProto application_timeouts = 18;
      repeated StringStringMapProto application_scheduling_properties = 19;
    }



The submitApp function initializes the appSubContext object as follows:



    ApplicationSubmissionContext appSubContext =
        Records.newRecord(ApplicationSubmissionContext.class);
    appSubContext.setApplicationId(appId);
    appSubContext.setMaxAppAttempts(1);
    appSubContext.setQueue(queue);
    appSubContext.setPriority(Priority.newInstance(0));
    ContainerLaunchContext conLauContext =
        Records.newRecord(ContainerLaunchContext.class);
    conLauContext.setApplicationACLs(new HashMap<>());
    conLauContext.setCommands(new ArrayList<>());
    conLauContext.setEnvironment(new HashMap<>());
    conLauContext.setLocalResources(new HashMap<>());
    conLauContext.setServiceData(new HashMap<>());
    appSubContext.setAMContainerSpec(conLauContext);
    appSubContext.setResource(amContainerResource);



Once initialized, appSubContext is attached to subAppRequest via setApplicationSubmissionContext(appSubContext), and the request is then sent to the RM by calling:

rm.getClientRMService().submitApplication(subAppRequest);
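The two-step submission described above (first obtain an application ID, then submit a filled-in context) can be illustrated with a toy model. This is only a sketch in plain Java: SubmissionContext and ToyResourceManager are hypothetical stand-ins, not Hadoop classes, and only the queue and maxAppAttempts defaults are taken from the proto definition shown earlier.

```java
import java.util.HashMap;
import java.util.Map;

public class SubmitFlowSketch {
  /** Hypothetical stand-in for ApplicationSubmissionContext; only three fields are modelled. */
  static final class SubmissionContext {
    int applicationId = -1;
    String queue = "default";  // same default as the proto field `queue`
    int maxAppAttempts = 0;    // same default as the proto field `maxAppAttempts`
  }

  /** Hypothetical toy RM: hands out ids and stores the submitted contexts. */
  static final class ToyResourceManager {
    private int nextId = 1;
    private final Map<Integer, SubmissionContext> apps = new HashMap<>();

    int getNewApplication() {
      return nextId++;
    }

    void submitApplication(SubmissionContext ctx) {
      apps.put(ctx.applicationId, ctx);
    }

    SubmissionContext lookup(int id) {
      return apps.get(id);
    }
  }

  /** Step 1: ask for an id. Step 2: fill in the context and submit it. */
  static int submit(ToyResourceManager rm, String queue) {
    int appId = rm.getNewApplication();
    SubmissionContext ctx = new SubmissionContext();
    ctx.applicationId = appId;
    ctx.maxAppAttempts = 1;      // mirrors setMaxAppAttempts(1) in submitApp
    if (queue != null) {
      ctx.queue = queue;         // otherwise the default queue name is kept
    }
    rm.submitApplication(ctx);
    return appId;
  }

  public static void main(String[] args) {
    ToyResourceManager rm = new ToyResourceManager();
    int id = submit(rm, "sls_queue_1");
    System.out.println(id + " -> " + rm.lookup(id).queue);  // prints 1 -> sls_queue_1
  }
}
```

The point of the sketch is the ordering: the ID returned by the first call is written into the context before the second call, which is what lets the RM associate the submission with the ID it issued.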




The AM requests resources from the RM:

In MRAMSimulator.java, sendContainerRequest (run as part of middleStep) calls the packageRequests method, which packs the requests into the ResourceRequest protocol format:



    protected List<ResourceRequest> packageRequests(
            List<ContainerSimulator> csList, int priority) {
      // create requests
      Map<String, ResourceRequest> rackLocalRequestMap = new HashMap<String, ResourceRequest>();
      Map<String, ResourceRequest> nodeLocalRequestMap = new HashMap<String, ResourceRequest>();
      ResourceRequest anyRequest = null;
      for (ContainerSimulator cs : csList) {
        if (cs.getHostname() != null) {
          String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname());
          // check rack local
          String rackname = "/" + rackHostNames[0];
          if (rackLocalRequestMap.containsKey(rackname)) {
            rackLocalRequestMap.get(rackname).setNumContainers(
                rackLocalRequestMap.get(rackname).getNumContainers() + 1);
          } else {
            ResourceRequest request = createResourceRequest(cs.getResource(),
                cs.getExecutionType(), rackname, priority, 1);
            rackLocalRequestMap.put(rackname, request);
          }
          // check node local
          String hostname = rackHostNames[1];
          if (nodeLocalRequestMap.containsKey(hostname)) {
            nodeLocalRequestMap.get(hostname).setNumContainers(
                nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
          } else {
            ResourceRequest request = createResourceRequest(cs.getResource(),
                cs.getExecutionType(), hostname, priority, 1);
            nodeLocalRequestMap.put(hostname, request);
          }
        }
        // any
        if (anyRequest == null) {
          anyRequest = createResourceRequest(cs.getResource(),
              cs.getExecutionType(), ResourceRequest.ANY, priority, 1);
        } else {
          anyRequest.setNumContainers(anyRequest.getNumContainers() + 1);
        }
      }
      List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
      ask.addAll(nodeLocalRequestMap.values());
      ask.addAll(rackLocalRequestMap.values());
      if (anyRequest != null) {
        ask.add(anyRequest);
      }
      return ask;
    }


First, the request maps are created:



    Map<String, ResourceRequest> rackLocalRequestMap = new HashMap<String, ResourceRequest>();
    Map<String, ResourceRequest> nodeLocalRequestMap = new HashMap<String, ResourceRequest>();


Then the protocol fields are filled in, for example the rack name and the number of containers:



    for (ContainerSimulator cs : csList) {
      if (cs.getHostname() != null) {
        String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname());
        // check rack local
        String rackname = "/" + rackHostNames[0];
        if (rackLocalRequestMap.containsKey(rackname)) {
          rackLocalRequestMap.get(rackname).setNumContainers(
              rackLocalRequestMap.get(rackname).getNumContainers() + 1);
        } else {
          ResourceRequest request = createResourceRequest(cs.getResource(),
              cs.getExecutionType(), rackname, priority, 1);
          rackLocalRequestMap.put(rackname, request);
        }
        // check node local
        String hostname = rackHostNames[1];
        if (nodeLocalRequestMap.containsKey(hostname)) {
          nodeLocalRequestMap.get(hostname).setNumContainers(
              nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
        } else {
          ResourceRequest request = createResourceRequest(cs.getResource(),
              cs.getExecutionType(), hostname, priority, 1);
          nodeLocalRequestMap.put(hostname, request);
        }
      }
      // any
      if (anyRequest == null) {
        anyRequest = createResourceRequest(cs.getResource(),
            cs.getExecutionType(), ResourceRequest.ANY, priority, 1);
      } else {
        anyRequest.setNumContainers(anyRequest.getNumContainers() + 1);
      }
    }
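To make the effect of this loop concrete, here is a minimal sketch in plain Java that reproduces only the counting logic: every container with a known host adds one to its node entry and one to its rack entry, and every container (with or without a host) adds one to the ANY entry. The class name, the splitRackHost helper and the use of a name-to-count map in place of ResourceRequest objects are assumptions made for illustration; the helper assumes host strings of the form /rack/host and is not the real SLSUtils.getRackHostName.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PackageRequestsSketch {
  static final String ANY = "*";  // value of ResourceRequest.ANY in YARN

  /** Hypothetical helper: splits "/rack/host" into {rack, host}. */
  static String[] splitRackHost(String host) {
    String trimmed = host.startsWith("/") ? host.substring(1) : host;
    return trimmed.split("/", 2);
  }

  /**
   * Returns resource name -> number of containers, with node-local entries
   * first, then rack-local entries, then the ANY entry (same order as ask).
   */
  static Map<String, Integer> packageRequests(List<String> hosts) {
    Map<String, Integer> nodeLocal = new LinkedHashMap<>();
    Map<String, Integer> rackLocal = new LinkedHashMap<>();
    int any = 0;
    for (String host : hosts) {
      if (host != null) {
        String[] rackHost = splitRackHost(host);
        rackLocal.merge("/" + rackHost[0], 1, Integer::sum);  // rack local
        nodeLocal.merge(rackHost[1], 1, Integer::sum);        // node local
      }
      any++;  // counted for every container, even without a host
    }
    Map<String, Integer> ask = new LinkedHashMap<>(nodeLocal);
    ask.putAll(rackLocal);
    if (any > 0) {
      ask.put(ANY, any);
    }
    return ask;
  }

  public static void main(String[] args) {
    List<String> hosts =
        Arrays.asList("/rack1/node1", "/rack1/node2", "/rack1/node1", null);
    System.out.println(packageRequests(hosts));
    // prints {node1=2, node2=1, /rack1=3, *=4}
  }
}
```

Four containers therefore produce four request entries, and the ANY count always equals the total number of containers, which is the figure the scheduler can fall back on when locality cannot be satisfied.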



The ResourceRequest protocol format is as follows.


It is defined in hadoop-yarn-api/src/main/proto/yarn_protos.proto:



    ////////////////////////////////////////////////////////////////////////
    ////// From AM_RM_Protocol /////////////////////////////////////////////
    ////////////////////////////////////////////////////////////////////////
    message ResourceRequestProto {
      optional PriorityProto priority = 1;
      optional string resource_name = 2;
      optional ResourceProto capability = 3;
      optional int32 num_containers = 4;
      optional bool relax_locality = 5 [default = true];
      optional string node_label_expression = 6;
      optional ExecutionTypeRequestProto execution_type_request = 7;
      optional int64 allocation_request_id = 8 [default = -1];
    }



Back in the sendContainerRequest function:

ask = packageRequests(mergeLists(pendingMaps, scheduledMaps),
            PRIORITY_MAP);

Here ask is a List<ResourceRequest>. sendContainerRequest then continues:







    if (ask == null) {
      ask = new ArrayList<>();
    }

    final AllocateRequest request = createAllocateRequest(ask);
    if (totalContainers == 0) {
      request.setProgress(1.0f);
    } else {
      request.setProgress((float) finishedContainers / totalContainers);
    }

    UserGroupInformation ugi =
            UserGroupInformation.createRemoteUser(appAttemptId.toString());
    Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
            .get(appAttemptId.getApplicationId())
            .getRMAppAttempt(appAttemptId).getAMRMToken();
    ugi.addTokenIdentifier(token.decodeIdentifier());
    AllocateResponse response = ugi.doAs(
            new PrivilegedExceptionAction<AllocateResponse>() {
      @Override
      public AllocateResponse run() throws Exception {
        return rm.getApplicationMasterService().allocate(request);
      }
    });
    if (response != null) {
      responseQueue.put(response);
    }


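As shown, createAllocateRequest(ask) converts ask (a List<ResourceRequest>) into request (an AllocateRequest). The listing below is the overload that also takes a list of containers to release: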



    protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask,
        List<ContainerId> toRelease) {
      AllocateRequest allocateRequest =
              recordFactory.newRecordInstance(AllocateRequest.class);
      allocateRequest.setResponseId(responseId++);
      allocateRequest.setAskList(ask);
      allocateRequest.setReleaseList(toRelease);
      return allocateRequest;
    }


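In the end, ask is wrapped in a request of type AllocateRequest, which is the final format in which an AM requests resources from the RM. In real YARN, the AM also communicates with the RM in the AllocateRequest format.

This is the protocol through which the AM obtains resources from the RM; among other things, it carries the number of containers requested.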
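The heartbeat assembly walked through above (default to an empty ask list, wrap it in an allocate request with an incrementing response ID, attach the progress) can be condensed into a small plain-Java sketch. ToyAllocateRequest and the heartbeat method are hypothetical stand-ins for illustration, resource requests are reduced to strings, and passing an empty release list is an assumption about what the one-argument call does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AllocateRequestSketch {
  /** Hypothetical stand-in for AllocateRequest; only four fields are modelled. */
  static final class ToyAllocateRequest {
    int responseId;
    List<String> askList;
    List<String> releaseList;
    float progress;
  }

  private int responseId = 0;

  /** Mirrors createAllocateRequest: each request gets the next response id. */
  ToyAllocateRequest createAllocateRequest(List<String> ask, List<String> toRelease) {
    ToyAllocateRequest request = new ToyAllocateRequest();
    request.responseId = responseId++;
    request.askList = ask;
    request.releaseList = toRelease;
    return request;
  }

  /** Same rule as sendContainerRequest: no containers at all counts as finished. */
  static float progress(int finishedContainers, int totalContainers) {
    return totalContainers == 0 ? 1.0f : (float) finishedContainers / totalContainers;
  }

  /** Builds one heartbeat; a null ask becomes an empty list, nothing is released. */
  ToyAllocateRequest heartbeat(List<String> ask, int finished, int total) {
    if (ask == null) {
      ask = new ArrayList<>();
    }
    ToyAllocateRequest request = createAllocateRequest(ask, new ArrayList<>());
    request.progress = progress(finished, total);
    return request;
  }

  public static void main(String[] args) {
    AllocateRequestSketch am = new AllocateRequestSketch();
    ToyAllocateRequest first = am.heartbeat(null, 0, 0);
    ToyAllocateRequest second = am.heartbeat(Arrays.asList("*"), 1, 4);
    System.out.println(first.responseId + " " + first.progress);    // prints 0 1.0
    System.out.println(second.responseId + " " + second.progress);  // prints 1 0.25
  }
}
```

Even a heartbeat with nothing to ask for still produces a request, which is why the null check on ask comes before the wrapping step.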


