Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.util;
MonitoredUDF is used to watch execution of a UDF, and kill it if the UDF takes an exceedingly long time. Null is returned if the UDF times out. Optionally, UDFs can implement the provided interfaces to provide custom logic for handling errors and default values.
 public class MonitoredUDFExecutor implements Serializable {
     private final transient ListeningExecutorService exec;
     private final transient TimeUnit timeUnit;
     private final transient long duration;
     private final transient Object defaultValue;
     private final transient EvalFunc evalFunc;
     private final transient Function<TupleObjectclosure;
     // Let us reflect upon our errors.
     private final transient Class<? extends ErrorCallbackerrorCallback;
     private final transient Method errorHandler;
     private final transient Method timeoutHandler;
     public MonitoredUDFExecutor(EvalFunc udf) {
         // is 10 enough? This is pretty arbitrary.
          = MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(new ScheduledThreadPoolExecutor(1)));
         this. = udf;
         MonitoredUDF anno = udf.getClass().getAnnotation(MonitoredUDF.class);
          = anno.timeUnit();
          = anno.duration();
          = anno.errorCallback();
         // The exceptions really should not happen since our handlers are defined by the parent class which
         // must be extended by all custom handlers.
         try {
              = .getMethod("handleError"EvalFunc.classException.class);
              = .getMethod("handleTimeout"EvalFunc.classException.class);
         } catch (SecurityException e1) {
             throw new RuntimeException("Unable to use the monitored callback due to a Security Exception while working with "
                     + .getClass().getName());
         } catch (NoSuchMethodException e1) {
             throw new RuntimeException("Unable to use the monitored callback because a required method not found while working with "
                     + .getClass().getName());
         Type retType = udf.getReturnType();
          = getDefaultValue(annoretType);
          = new Function<TupleObject>() {
             public Object apply(Tuple input) {
                 try {
                     return .exec(input);
                 } catch (IOException e) {
                     // I don't see a CheckedFunction in Guava. Resorting to this hackery.
                     throw new RuntimeException(e);
    private Object getDefaultValue(MonitoredUDF annoType retType) {
        if (retType.equals(.) || retType.equals(Integer.class)) {
            return (anno.intDefault().length == 0) ? null : anno.intDefault()[0];
        } else if (retType.equals(.) || retType.equals(Double.class)) {
            return (anno.doubleDefault().length == 0) ? null : anno.doubleDefault()[0];
        } else if (retType.equals(.) || retType.equals(Float.class)) {
            return (anno.floatDefault().length == 0) ? null : anno.floatDefault()[0];
        } else if (retType.equals(.) || retType.equals(Long.class)) {
            return (anno.longDefault().length == 0) ? null : anno.longDefault()[0];
        } else if (retType.equals(String.class)) {
            return (anno.stringDefault().length == 0) ? null : anno.stringDefault()[0];
        } else {
            // Default default is null.
            return null;

This method *MUST* be called in the finish by POUserFunc. Though we do use an ExitingExecutorService just in case.
    public void terminate() {

UDF authors can optionally extend this class and provide the class of their custom callbacks in the annotation to perform their own handling of errors and timeouts.
    public static class ErrorCallback {
        public static void handleError(EvalFunc evalFuncException e) {
            StatusReporter reporter = PigStatusReporter.getInstance();
            if (reporter != null &&
                    reporter.getCounter(evalFunc.getClass().getName(), e.toString()) != null) {
                reporter.getCounter(evalFunc.getClass().getName(), e.toString()).increment(1L);
        public static void handleTimeout(EvalFunc evalFuncException e) {
            StatusReporter reporter = PigStatusReporter.getInstance();
            if (reporter != null &&
                    reporter.getCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout") != null) {
                reporter.getCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout").increment(1L);
    public Object monitorExec(final Tuple inputthrows IOException {
        CheckedFuture<ObjectExceptionf =
                    // the Future whose exceptions we want to catch
                    .submit(new Callable<Object>() {
                        public Object call() throws Exception {
                            return .apply(input);
                    // How to map those exceptions; we simply rethrow them.
                    // Theoretically we could do some handling of
                    // CancellationException, ExecutionException  and InterruptedException here
                    // and do something special for UDF IOExceptions as opposed to thread exceptions.
                    new Function<ExceptionException>() {
                        public Exception apply(Exception e) {
                            return e;
        Object result = ;
        // The outer try "should never happen" (tm).
        try {
            try {
                result = f.get();
            } catch (TimeoutException e) {
            } catch (Exception e) {
            } finally {
        } catch (IllegalArgumentException e) {
            throw new IOException(e);
        } catch (IllegalAccessException e) {
            throw new IOException(e);
        } catch (InvocationTargetException e) {
            throw new IOException(e);
        return result;
New to GrepCode? Check out our FAQ X