Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package ch.cern.dirq;
  
  import static ch.cern.mig.posix.Posix.posix;
  
  import java.io.File;
  import java.io.FileFilter;
  import java.util.Arrays;
 import java.util.List;
 
 
QueueSimple - object oriented interface to a simple directory based queue.

A port of Perl module Directory::Queue::Simple http://search.cpan.org/dist/Directory-Queue/

The documentation from Directory::Queue::Simple module was adapted for Java.

Usage

 // sample producer
 QueueSimple dirq = new QueueSimple("/tmp/test");
 for (int i=0; i < 100; i++) {
  String name = dirq.add("element " + i);
  System.out.println("# added element " + i + " as " + name);
 

 // sample consumer
 dirq = QueueSimple('/tmp/test');
 for (String name:dirq) {
  if (! dirq.lock(name)) {
   continue;
  }
  System.out.println("# reading element " + name);
  String data = dirq.get(name);
  // one could use dirq.unlock(name) to only browse the queue...
  dirq.remove(name);
 }
 }
 

Description

This module is very similar to normal dirq, but uses a different way to store data in the filesystem, using less directories. Its API is almost identical.

Compared to normal dirq, this module:

  • is simpler
  • is faster
  • uses less space on disk
  • can be given existing files to store
  • does not support schemas
  • can only store and retrieve byte strings
  • is not compatible (at filesystem level) with Queue

Directory Structure

The toplevel directory contains intermediate directories that contain the stored elements, each of them in a file.
The names of the intermediate directories are time based: the element insertion time is used to create a 8-digits long hexadecimal number.
The granularity (see the constructor) is used to limit the number of new directories. For instance, with a granularity of 60 (the default), new directories will be created at most once per minute.

Since there is usually a filesystem limit in the number of directories a directory can hold, there is a trade-off to be made. If you want to support many added elements per second, you should use a low granularity to keep small directories. However, in this case, you will create many directories and this will limit the total number of elements you can store.

The elements themselves are stored in files (one per element) with a 14-digits long hexadecimal name SSSSSSSSMMMMMR where:

  • SSSSSSSS represents the number of seconds since the Epoch
  • MMMMM represents the microsecond part of the time since the Epoch
  • R is a random digit used to reduce name collisions

A temporary element (being added to the queue) will have a .tmp suffix.

A locked element will have a hard link with the same name and the .lck suffix.

Please refer to Queue for general information about directory queues.

Author(s):
Lionel Cons <lionel.cons@cern.ch>
Massimo Paladin <massimo.paladin@gmail.com> Copyright (C) CERN 2012-2013
public class QueueSimple implements Queue {
    public static final String TEMPORARY_SUFFIX = ".tmp";
    public static final String LOCKED_SUFFIX = ".lck";
    public static final Pattern DIRECTORY_REGEXP =
        Pattern.compile("[0-9a-f]{8}");
    public static final Pattern ELEMENT_REGEXP =
        Pattern.compile("[0-9a-f]{14}");
    private static boolean WARN = false;
    private static Random rand = new Random();
    private String id = null;
    private String queuePath = null;
    private int granularity = 60;
    private int umask = 0; /* set in constructor */
    private int defaultMaxLock = 600;
    private int defaultMaxTemp = 300;
    private int rndHex = 0; /* set in constructor */
    private void warn(String string) {
        if (!)
            return;
        ..println(string);
        ..flush();
    }
    @Override
    public String getId() {
        return ;
    }

    
Get the queue path.

Returns:
queue path
    public String getQueuePath() {
        return ;
    }

    
Get the granularity.

Returns:
granularity (in seconds)
    public int getGranularity() {
        return ;
    }

    
Set the granularity.

Parameters:
granularity to be set (in seconds)
    public QueueSimple setGranularity(int granularity) {
        this. = granularity;
        return this;
    }

    
Get the umask.

Returns:
numerical umask
    public int getUmask() {
        return ;
    }

    
Set the umask.

Parameters:
umask to be set (numerical)
    public QueueSimple setUmask(int umask) {
        this. = umask;
        return this;
    }

    
Get the default maxLock for purge().

Returns:
maximum lock time (in seconds)
    public int getMaxLock() {
        return ;
    }

    
Set the default maxLock for purge().

Parameters:
maxLock maximum lock time (in seconds)
    public QueueSimple setMaxLock(int maxLock) {
        this. = maxLock;
        return this;
    }

    
Get the default maxTemp for purge().

Returns:
maximum temporary time (in seconds)
    public int getMaxTemp() {
        return ;
    }

    
Set the default maxTemp for purge().

Parameters:
maxTemp maximum temporary time (in seconds)
    public QueueSimple setMaxTemp(int maxTemp) {
        this. = maxTemp;
        return this;
    }

    
Get the random hexadecimal digit.

Returns:
numerical hexadecimal digit
    public int getRndHex() {
        return ;
    }

    
Set the random hexadecimal digit.

Parameters:
rndHex hexadecimal digit to be set (numerical)
    public QueueSimple setRndHex(int rndHex) {
        this. = rndHex % 16;
        return this;
    }

    
Constructor creating a simple directory queue given a path.

Parameters:
queuePath the path of the directory queue
Throws:
java.io.IOException
    public QueueSimple(String queuePaththrows IOException {
        this. = queuePath;
        this. = .umask();
        this. = .nextInt(0x10);
        // check if directory exists
        File dir = new File(queuePath);
        if (dir.exists() && (!dir.isDirectory()))
            throw new IllegalArgumentException("not a directory: " + queuePath);
        // check umask option
        if ( >= 512)
            throw new IllegalArgumentException("invalid umask: " + );
        // create top level directory
        String tmpPath = "";
        for (String subDir : dir.getPath().split("/+")) {
            tmpPath += subDir + .;
            if (new File(tmpPath).exists()) {
                continue;
            }
            specialMkdir(tmpPath);
        }
        // store the queue unique identifier
        if (System.getProperty("os.name").startsWith("Windows")) {
             = queuePath;
        } else {
            // set id to stat->st_dev + stat->st_ino
            FileStat stat = .stat(queuePath);
             = "" + stat.dev() + ":" + stat.ino();
        }
    }
    private static String name(int r) {
        return String.format("%013x%01x", System.nanoTime() / 1000, r);
    }
    private static boolean specialMkdir(String paththrows IOException {
        return specialMkdir(path.umask());
    }
    private static boolean specialMkdir(String pathint umask)
            throws IOException {
        try {
            .mkdir(path, 0777 - umask);
        } catch (LastErrorException e) {
            if (Posix.getErrorCode(e) == .
                    && !new File(path).isFile())
                return false;
            else if (Posix.getErrorCode(e) == .)
                return false;
            throw new IOException(String.format("cannot mkdir(%s): %s"path,
                    e.getMessage()));
        }
        return true;
    }
    private boolean specialRmdir(String paththrows IOException {
        try {
            .rmdir(path);
        } catch (LastErrorException e) {
            if (!(Posix.getErrorCode(e) == .))
                throw new IOException(String.format("cannot rmdir(%s): %s",
                        pathe.getMessage()));
            return false;
        }
        return true;
    }
    @Override
    public String add(String datathrows IOException {
        String dir = addDir();
        File tmp = addData(dirdata);
        return addPathHelper(tmpdir);
    }
    @Override
    public String add(byte[] datathrows IOException {
        String dir = addDir();
        File tmp = addData(dirdata);
        return addPathHelper(tmpdir);
    }
    private String addPathHelper(File tmpString dirthrows IOException {
        String name = null;
        while (true) {
            name = name();
            File newFile = new File( + . + dir
                    + . + name);
            try {
                .link(tmp.getPath(), newFile.getPath());
            } catch (LastErrorException e) {
                if (Posix.getErrorCode(e) != .) {
                    throw new IOException(String.format(
                            "cannot link(%s, %s): %s"tmpnewFile,
                            e.getMessage()));
                } else {
                    continue;
                }
            }
            try {
                .unlink(tmp.getPath());
            } catch (LastErrorException e) {
                throw new IOException(String.format("cannot unlink(%s): %s",
                        tmpe.getMessage()));
            }
            break;
        }
        return dir + . + name;
    }
    private File fileCreate(String paththrows IOException {
        File file = null;
        try {
            file = .open(path);
        } catch (LastErrorException e) {
            // RACE: someone else may have created the file (EEXIST)
            // RACE: the containing directory may be mising (ENOENT)
            if (Posix.getErrorCode(e) != .
                    && Posix.getErrorCode(e) != .)
                throw new IOException(String.format("cannot create %s: %s",
                        pathe.getMessage()));
            return null;
        }
        return file;
    }
    private File addData(String dirbyte[] datathrows IOException {
        File newFile = getNewFile(dir);
        try {
            FileUtils.writeToFile(newFiledata);
        } catch (IOException e) {
            throw new IOException("cannot write to file: " + newFile);
        }
        return newFile;
    }
    private File addData(String dirString datathrows IOException {
        File newFile = getNewFile(dir);
        try {
            FileUtils.writeToFile(newFiledata);
        } catch (IOException e) {
            throw new IOException("cannot write to file: " + newFile);
        }
        return newFile;
    }
    private File getNewFile(String dirthrows IOException {
        File newFile = null;
        while (true) {
            String name = name();
            newFile = fileCreate( + . + dir
                    + . + name + );
            if (newFile != null)
                break;
            if (!new File( + . + dir).exists())
                specialMkdir( + . + dir);
        }
        return newFile;
    }
    @Override
    public String addPath(String paththrows IOException {
        String dir = addDir();
        specialMkdir(this. + . + dir);
        return addPathHelper(new File(path), dir);
    }
    protected String addDir() {
        long now = System.currentTimeMillis() / 1000;
        if ( > 0)
            now -= now % ;
        return String.format("%08x"now);
    }
    @Override
    public String get(String name) {
        return FileUtils.readToString( + . + name
                + );
    }
    @Override
    public byte[] getAsByteArray(String name) {
        return FileUtils.readToByteArray( + . + name
                + );
    }
    @Override
    public String getPath(String name) {
        return  + . + name + ;
    }
    @Override
    public boolean lock(String namethrows IOException {
        return lock(nametrue);
    }
    @Override
    public boolean lock(String nameboolean permissivethrows IOException {
        File file = new File( + . + name);
        File lock = new File( + . + name + );
        try {
            .link(file.getPath(), lock.getPath());
        } catch (LastErrorException e) {
            if (permissive
                    && (Posix.getErrorCode(e) == . || Posix
                    .getErrorCode(e) == .))
                return false;
            throw new IOException(String.format("cannot link(%s, %s): %s",
                    filelocke.getMessage()));
        }
        try {
            .utimes(file.getPath(), null);
        } catch (LastErrorException e) {
            if (permissive && Posix.getErrorCode(e) == .) {
                .unlink(lock.getPath());
                return false;
            }
            throw new IOException(String.format("cannot utime(%s, null): %s",
                    filee.getMessage()));
        }
        return true;
    }
    @Override
    public boolean unlock(String namethrows IOException {
        return unlock(namefalse);
    }
    @Override
    public boolean unlock(String nameboolean permissivethrows IOException {
        String lock =  + . + name + ;
        try {
            .unlink(lock);
        } catch (LastErrorException e) {
            if (permissive && Posix.getErrorCode(e) == .)
                return false;
            throw new IOException(String.format("cannot unlink(%s): %s"lock,
                    e.getMessage()));
        }
        return true;
    }
    @Override
    public void remove(String name) {
        .unlink( + . + name);
        .unlink( + . + name + );
    }

    
Used to filter directories while listing files.
    private class DirFilter implements FileFilter {
        public boolean accept(File file) {
            return file.isDirectory();
        }
    }
    @Override
    public int count() {
        int count = 0;
        // get list of intermediate directories
        File[] elements = new File().listFiles(new DirFilter());
        // count elements in sub-directories
        for (File element : elements) {
            File[] inElements = element.listFiles();
            for (File inElement : inElements) {
                if (.matcher(inElement.getName()).matches())
                    count += 1;
            }
        }
        return count;
    }
    @Override
    public void purge() throws IOException {
        purge(nullnull);
    }
    @Override
    public void purge(Integer maxLockthrows IOException {
        purge(maxLocknull);
    }
    @Override
    public void purge(Integer maxLockInteger maxTempthrows IOException {
        long now = System.currentTimeMillis() / 1000;
        long oldtemp = 0;
        long oldlock = 0;
        // get the list of intermediate directories
        File[] elements = new File().listFiles(new DirFilter());
        if (maxLock == null)
            maxLock = ;
        if (maxTemp == null)
            maxTemp = ;
        if (maxLock > 0)
            oldlock = now - maxLock;
        if (maxTemp > 0)
            oldtemp = now - maxTemp;
        if (maxTemp > 0 || maxLock > 0) {
            for (File element : elements) {
                File[] inElements = element.listFiles(new RegExpFilenameFilter(
                        Pattern.compile("\\."), false));
                if (inElements == null)
                    continue;
                for (File inElement : inElements) {
                    FileStat stat = null;
                    try {
                        stat = .stat(inElement.getPath());
                    } catch (LastErrorException e) {
                        if (Posix.getErrorCode(e) == .)
                            continue;
                        throw new IOException(String.format(
                                "cannot stat(%s): %s"inElement,
                                e.getMessage()));
                    }
                    if (inElement.getName().endsWith()
                            && stat.mtime() >= oldtemp)
                        continue;
                    if (inElement.getName().endsWith()
                            && stat.mtime() >= oldlock)
                        continue;
                    warn("removing too old volatile file: " + inElement);
                    try {
                        .unlink(inElement.getPath());
                    } catch (LastErrorException e) {
                        if (Posix.getErrorCode(e) == .)
                            continue;
                        throw new IOException(String.format(
                                "cannot unlink(%s): %s"inElement,
                                e.getMessage()));
                    }
                }
            }
        }
        // try to purge all but the last intermediate directory
        if (elements.length > 1) {
            Arrays.sort(elements);
            for (int c = 0; c < elements.length - 1; c++) {
                if (elements[c].exists() && elements[c].listFiles().length == 0)
                    specialRmdir(elements[c].getPath());
            }
        }
    }

    
Iterator over QueueSimple implementation.
    @Override
    public Iterator<Stringiterator() {
        return new QueueSimpleIterator(this);
    }
    private static class QueueSimpleIterator implements Iterator<String> {
        private QueueSimple iteratedQueue = null;
        private List<Stringdirs = new ArrayList<String>();
        private List<Stringelts = new ArrayList<String>();
        private boolean buildElements() {
            boolean result = false;
            while (!result && !.isEmpty()) {
                String dir = .remove(0);
                File[] content = new File(. + .
                        + dir).listFiles(new RegExpFilenameFilter(
                        ));
                if (content == null || content.length == 0)
                    continue;
                else
                    result = true;
                Arrays.sort(content);
                for (File element : content) {
                    .add(dir + . + element.getName());
                }
            }
            return result;
        }

        
Constructor which creates an iterator over the given queue.

Parameters:
queue queue to be iterated
        public QueueSimpleIterator(QueueSimple queue) {
             = queue;
            File[] content = new File(.getQueuePath())
                    .listFiles(new RegExpFilenameFilter());
            for (File dir : content) {
                .add(dir.getName());
            }
            Collections.sort();
        }

        
Return true if there are still elements to be iterated.
        @Override
        public boolean hasNext() {
            if (!.isEmpty())
                return true;
            if (buildElements())
                return true;
            return false;
        }

        
Return the next element to be iterated.
        @Override
        public String next() {
            if (!.isEmpty())
                return .remove(0);
            if (buildElements())
                return .remove(0);
            throw new NoSuchElementException();
        }

        
Make sure visited element is removed from the list of iterable items.
        @Override
        public void remove() {
            // already removed
        }
    }
New to GrepCode? Check out our FAQ X