Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * #%L
   * Talend ESB :: Camel Talend Job Component
   * %%
   * Copyright (C) 2011 - 2014 Talend Inc.
   * %%
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
  * 
  *      http://www.apache.org/licenses/LICENSE-2.0
  * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * #L%
  */
 
 package org.talend.camel;
 
 import java.util.Map;
 
 

The Talend producer.

 
 public class TalendProducer extends DefaultProducer {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(TalendProducer.class);
 
     private Thread workingThread;
 
     public TalendProducer(TalendEndpoint endpoint) {
         super(endpoint);
     }
 
     public void process(Exchange exchangethrows Exception {
         final TalendEndpoint talendEndpoint = (TalendEndpointgetEndpoint();
         final String context = talendEndpoint.getContext();
         final Collection<Stringargs = new ArrayList<String>();
         if (context != null) {
             args.add("--context=" + context);
         }
         if (talendEndpoint.isPropagateHeader()) {
             getParamsFromHeaders(exchangeargs);
         }
         getParamsFromProperties(talendEndpoint.getEndpointProperties(), args);
         invokeTalendJob(talendEndpoint.getJobInstance(), args.toArray(new String[args.size()]), exchange);
     }
 
     private static void getParamsFromProperties(Map<StringStringpropertiesMapCollection<Stringargs) {
         if (propertiesMap != null) {
             for (Map.Entry<StringStringentry : propertiesMap.entrySet()) {
                 args.add("--context_param " + entry.getKey() + '=' + entry.getValue());
             }
         }
     }
 
     private static void getParamsFromHeaders(
             Exchange exchangeCollection<Stringargs) {
         Map<StringObjectheaders = exchange.getIn().getHeaders();
         for (Map.Entry<StringObjectheader : headers.entrySet()) {
             Object headerValue = header.getValue();
             if (headerValue != null) {
                 String headerStringValue = exchange.getContext().getTypeConverter()
                         .convertTo(String.classexchangeheaderValue);
                 args.add("--context_param " + header.getKey() + '=' + headerStringValue);
             }
         }
     }
 
     private void invokeTalendJob(TalendJob jobInstanceString[] argsExchange exchange) {
         try {
             Method setExchangeMethod =
                     jobInstance.getClass().getMethod("setExchange"new Class[]{Exchange.class});
             .debug("Pass the exchange from route to Job");
             ObjectHelper.invokeMethod(setExchangeMethodjobInstanceexchange);
         } catch (NoSuchMethodException e) {
             .debug("No setExchange(exchange) method found in Job, the message data will be ignored");
         }
         if (.isDebugEnabled()) {
             .debug("Invoking Talend job '" + jobInstance.getClass().getCanonicalName() 
                    + ".runJob(String[] args)' with args: " + Arrays.toString(args));
        }
        ClassLoader oldContextCL = Thread.currentThread().getContextClassLoader();
        try {
             = Thread.currentThread();
            .setContextClassLoader(jobInstance.getClass().getClassLoader());
            int result = jobInstance.runJobInTOS(args);
            if (result != 0) {
                throw new RuntimeCamelException("Execution of Talend job '" 
                        + jobInstance.getClass().getCanonicalName() + "' with args: "
                        + Arrays.toString(args) + "' failed, see stderr for details");
                // Talend logs errors using System.err.println
            }
        } finally {
            .setContextClassLoader(oldContextCL);
             = null;
        }
    }
    @Override
    protected void doStop() throws Exception {
        super.doStop();
        if (null != ) {
            .info("Force terminate Talend job");
            .interrupt();
        }
    }
New to GrepCode? Check out our FAQ X