Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Copyright (C) 2014 Microsoft Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 
 package com.microsoft.reef.runtime.yarn.client;
 
 
 import java.io.File;
 import java.util.List;
 import java.util.Map;
 
 final class YarnJobSubmissionHandler implements JobSubmissionHandler {
 
   private static final Logger LOG = Logger.getLogger(YarnJobSubmissionHandler.class.getName());
 
   private final YarnConfiguration yarnConfiguration;
   private final YarnClient yarnClient;
   private final JobJarMaker jobJarMaker;
   private final REEFFileNames filenames;
   private final REEFClasspath classpath;
   private final FileSystem fileSystem;
   private final double jvmSlack;
 
   @Inject
       final YarnConfiguration yarnConfiguration,
       final JobJarMaker jobJarMaker,
       final REEFFileNames filenames,
       final YarnClasspath classpath,
       final ConfigurationSerializer configurationSerializer,
       final @Parameter(JVMHeapSlack.classdouble jvmSlackthrows IOException {
 
     this. = yarnConfiguration;
     this. = jobJarMaker;
     this. = filenames;
     this. = classpath;
     this. = configurationSerializer;
     this. = jvmSlack;
 
     this. = FileSystem.get(yarnConfiguration);
 
     this. = YarnClient.createYarnClient();
     this..init(this.);
     this..start();
   }
 
   @Override
   public void close() {
     this..stop();
   }
  public void onNext(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
    .log(."Submitting job with ID [{0}]"jobSubmissionProto.getIdentifier());
    try {
      .log(."Requesting Application ID from YARN.");
      final YarnClientApplication yarnClientApplication = this..createApplication();
      final GetNewApplicationResponse applicationResponse = yarnClientApplication.getNewApplicationResponse();
      final ApplicationSubmissionContext applicationSubmissionContext =
          yarnClientApplication.getApplicationSubmissionContext();
      final ApplicationId applicationId = applicationSubmissionContext.getApplicationId();
      .log(."YARN Application ID: {0}"applicationId);
      // set the application name
      applicationSubmissionContext.setApplicationName(
          "reef-job-" + jobSubmissionProto.getIdentifier());
      .log(."Assembling submission JAR for the Driver.");
      final Path submissionFolder = new Path(
          "/tmp/" + this..getJobFolderPrefix() + applicationId.getId() + "/");
      final Configuration driverConfiguration =
          makeDriverConfiguration(jobSubmissionProtosubmissionFolder);
      final File jobSubmissionFile =
          this..createJobSubmissionJAR(jobSubmissionProtodriverConfiguration);
      final Path uploadedJobJarPath = this.uploadToJobFolder(jobSubmissionFilesubmissionFolder);
      final Map<StringLocalResourceresources = new HashMap<>(1);
      resources.put(this..getREEFFolderName(),
          this.makeLocalResourceForJarFile(uploadedJobJarPath));
      // SET MEMORY RESOURCE
      final int amMemory = getMemory(
          jobSubmissionProtoapplicationResponse.getMaximumResourceCapability().getMemory());
      applicationSubmissionContext.setResource(Resource.newInstance(amMemory, 1));
      // SET EXEC COMMAND
      final List<StringlaunchCommand = new JavaLaunchCommandBuilder()
          .setErrorHandlerRID(jobSubmissionProto.getRemoteId())
          .setLaunchID(jobSubmissionProto.getIdentifier())
          .setClassPath(this..getClasspath())
          .setMemory(amMemory)
          .build();
      applicationSubmissionContext.setAMContainerSpec(
          YarnTypes.getContainerLaunchContext(launchCommandresources));
      applicationSubmissionContext.setPriority(getPriority(jobSubmissionProto));
      // Set the queue to which this application is to be submitted in the RM
      applicationSubmissionContext.setQueue(getQueue(jobSubmissionProto"default"));
      .log(."Submitting REEF Application to YARN. ID: {0}"applicationId);
      if (.isLoggable(.)) {
        .log(."REEF app command: {0}", StringUtils.join(launchCommand' '));
      }
      this..submitApplication(applicationSubmissionContext);
    } catch (final YarnException | IOException e) {
      throw new RuntimeException("Unable to submit Driver to YARN."e);
    }
  }

  
Assembles the Driver configuration.
      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
      final Path jobFolderPaththrows IOException {
    return Configurations.merge(
            .set(.jobFolderPath.toString())
            .set(.jobSubmissionProto.getIdentifier())
            .set(.jobSubmissionProto.getRemoteId())
            .set(.this.)
            .build(),
        this..fromString(jobSubmissionProto.getConfiguration()));
  }
  private final Path uploadToJobFolder(final File filefinal Path jobFolderthrows IOException {
    final Path source = new Path(file.getAbsolutePath());
    final Path destination = new Path(jobFolderfile.getName());
    .log(."Uploading {0} to {1}"new Object[]{sourcedestination});
    this..copyFromLocalFile(falsetruesourcedestination);
    return destination;
  }
  private Priority getPriority(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
    return Priority.newInstance(
        jobSubmissionProto.hasPriority() ? jobSubmissionProto.getPriority() : 0);
  }

  
Extract the queue name from the jobSubmissionProto or return default if none is set.

TODO: Revisit this. We also have a named parameter for the queue in YarnClientConfiguration.

  private final String getQueue(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
                                final String defaultQueue) {
    return jobSubmissionProto.hasQueue() && !jobSubmissionProto.getQueue().isEmpty() ?
        jobSubmissionProto.getQueue() : defaultQueue;
  }

  
Extract the desired driver memory from jobSubmissionProto.

returns maxMemory if that desired amount is more than maxMemory

  private int getMemory(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
                        final int maxMemory) {
    final int amMemory;
    final int requestedMemory = jobSubmissionProto.getDriverMemory();
    if (requestedMemory <= maxMemory) {
      amMemory = requestedMemory;
    } else {
      .log(.,
          "Requested {0}MB of memory for the driver. " +
              "The max on this YARN installation is {1}. " +
              "Using {1} as the memory for the driver.",
          new Object[]{requestedMemorymaxMemory});
      amMemory = maxMemory;
    }
    return amMemory;
  }

  
Creates a LocalResource instance for the JAR file referenced by the given Path
  private LocalResource makeLocalResourceForJarFile(final Path paththrows IOException {
    final LocalResource localResource = Records.newRecord(LocalResource.class);
    final FileStatus status = FileContext.getFileContext(.getUri()).getFileStatus(path);
    localResource.setType(.);
    localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
    localResource.setTimestamp(status.getModificationTime());
    localResource.setSize(status.getLen());
    return localResource;
  }
New to GrepCode? Check out our FAQ X