Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.impl.util;
 
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.net.URLDecoder;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.Vector;
 import java.util.jar.JarEntry;
 import java.util.jar.JarInputStream;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;

 import org.antlr.runtime.CommonTokenStream;
 
 
 
 public class JarManager {
 
     private static Log log = LogFactory.getLog(JarManager.class);
    
     /**
      * A container class to track the Jar files that need to be merged together to submit to
      * Hadoop.
      */
 
     private static class JarListEntry {
        
The name of the Jar file to merge in.
 
         String jar;
        
If this field is not null, only entries that start with this prefix will be merged in.
 
         String prefix;
 
         JarListEntry(String jarString prefix) {
             this. = jar;
             this. = prefix;
         }
 
         @Override
         public boolean equals(Object obj) {
             if (!(obj instanceof JarListEntry))
                 return false;
             JarListEntry other = (JarListEntryobj;
             if (!.equals(other.jar))
                 return false;
             if ( == null)
                 return other.prefix == null;
             return .equals(other.prefix);
         }
 
         @Override
         public int hashCode() {
            return .hashCode() + ( == null ? 1 : .hashCode());
        }
    }
    private static enum DefaultPigPackages {
        PIG("org/apache/pig"PigMapReduce.class),
        BZIP2R("org/apache/tools/bzip2r"BZip2Constants.class),
        AUTOMATON("dk/brics/automaton"Automaton.class),
        ANTLR("org/antlr/runtime", CommonTokenStream.class),
        GUAVA("com/google/common"Multimaps.class),
        JACKSON_CORE("org/codehaus/jackson"JsonPropertyOrder.class),
        JACKSON_MAPPER("org/codehaus/jackson"JacksonStdImpl.class),
        JODATIME("org/joda/time"DateTime.class);
        private final String pkgPrefix;
        private final Class pkgClass;
        DefaultPigPackages(String pkgPrefixClass pkgClass) {
            this. = pkgPrefix;
            this. = pkgClass;
        }
        public String getPkgPrefix() {
            return ;
        }
        public Class getPkgClass() {
            return ;
        }
    }

    
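    /**
     * Writes to {@code os} a jar that merges all the jar files containing the functions and the
     * core Pig classes, together with the script jars, the script files and the serialized
     * {@code PigContext} (stored under the entry name {@code pigContext}). The stream {@code os}
     * is closed once the jar has been written.
     *
     * @param os the stream that receives the merged jar.
     * @param funcs the functions that will be used in a job and whose jar files need to be
     *        included in the final merged jar file.
     * @param pigContext the context supplying class aliases, script jars and script files; it is
     *        also serialized into the jar.
     * @throws ClassNotFoundException declared by this method; presumably raised when a function
     *         alias cannot be resolved to a class.
     * @throws IOException if a jar or script file cannot be read ("Cannot find ..." when a script
     *         file is neither a local file nor a class-loader resource) or writing fails.
     */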
    @SuppressWarnings("deprecation")
    public static void createJar(OutputStream osSet<StringfuncsPigContext pigContextthrows ClassNotFoundExceptionIOException {
        JarOutputStream jarFile = new JarOutputStream(os);
        HashMap<StringStringcontents = new HashMap<StringString>();
        Vector<JarListEntryjarList = new Vector<JarListEntry>();
        for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
            addContainingJar(jarListpkgToSend.getPkgClass(), pkgToSend.getPkgPrefix(), pigContext);
        }
        for (String funcfuncs) {
            Class clazz = pigContext.getClassForAlias(func);
            if (clazz != null) {
                addContainingJar(jarListclazznullpigContext);
                
                if (clazz.getSimpleName().equals("StreamingUDF")) {
                    for (String fileName : StreamingUDF.getResourcesForJar()) {
                        InputStream in = Launcher.class.getResourceAsStream(fileName);
                        addStream(jarFilefileNameincontents);
                    }
                }
            }
        }
        Iterator<JarListEntryit = jarList.iterator();
        while (it.hasNext()) {
            JarListEntry jarEntry = it.next();
            // log.error("Adding " + jarEntry.jar + ":" + jarEntry.prefix);
            mergeJar(jarFilejarEntry.jarjarEntry.prefixcontents);
        }
        for (String scriptJarpigContext.scriptJars) {
            mergeJar(jarFilescriptJarnullcontents);
        }
        for (String pathpigContext.scriptFiles) {
            .debug("Adding entry " + path + " to job jar" );
            InputStream stream = null;
            if (new File(path).exists()) {
                stream = new FileInputStream(new File(path));
            } else {
                stream = PigContext.getClassLoader().getResourceAsStream(path);
            }
            if (stream==null) {
                throw new IOException("Cannot find " + path);
            }
        	addStream(jarFilepathstreamcontents);
        }
        for (Map.Entry<StringFileentry : pigContext.getScriptFiles().entrySet()) {
            .debug("Adding entry " + entry.getKey() + " to job jar" );
            InputStream stream = null;
            if (entry.getValue().exists()) {
                stream = new FileInputStream(entry.getValue());
            } else {
                stream = PigContext.getClassLoader().getResourceAsStream(entry.getValue().getPath());
            }
            if (stream==null) {
                throw new IOException("Cannot find " + entry.getValue().getPath());
            }
        	addStream(jarFileentry.getKey(), streamcontents);
        }
        .debug("Adding entry pigContext to job jar" );
        jarFile.putNextEntry(new ZipEntry("pigContext"));
        new ObjectOutputStream(jarFile).writeObject(pigContext);
        jarFile.close();
    }

    
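    /**
     * Creates a class loader whose classpath is {@code jarFile} (when not null) followed by every
     * jar in {@code pigContext.extraJars}, each turned into a {@code file:} URL. The parent of the
     * new loader is the class loader that loaded {@code PigMapReduce}.
     *
     * @param jarFile the jar file to be part of the newly created class loader; may be null.
     * @param pigContext supplies the additional jars through its {@code extraJars} list.
     * @return the new class loader.
     * @throws MalformedURLException if a {@code file:} URL cannot be built from a jar path.
     */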
    static ClassLoader createCl(String jarFilePigContext pigContextthrows MalformedURLException {
        int len = pigContext.extraJars.size();
        int passedJar = jarFile == null ? 0 : 1;
        URL urls[] = new URL[len + passedJar];
        if (jarFile != null) {
            urls[0] = new URL("file:" + jarFile);
        }
        for (int i = 0; i < pigContext.extraJars.size(); i++) {
            urls[i + passedJar] = new URL("file:" + pigContext.extraJars.get(i));
        }
        return new URLClassLoader(urlsPigMapReduce.class.getClassLoader());
    }
    

    
    /**
     * Merge one Jar file into another.
     *
     * @param jarFile the stream of the target jar file.
     * @param jar the name of the jar file to be merged.
     * @param prefix if not null, only entries in jar that start with this prefix will be merged.
     * @param contents the current contents of jarFile (used to prevent duplicate entries).
     * @throws FileNotFoundException if {@code jar} cannot be opened.
     * @throws IOException if reading the source jar or writing the target fails.
     */
    private static void mergeJar(JarOutputStream jarFileString jarString prefixMap<StringStringcontents)
            throws FileNotFoundExceptionIOException {
        JarInputStream jarInput = new JarInputStream(new FileInputStream(jar));
        .debug("Adding jar " + jar + (prefix != null ? " for prefix "+prefix : "" ) + " to job jar" );
        mergeJar(jarFilejarInputprefixcontents);
    }
    
    private static void mergeJar(JarOutputStream jarFileURL jarString prefixMap<StringStringcontents)
        JarInputStream jarInput = new JarInputStream(jar.openStream());
        mergeJar(jarFilejarInputprefixcontents);
    }
    private static void mergeJar(JarOutputStream jarFileJarInputStream jarInputString prefixMap<StringStringcontents)
        JarEntry entry;
        while ((entry = jarInput.getNextJarEntry()) != null) {
            if (prefix != null && !entry.getName().startsWith(prefix)) {
                continue;
            }
            addStream(jarFileentry.getName(), jarInputcontents);
        }
    }
        
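    /**
     * Adds a stream to a Jar file. The stream is read to its end but not closed; if {@code name}
     * is already present in {@code contents}, nothing is written and the stream is left unread.
     *
     * @param os the OutputStream of the Jar file to which the stream will be added.
     * @param name the name of the stream.
     * @param is the stream to add.
     * @param contents the current contents of the Jar file (used to avoid adding two streams with
     *        the same name).
     * @throws IOException if reading {@code is} or writing to {@code os} fails.
     */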
    private static void addStream(JarOutputStream osString nameInputStream isMap<StringStringcontents)
            throws IOException {
        if (contents.get(name) != null) {
            return;
        }
        contents.put(name"");
        os.putNextEntry(new JarEntry(name));
        byte buffer[] = new byte[4096];
        int rc;
        while ((rc = is.read(buffer)) > 0) {
            os.write(buffer, 0, rc);
        }
    }


    
    
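    /**
     * Adds the Jar file containing the given class to the list of jar files to be merged.
     * Nothing is added when the jar cannot be found (a warning is logged), when the jar is in
     * {@code pigContext.skipJars} and no prefix is given, or when an equal entry is already listed.
     *
     * @param jarList the list of jar files to be merged.
     * @param clazz the class in a jar file to be merged.
     * @param prefix if not null, only resources from the jar file that start with this prefix
     *        will be merged.
     * @param pigContext supplies the {@code skipJars} collection.
     */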
    private static void addContainingJar(Vector<JarListEntryjarListClass clazzString prefixPigContext pigContext) {
        String jar = findContainingJar(clazz);
        if (pigContext.skipJars.contains(jar) && prefix == null)
            return;
        if (jar == null)
        {
            //throw new RuntimeException("Couldn't find the jar for " + clazz.getName());
            .warn("Couldn't find the jar for " + clazz.getName() + ", skip it");
            return;
        }
        JarListEntry jarListEntry = new JarListEntry(jarprefix);
        if (!jarList.contains(jarListEntry))
            jarList.add(jarListEntry);
    }


    
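    /**
     * Find a jar that contains a class of the same name, if any. It will return a jar file, even
     * if that is not the first thing on the class path that has a class with the same name.
     * When Pig's class loader is a {@code URLClassLoader}, its own URLs are searched first; if
     * that finds nothing (or the loader is of another kind), the loader's full resource lookup is
     * used. Only {@code jar:} URLs are considered.
     *
     * @param my_class the class to find
     * @return the URL-decoded path of a jar file that contains the class (with any {@code file:}
     *         prefix and {@code !...} suffix removed), or null
     * @throws RuntimeException wrapping any IOException raised while searching
     */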
    public static String findContainingJar(Class my_class) {
        ClassLoader loader = PigContext.getClassLoader();
        String class_file = my_class.getName().replaceAll("\\.""/") + ".class";
        try {
            Enumeration<URLitr = null;
            //Try to find the class in registered jars
            if (loader instanceof URLClassLoader) {
                itr = ((URLClassLoaderloader).findResources(class_file);
            }
            //Try system classloader if not URLClassLoader or no resources found in URLClassLoader
            if (itr == null || !itr.hasMoreElements()) {
                itr = loader.getResources(class_file);
            }
            for (; itr.hasMoreElements();) {
                URL url = (URLitr.nextElement();
                if ("jar".equals(url.getProtocol())) {
                    String toReturn = url.getPath();
                    if (toReturn.startsWith("file:")) {
                        toReturn = toReturn.substring("file:".length());
                    }
                    // URLDecoder is a misnamed class, since it actually decodes
                    // x-www-form-urlencoded MIME type rather than actual
                    // URL encoding (which the file path has). Therefore it would
                    // decode +s to ' 's which is incorrect (spaces are actually
                    // either unencoded or encoded as "%20"). Replace +s first, so
                    // that they are kept sacred during the decoding process.
                    toReturn = toReturn.replaceAll("\\+""%2B");
                    toReturn = URLDecoder.decode(toReturn"UTF-8");
                    return toReturn.replaceAll("!.*$""");
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    }
    
    
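    /**
     * Add the jars containing the given classes to the job's configuration such that JobClient
     * will ship them to the cluster and add them to the DistributedCache. The jar paths are
     * merged, without duplicates, with those already listed in the {@code tmpjars} property.
     *
     * @param job Job object
     * @param classes classes to find
     * @throws IOException if the local FileSystem used to qualify the paths cannot be obtained
     */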
    public static void addDependencyJars(Job jobClass<?>... classes)
            throws IOException {
        Configuration conf = job.getConfiguration();
        FileSystem fs = FileSystem.getLocal(conf);
        Set<Stringjars = new HashSet<String>();
        jars.addAll(conf.getStringCollection("tmpjars"));
        addQualifiedJarsName(fsjarsclasses);
        if (jars.isEmpty())
            return;
        conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
    }
    
    
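    /**
     * Add the qualified path name of jars containing the given classes. Classes whose jar cannot
     * be found are skipped and a warning is logged.
     *
     * @param fs FileSystem object whose URI and working directory qualify the paths
     * @param jars the resolved path names to be added to this set
     * @param classes classes to find
     */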
    private static void addQualifiedJarsName(FileSystem fsSet<StringjarsClass<?>... classes) {
        URI fsUri = fs.getUri();
        Path workingDir = fs.getWorkingDirectory();
        for (Class<?> clazz : classes) {
            String jarName = findContainingJar(clazz);
            if (jarName == null) {
                .warn("Could not find jar for class " + clazz);
                continue;
            }
            jars.add(new Path(jarName).makeQualified(fsUriworkingDir).toString());
        }
    }
New to GrepCode? Check out our FAQ X