Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2011-2013 the original author or authors.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.springframework.data.hadoop.batch.mapreduce;
 
 
Batch tasklet for executing one Hadoop job. Can be configured to not wait for the job to finish - by default the tasklet waits for the job submitted to finish.

Author(s):
Costin Leau
Thomas Risberg
 
 public class JobTasklet extends JobExecutor implements Tasklet {
 
 
 	public RepeatStatus execute(final StepContribution contributionChunkContext chunkContextthrows Exception {
 
 		StepContext context = StepSynchronizationManager.getContext();
 		final StepExecution stepExecution = (context != null) ? context.getStepExecution() : null;
 
 		final AtomicBoolean done = new AtomicBoolean(false);
 
 		final JobListener jobListener = new JobListener() {
 
 			public Object beforeAction() {
 				// double check the underlying thread to see whether it is aware of a step execution
 				if (StepSynchronizationManager.getContext() == null) {
 					StepSynchronizationManager.register(stepExecution);
 					return .;
 				}
 				return .;
 			}
 
 			public void afterAction(Object state) {
 				if (..equals(state)) {
 					StepSynchronizationManager.close();
 				}
 				done.set(true);
 				synchronized (done) {
 					done.notify();
 				}
 			}
 
 			public void jobKilled(Job job) {
 				saveCounters(jobcontribution);
 				saveJobStats(jobstepExecution);
 			}
 
 			public void jobFinished(Job job) {
 				saveCounters(jobcontribution);
 				saveJobStats(jobstepExecution);
 			}
 		};
 
 		startJobs(jobListener);
 
 		boolean stopped = false;
 		// check status (if we have to wait)
 		if (isWaitForCompletion()) {
 			while (!done.get() && !stopped) {
 				if (stepExecution.isTerminateOnly()) {
 					.info("Cancelling job tasklet");
 					stopped = true;
 					stopJobs(jobListener);
					// wait for stopping to properly occur
					while (!done.get()) {
						synchronized (done) {
							done.wait();
						}
					}
				}
				else {
					// wait a bit more then the internal hadoop threads
					Thread.sleep(5500);
				}
			}
		}
	}
	@SuppressWarnings("deprecation")
	private void saveCounters(Job jobStepContribution contribution) {
		Counters counters = null;
		try {
			counters = job.getCounters();
catch (Exception ex) {
				throw (RuntimeException)ex;
else {
				// ignore - we just can't get stats
			}
		}
		if (counters == null) {
			return;
		}
		// TODO: remove deprecation suppress when we don't want to rely on org.apache.hadoop.mapred
		for (int i = 0; i < safeLongToInt(count.getValue()); i++) {
			contribution.incrementReadCount();
		}
		for (int i = 0; i < safeLongToInt(count.getValue()); i++) {
		}
	}
	private static void saveJobStats(Job jobStepExecution stepExecution) {
		if (stepExecution == null) {
			return;
		}
		ExecutionContext executionContext = stepExecution.getExecutionContext();
		String statusPrefix = "Job Status::";
		executionContext.put(statusPrefix + "ID", JobUtils.getJobId(job).toString());
		executionContext.put(statusPrefix + "Name"job.getJobName());
		executionContext.put(statusPrefix + "Tracking URL"job.getTrackingURL());
		executionContext.put(statusPrefix + "State", JobUtils.getStatus(job).toString());
		try {
			for (String cgName : job.getCounters().getGroupNames()) {
				CounterGroup group = job.getCounters().getGroup(cgName);
				Iterator<Counterci = group.iterator();
				while (ci.hasNext()) {
					Counter c = ci.next();
					executionContext.put(group.getDisplayName().trim() + "::" + c.getDisplayName().trim(), c.getValue());
				}
			}
catch (Exception ignore) {}
	}
	static int safeLongToInt(long l) {
		if (l < . || l > .) {
			throw new IllegalArgumentException(l + " cannot be cast to int without changing its value.");
		}
		return (intl;
	}
New to GrepCode? Check out our FAQ X