Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.impl.io;
 
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
 
 
 public class FileLocalizer {
    // Logger for this utility class (commons-logging).
    private static final Log log = LogFactory.getLog(FileLocalizer.class);

    // Prefix marking a path as residing on the local file system.
    static public final String LOCAL_PREFIX  = "file:";
    // Path-convention styles used by parseCygPath().
    static public final int STYLE_UNIX = 0;
    static public final int STYLE_WINDOWS = 1;
 
     public static class DataStorageInputStreamIterator extends InputStream {
         InputStream current;
         ElementDescriptor[] elements;
         int currentElement;
         
         public DataStorageInputStreamIterator(ElementDescriptor[] elements) {
             this. = elements;
         }
 
         private boolean isEOF() throws IOException {
             if ( == null) {
                 if ( == .) {
                     return true;
                 }
                  = ++ ].open();
             }
             return false;
         }
 
         private void doNext() throws IOException {
             .close();
              = null;
         }
 
         @Override
         public int read() throws IOException {
             while (!isEOF()) {
                 int rc = .read();
                 if (rc != -1)
                     return rc;
                 doNext();
             }
             return -1;
         }
 
         @Override
        public int available() throws IOException {
            if (isEOF())
                return 0;
            return .available();
        }
        @Override
        public void close() throws IOException {
            if ( != null) {
                .close();
                 = null;
            }
             = .;
        }
        @Override
        public int read(byte[] bint offint lenthrows IOException {
            int count = 0;
            while (!isEOF() && len > 0) {
                int rc = .read(bofflen);
                if (rc <= 0) {
                    doNext();
                    continue;
                }
                off += rc;
                len -= rc;
                count += rc;
            }
            return count == 0 ? (isEOF() ? -1 : 0) : count;
        }
        @Override
        public int read(byte[] bthrows IOException {
            return read(b, 0, b.length);
        }
        @Override
        public long skip(long nthrows IOException {
            while (!isEOF() && n > 0) {
                n -= .skip(n);
            }
            return n;
        }
    }
    static String checkDefaultPrefix(ExecType execTypeString fileSpec) {
        if (fileSpec.startsWith())
            return fileSpec;
        return (execType == . ?  : "") + fileSpec;
    }

    
This function is meant to be used if the mappers/reducers want to access any HDFS file

Parameters:
fileName
Returns:
InputStream of the open file.
Throws:
java.io.IOException
    public static InputStream openDFSFile(String fileNamethrows IOException {
        Configuration conf = ..get();
        if (conf == null) {
            throw new RuntimeException(
                    "can't open DFS file while executing locally");
        }
        
        return openDFSFile(fileName, ConfigurationUtil.toProperties(conf));
    }
    public static InputStream openDFSFile(String fileNameProperties propertiesthrows IOException{
        DataStorage dds = new HDataStorage(properties);
        ElementDescriptor elem = dds.asElement(fileName);
        return openDFSFile(elem);
    }
    
    public static long getSize(String fileNamethrows IOException {
    	if (conf == null) {
    		throw new RuntimeException(
                "can't open DFS file while executing locally");
    	}
        return getSize(fileName, ConfigurationUtil.toProperties(conf));
    }
    
    public static long getSize(String fileNameProperties propertiesthrows IOException {
    	DataStorage dds = new HDataStorage(properties);
        ElementDescriptor elem = dds.asElement(fileName);
       
        // recursively get all the files under this path
        ElementDescriptor[] allElems = getFileElementDescriptors(elem);
        
        long size = 0;
        
        // add up the sizes of all files found
        for (int i=0; i<allElems.lengthi++) {
        	Map<StringObjectstats = allElems[i].getStatistics();
        	size += (Long) (stats.get(.));
        }
        
        return size;
    }
    
    private static InputStream openDFSFile(ElementDescriptor elemthrows IOException{
        ElementDescriptor[] elements = null;
        
        if (elem.exists()) {
            try {
                if(! elem.getDataStorage().isContainer(elem.toString())) {
                    if (elem.systemElement())
                        throw new IOException ("Attempt is made to open system file " + elem.toString());
                    return elem.open();
                }
            }
            catch (DataStorageException e) {
                throw new IOException("Failed to determine if elem=" + elem + " is container"e);
            }
            
            // elem is a directory - recursively get all files in it
            elements = getFileElementDescriptors(elem);
        } else {
            // It might be a glob
            if (!globMatchesFiles(elemelem.getDataStorage())) {
                throw new IOException(elem.toString() + " does not exist");
            } else {
                elements = getFileElementDescriptors(elem); 
                return new DataStorageInputStreamIterator(elements);
                
            }
        }
        
        return new DataStorageInputStreamIterator(elements);
    }
    
    
recursively get all "File" element descriptors present in the input element descriptor

Parameters:
elem input element descriptor
Returns:
an array of Element descriptors for files present (found by traversing all levels of dirs) in the input element descriptor
Throws:
org.apache.pig.backend.datastorage.DataStorageException
        DataStorage store = elem.getDataStorage();
        ElementDescriptor[] elems = store.asCollection(elem.toString());
        // elems could have directories in it, if so
        // get the files out so that it contains only files
        List<ElementDescriptorpaths = new ArrayList<ElementDescriptor>();
        List<ElementDescriptorfilePaths = new ArrayList<ElementDescriptor>();
        for (int m = 0; m < elems.lengthm++) {
            paths.add(elems[m]);
        }
        for (int j = 0; j < paths.size(); j++) {
            ElementDescriptor fullPath = store.asElement(store
                    .getActiveContainer(), paths.get(j));
            // Skip hadoop's private/meta files ...
            if (fullPath.systemElement()) {
                continue;
            }
            
            if (fullPath instanceof ContainerDescriptor) {
                for (ElementDescriptor child : ((ContainerDescriptorfullPath)) {
                    paths.add(child);
                }
                continue;
            } else {
                // this is a file, add it to filePaths
                filePaths.add(fullPath);
            }
        }
        elems = new ElementDescriptor[filePaths.size()];
        filePaths.toArray(elems);
        return elems;
    }
    
    private static InputStream openLFSFile(ElementDescriptor elemthrows IOException{
        // IMPORTANT NOTE: Currently we use HXXX classes to represent
        // files and dirs in local mode - so we can just delegate this
        // call to openDFSFile(elem). When we have true local mode files
        // and dirs THIS WILL NEED TO CHANGE
        return openDFSFile(elem);
    }
    
    
This function returns an input stream to a local file system file or a file residing on Hadoop's DFS

Deprecated:
Use open(java.lang.String,org.apache.pig.impl.PigContext) instead
Parameters:
fileName The filename to open
execType execType indicating whether executing in local mode or MapReduce mode (Hadoop)
storage The DataStorage object used to open the fileSpec
Returns:
InputStream to the fileSpec
Throws:
java.io.IOException
    @Deprecated
    static public InputStream open(String fileNameExecType execTypeDataStorage storagethrows IOException {
        fileName = checkDefaultPrefix(execTypefileName);
        if (!fileName.startsWith()) {
            ElementDescriptor elem = storage.asElement(fullPath(fileNamestorage));
            return openDFSFile(elem);
        }
        else {
            fileName = fileName.substring(.length());
            ElementDescriptor elem = storage.asElement(fullPath(fileNamestorage));
            return openLFSFile(elem);
        }
    }
    
    
    @Deprecated
    public static String fullPath(String fileNameDataStorage storage) {
        String fullPath;
        try {
            if (fileName.charAt(0) != '/') {
                ElementDescriptor currentDir = storage.getActiveContainer();
                ElementDescriptor elem = storage.asElement(currentDir.toString(), fileName);
                
                fullPath = elem.toString();
            } else {
                fullPath = fileName;
            }
        } catch (DataStorageException e) {
            fullPath = fileName;
        }
        return fullPath;
    }
    
    static public InputStream open(String fileSpecPigContext pigContextthrows IOException {
        fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
        if (!fileSpec.startsWith()) {
            ElementDescriptor elem = pigContext.getDfs().asElement(fullPath(fileSpecpigContext));
            return openDFSFile(elem);
        }
        else {
            fileSpec = fileSpec.substring(.length());
            //buffering because we only want buffered streams to be passed to load functions.
            /*return new BufferedInputStream(new FileInputStream(fileSpec));*/
            ElementDescriptor elem = pigContext.getLfs().asElement(fullPath(fileSpecpigContext));
            return openLFSFile(elem);
        }
    }
    
    

Parameters:
fileSpec
offset
pigContext
Returns:
SeekableInputStream
Throws:
java.io.IOException This is an overloaded version of open where there is a need to seek in stream. Currently seek is supported only in file, not in directory or glob.
    static public SeekableInputStream open(String fileSpeclong offsetPigContext pigContextthrows IOException {
        
        fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
        
        ElementDescriptor elem;
        if (!fileSpec.startsWith()) 
            elem = pigContext.getDfs().asElement(fullPath(fileSpecpigContext));
                
        else{
            fileSpec = fileSpec.substring(.length());
            elem = pigContext.getLfs().asElement(fullPath(fileSpecpigContext));            
        }
        
        if (elem.exists() && (!elem.getDataStorage().isContainer(elem.toString()))) {
            try {
                if (elem.systemElement())
                    throw new IOException ("Attempt is made to open system file " + elem.toString());
                
                SeekableInputStream sis = elem.sopen();
                sis.seek(offset.);
                return sis;
            }
            catch (DataStorageException e) {
                throw new IOException("Failed to determine if elem=" + elem + " is container"e);
            }
        }
        // Either a directory or a glob.
        else
            throw new IOException("Currently seek is supported only in a file, not in glob or directory.");
    }
    
    static public OutputStream create(String fileSpecPigContext pigContextthrows IOException{
        return create(fileSpec,false,pigContext);
    }
    static public OutputStream create(String fileSpecboolean appendPigContext pigContextthrows IOException {
        fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
        if (!fileSpec.startsWith()) {
            ElementDescriptor elem = pigContext.getDfs().asElement(fileSpec);
            return elem.create();
        }
        else {
            fileSpec = fileSpec.substring(.length());
            // TODO probably this should be replaced with the local file system
            File f = (new File(fileSpec)).getParentFile();
            if (f!=null){
                boolean res = f.mkdirs();
                if (!res)
                    .warn("FileLocalizer.create: failed to create " + f);
            }
            
            return new FileOutputStream(fileSpec,append);
        }
    }
    
    static public boolean delete(String fileSpecPigContext pigContextthrows IOException{
        fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
        ElementDescriptor elem = null;
        if (!fileSpec.startsWith()) {
            elem = pigContext.getDfs().asElement(fileSpec);
        } else {
            elem = pigContext.getLfs().asElement(fileSpec);
        }
        elem.delete();
        return true;
    }
    // Shared RNG used to generate unique temp-file/dir names; replaceable via setR().
    static Random      r           = new Random();

    
Thread local relativeRoot ContainerDescriptor. Do not access this object directly, since it's lazy initialized in the relativeRoot(PigContext) method, which should be used instead.
    private static ThreadLocal<ContainerDescriptorrelativeRoot =
        new ThreadLocal<ContainerDescriptor>() {
    };

    
This method is only used by test code to reset state.

Parameters:
initialized
    public static void setInitialized(boolean initialized) {
        if (!initialized) {
            .set(null);
        }
    }

    
Accessor method to get the root ContainerDescriptor used for temporary files bound to this thread. Calling this method lazy-initialized the relativeRoot object.

Parameters:
pigContext
Returns:
Throws:
org.apache.pig.backend.datastorage.DataStorageException
    private static synchronized ContainerDescriptor relativeRoot(final PigContext pigContext)
            throws DataStorageException {
        if (.get() == null) {
            String tdirpigContext.getProperties().getProperty("pig.temp.dir""/tmp");
            .set(pigContext.getDfs().asContainer(tdir + "/temp" + .nextInt()));
        }
        return .get();
    }
    public static void deleteTempFiles() {
        if (.get() != null) {
            try {
                .get().delete();
            } catch (IOException e) {
                .error(e);
            }
            setInitialized(false);
        }
    }
    public static Path getTemporaryPath(PigContext pigContextthrows IOException {
        return getTemporaryPath(pigContext"");
    }
    public static Path getTemporaryPath(PigContext pigContextString suffixthrows IOException {
      ElementDescriptor relative = relativeRoot(pigContext);
      if (!relativeRoot(pigContext).exists()) {
          relativeRoot(pigContext).create();
      }
      ElementDescriptor elem=
          pigContext.getDfs().asElement(relative.toString(), "tmp" + .nextInt() + suffix);
      return ((HPath)elem).getPath();
  }
    public static String hadoopify(String filenamePigContext pigContextthrows IOException {
        if (filename.startsWith()) {
            filename = filename.substring(.length());
        }
        
        ElementDescriptor localElem =
            pigContext.getLfs().asElement(filename);
            
        if (!localElem.exists()) {
            throw new FileNotFoundException(filename);
        }
            
        ElementDescriptor distribElem = pigContext.getDfs().asElement(
                getTemporaryPath(pigContext).toString());
    
        int suffixStart = filename.lastIndexOf('.');
        if (suffixStart != -1) {
            distribElem = pigContext.getDfs().asElement(distribElem.toString() +
                    filename.substring(suffixStart));
        }
            
        // TODO: currently the copy method in Data Storage does not allow to specify overwrite
        //       so the work around is to delete the dst file first, if it exists
        if (distribElem.exists()) {
            distribElem.delete();
        }
        localElem.copy(distribElemnullfalse);
            
        return distribElem.toString();
    }
    public static String fullPath(String filenamePigContext pigContextthrows IOException {
        try {
            if (filename.charAt(0) != '/') {
                ElementDescriptor currentDir = pigContext.getDfs().getActiveContainer();
                ElementDescriptor elem = pigContext.getDfs().asElement(currentDir.toString(),
                                                                                  filename);
                
                return elem.toString();
            }
            return filename;
        }
        catch (DataStorageException e) {
            return filename;
        }
    }
    public static boolean fileExists(String filenamePigContext context)
            throws IOException {
        return fileExists(filenamecontext.getFs());
    }

    
    @Deprecated 
    public static boolean fileExists(String filenameDataStorage store)
            throws IOException {
        ElementDescriptor elem = store.asElement(filename);
        return elem.exists() || globMatchesFiles(elemstore);
    }
    public static boolean isFile(String filenamePigContext context)
    throws IOException {
        return !isDirectory(filenamecontext.getDfs());
    }

    
    @Deprecated
    public static boolean isFile(String filenameDataStorage store)
    throws IOException {
        return !isDirectory(filenamestore);
    }
    public static boolean isDirectory(String filenamePigContext context)
    throws IOException {
        return isDirectory(filenamecontext.getDfs());
    }

    
    @Deprecated
    public static boolean isDirectory(String filenameDataStorage store)
    throws IOException {
        ElementDescriptor elem = store.asElement(filename);
        return (elem instanceof ContainerDescriptor);
    }
    private static boolean globMatchesFiles(ElementDescriptor elem,
                                            DataStorage fs)
            throws IOException
    {
        try {
            // Currently, if you give a glob with non-special glob characters, hadoop
            // returns an array with your file name in it.  So check for that.
            ElementDescriptor[] elems = fs.asCollection(elem.toString());
            switch (elems.length) {
            case 0:
                return false;
    
            case 1:
                return !elems[0].equals(elem);
    
            default:
                return true;
            }
        }
        catch (DataStorageException e) {
            throw e;
        }
    }
    public static Random getR() {
        return ;
    }
    public static void setR(Random r) {
        . = r;
    }
    
    
Convert path from Windows convention to Unix convention. Invoked under cygwin.

Parameters:
path path in Windows convention
Returns:
path in Unix convention, null if fail
    static public String parseCygPath(String pathint style) {
        String[] command
        if (style==)
            command = new String[] { "cygpath""-w"path };
        else
            command = new String[] { "cygpath""-u"path };
        Process p = null;
        try {
            p = Runtime.getRuntime().exec(command);
        } catch (IOException e) {
            return null;
        }
        int exitVal = 0;
        try {
            exitVal = p.waitFor();
        } catch (InterruptedException e) {
            return null;
        }
        if (exitVal != 0)
            return null;
        String line = null;
        BufferedReader br = null;
        try {
            InputStreamReader isr = new InputStreamReader(p.getInputStream());
            br = new BufferedReader(isr);
            line = br.readLine();
            isr.close();
        } catch (IOException e) {
            return null;
        } finally {
            if (br != nulltry {br.close();} catch (Exception e) {}
        }
        return line;
    }
    
    static File localTempDir = null;
    static {
        File f;
        boolean success = true;
        try {
            f = File.createTempFile("pig""tmp");
            success &= f.delete();
            success &= f.mkdir();
             = f;
            .deleteOnExit();
        } catch (IOException e) {
        }
        if (!success) {
          throw new RuntimeException("Error creating FileLocalizer temp directory.");
        }
    }    
    
    public static class FetchFileRet {
        public FetchFileRet(File fileboolean didFetch) {
            this. = file;
            this. = didFetch;
        }
        public File file;
        public boolean didFetch;
    }

    
Ensures that the passed path is on the local file system, fetching it to the java.io.tmpdir if necessary. If pig.jars.relative.to.dfs is true and dfs is not null, then a relative path is assumed to be relative to the passed dfs active directory. Else they are assumed to be relative to the local working directory.
    public static FetchFileRet fetchFile(Properties propertiesString filePaththrows IOException {
        return fetchFilesInternal(propertiesfilePathfalse)[0];
    }

    
Ensures that the passed files pointed to by path are on the local file system, fetching them to the java.io.tmpdir if necessary. If pig.jars.relative.to.dfs is true and dfs is not null, then a relative path is assumed to be relative to the passed dfs active directory. Else they are assumed to be relative to the local working directory.
    public static FetchFileRet[] fetchFiles(Properties propertiesString filePaththrows IOException {
        return fetchFilesInternal(propertiesfilePathtrue);
    }

    
Copies the files from remote to local filesystem. When 'multipleFiles' is set the path could point to multiple files through globs or a directory. In this case, return array contains multiple files, otherwise a single file is returned. If pig.jars.relative.to.dfs is true then a relative path is assumed to be relative to the default filesystem's active directory. Else they are assumed to be relative to the local working directory.

Parameters:
properties
filePath
multipleFiles
Returns:
    private static FetchFileRet[] fetchFilesInternal(Properties properties,
                                            String filePath,
                                            boolean multipleFilesthrows IOException {
        Path path = new Path(filePath);
        URI uri = path.toUri();
        Configuration conf = new Configuration();
        ConfigurationUtil.mergeConf(conf, ConfigurationUtil.toConfiguration(properties));
        // if there is no schema or if the schema is "local", then it is
        // expected to be a local path.
        FileSystem localFs = FileSystem.getLocal(conf);
        FileSystem srcFs;
        if ( (!"true".equals(properties.getProperty("pig.jars.relative.to.dfs"))
                && uri.getScheme() == null )||
                uri.getScheme().equals("local") ) {
            srcFs = localFs;
        } else {
            srcFs = path.getFileSystem(conf);
        }
        FileStatus[] files;
        if (multipleFiles) {
            files = srcFs.globStatus(path);
        } else {
            files = new FileStatus[]{ srcFs.getFileStatus(path) };
        }
        if (files == null || files.length == 0) {
            throw new ExecException("file '" + filePath + "' does not exist.", 101, .);
        }
        FetchFileRet[] fetchFiles = new FetchFileRet[files.length];
        int idx = 0;
        for(FileStatus file : files) {
            // should throw an exception if this is not a file?
            String pathname = file.getPath().toUri().getPath();
            String filename = file.getPath().getName();
            if (srcFs == localFs) {
                fetchFiles[idx++] = new FetchFileRet(new File(pathname), false);
            } else {
                // fetch from remote:
                File dest = new File(filename);
                dest.deleteOnExit();
                try {
                    srcFs.copyToLocalFile(file.getPath(), new Path(dest.getAbsolutePath()));
                } catch (IOException e) {
                    throw new ExecException("Could not copy " + filePath + " to local destination " + dest, 101, .e);
                }
                fetchFiles[idx++] = new FetchFileRet(desttrue);
            }
        }
        return fetchFiles;
    }
    
    
Ensures that the passed resource is available from the local file system, fetching it to a temporary directory.

    public static FetchFileRet fetchResource(String namethrows IOExceptionResourceNotFoundException {
      FetchFileRet localFileRet = null;
      InputStream resourceStream = PigContext.getClassLoader().getResourceAsStream(name);
      if (resourceStream != null) {        
        File dest = new File(name);
        dest.getParentFile().mkdirs();        
        dest.deleteOnExit();
                
        OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(dest));
        byte[] buffer = new byte[1024];
        int len;
        while ((len=resourceStream.read(buffer)) > 0) {
          outputStream.write(buffer,0,len);
        }
        outputStream.close();
        
        localFileRet = new FetchFileRet(dest,false);
      }
      else
      {
        throw new ResourceNotFoundException(name);
      }
      
      return localFileRet;
    }
New to GrepCode? Check out our FAQ X