  package ch.cern.dirq;
  
  import java.util.Map;

Queue - object-oriented interface to a directory-based queue

Description

The goal of this module is to offer a queue system that uses the underlying filesystem for storage and security, and that prevents race conditions via atomic operations. It focuses on simplicity, robustness and scalability.

This module allows multiple concurrent readers and writers to interact with the same queue.

Different implementations are available so readers and writers can be written in different programming languages:

  • A Perl implementation of the same algorithms is available at http://search.cpan.org/dist/Directory-Queue/
  • A Python implementation of the same algorithms is available at https://github.com/cern-mig/python-dirq

There is no knowledge of priority within a queue. If multiple priorities are needed, multiple queues should be used.
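The multiple-queue approach can be sketched as follows. This is a minimal illustration only: plain in-memory deques stand in for separate directory queues, and the PrioritySketch class and its next() helper are hypothetical names, not part of this module.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Illustrative only: plain in-memory deques stand in for separate directory queues.
final class PrioritySketch {
    /** Return the next item from the first non-empty queue, or null if all are empty. */
    static String next(List<Deque<String>> queuesByPriority) {
        for (Deque<String> queue : queuesByPriority) {
            String item = queue.pollFirst();
            if (item != null) {
                return item;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Deque<String> high = new ArrayDeque<>(Arrays.asList("urgent"));
        Deque<String> low = new ArrayDeque<>(Arrays.asList("routine-1", "routine-2"));
        // Queues are listed from highest to lowest priority.
        List<Deque<String>> queues = Arrays.asList(high, low);
        String item;
        while ((item = next(queues)) != null) {
            System.out.println(item);
        }
    }
}
```

A reader that always checks the higher-priority queue first gets priority handling without the queue itself knowing anything about priorities.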

Terminology

An element is something that contains one or more pieces of data. With QueueSimple queues, an element can only contain one binary string.

A queue is a "best effort" FIFO (First In - First Out) collection of elements.

It is very hard to guarantee pure FIFO behavior with multiple writers using the same queue. Consider for instance:

  • Writer1: calls the add() method
  • Writer2: calls the add() method
  • Writer2: the add() method returns
  • Writer1: the add() method returns

Who should be first in the queue, Writer1 or Writer2?

For simplicity, this implementation provides only "best effort" FIFO, i.e. there is a very high probability that elements are processed in FIFO order but this is not guaranteed. This is achieved by using a high-resolution timer and having elements sorted by the time their final directory gets created.
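To illustrate why time-derived names give approximate FIFO order, here is a minimal sketch. The naming format used (fixed-width hexadecimal seconds followed by microseconds) is an assumption made for this example, and TimeOrderedNames is a hypothetical class, not the code this module uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative only: a hypothetical naming scheme, not the library's actual code.
final class TimeOrderedNames {
    /** Build a fixed-width hexadecimal name from a timestamp given in microseconds. */
    static String nameFor(long epochMicros) {
        long seconds = epochMicros / 1_000_000L;
        long micros = epochMicros % 1_000_000L;
        // Fixed-width fields make lexicographic order match chronological order.
        return String.format("%08x%05x", seconds, micros);
    }

    /** Return the names in the order a reader would process them. */
    static List<String> processingOrder(List<String> names) {
        List<String> sorted = new ArrayList<>(names);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        long base = 1_700_000_000_000_000L;
        List<String> names = new ArrayList<>();
        names.add(nameFor(base + 20)); // this writer's element was listed first...
        names.add(nameFor(base + 5));  // ...but this one carries the earlier timestamp
        System.out.println(processingOrder(names));
    }
}
```

Two writers that obtain the same timestamp, or whose clocks disagree, can still end up out of order, which is why the ordering is only "best effort".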

Locking

Adding an element is not a problem because the add() method is atomic.

In order to support multiple reader processes interacting with the same queue, advisory locking is used. Processes should first lock an element before working with it. In fact, the get() and remove() methods report a fatal error if they are called on unlocked elements.

If the process that created the lock dies without unlocking the element, we end up with a stale lock. The purge() method can be used to remove these stale locks.

An element can basically be in only one of two states: locked or unlocked.

A newly created element is unlocked as a writer usually does not need to do anything more with it.

Iterators return all the elements, regardless of their states.

There is no method to get an element state as this information is usually useless since it may change at any time. Instead, programs should directly try to lock elements to make sure they are indeed locked.
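The lock-then-process pattern described in this section can be sketched as below. ToyQueue is a hypothetical in-memory stand-in that mirrors a few of the Queue methods (add, lock, unlock, get, remove, count) so the example runs on its own; it is not the real implementation and uses no filesystem.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ConsumerSketch {
    /** Process every element that can be locked; skip those held by others. */
    static List<String> drain(ToyQueue queue) {
        List<String> processed = new ArrayList<>();
        for (String name : queue) {
            if (!queue.lock(name)) {
                continue; // another reader holds it, or it is already gone
            }
            processed.add(queue.get(name));
            queue.remove(name);
        }
        return processed;
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.add("first");
        String held = queue.add("second");
        queue.add("third");
        queue.lock(held); // simulate another reader holding this element
        System.out.println(drain(queue) + ", remaining: " + queue.count());
    }
}

// Hypothetical in-memory stand-in mirroring a few Queue methods; not the real implementation.
final class ToyQueue implements Iterable<String> {
    private final Map<String, String> elements = new LinkedHashMap<>();
    private final Set<String> locked = new HashSet<>();
    private int counter = 0;

    String add(String data) {
        String name = String.format("%08x", counter++);
        elements.put(name, data);
        return name;
    }

    boolean lock(String name) {
        // Returns false if the element is gone or already locked.
        return elements.containsKey(name) && locked.add(name);
    }

    boolean unlock(String name) {
        return locked.remove(name);
    }

    String get(String name) {
        if (!locked.contains(name)) {
            throw new IllegalStateException("not locked: " + name);
        }
        return elements.get(name);
    }

    void remove(String name) {
        if (!locked.remove(name)) {
            throw new IllegalStateException("not locked: " + name);
        }
        elements.remove(name);
    }

    int count() {
        return elements.size();
    }

    @Override
    public Iterator<String> iterator() {
        // Iterate over a snapshot so elements can be removed while looping.
        return new ArrayList<>(elements.keySet()).iterator();
    }
}
```

The reader never asks whether an element is locked; it simply tries to lock it and moves on when the attempt fails.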

Security

There are no specific security mechanisms in this module.

The elements are stored as plain files and directories. The filesystem security features (owner, group, permissions, ACLs...) should be used to adequately protect the data.

By default, the process's umask is respected. See the class constructor documentation if you want another behavior. If multiple readers and writers with different uids are expected, the easiest solution is to have all the files and directories inside the toplevel directory world-writable (i.e. umask=0). Then, the permissions of the toplevel directory itself (e.g. group-writable) are enough to control who can access the queue.

Author(s):
Massimo Paladin - massimo.paladin@gmail.com
Copyright (C) CERN 2012-2013
 
 public interface Queue extends Iterable<String> {

    

Returns:
the queue id
 
     public String getId();

    
Add data as a string to the queue.

Parameters:
data data to be added to the queue
Returns:
the element name (<directory_name>/<file_name>)
Throws:
java.io.IOException if any file operation fails
    public String add(String data) throws IOException;

    
Add data as byte array to the queue.

Parameters:
data data to be added to the queue
Returns:
the element name (<directory_name>/<file_name>)
Throws:
java.io.IOException if any file operation fails
    public String add(byte[] data) throws IOException;

    
Add the given file (identified by its path) to the queue and return the corresponding element name. The file must be on the same filesystem and will be moved to the queue.

Parameters:
path the path of the file to be added
Returns:
the element name (<directory_name>/<file_name>)
Throws:
java.io.IOException if any file operation fails
    public String addPath(String path) throws IOException;

    
Get locked element as a string.

Parameters:
name the name of the element to be returned
Returns:
the value associated with the given name
    public String get(String name);

    
Get locked element as a byte array.

Parameters:
name the name of the element to be returned
Returns:
the value associated with the given name
    public byte[] getAsByteArray(String name);

    
Return the path given the name of the element.

Parameters:
name the name of the element
Returns:
the path of the element
    public String getPath(String name);

    
Lock an element in permissive mode.

Parameters:
name name of the element to be locked
Returns:
true on success, false if the element could not be locked
Throws:
java.io.IOException if any file operation fails
    public boolean lock(String name) throws IOException;

    
Lock an element.

Parameters:
name name of the element to be locked
permissive work in permissive mode
Returns:
true on success, false if the element could not be locked
Throws:
java.io.IOException if any file operation fails
    public boolean lock(String name, boolean permissive) throws IOException;

    
Unlock an element in non-permissive mode.

Parameters:
name name of the element to be unlocked
Returns:
true on success, false if the element could not be unlocked
Throws:
java.io.IOException if any file operation fails
    public boolean unlock(String name) throws IOException;

    
Unlock an element.

Parameters:
name name of the element to be unlocked
permissive work in permissive mode
Returns:
true on success, false if the element could not be unlocked
Throws:
java.io.IOException if any file operation fails
    public boolean unlock(String name, boolean permissive) throws IOException;

    
Remove a locked element from the queue.

Parameters:
name name of the element to be removed
    public void remove(String name);

    
Return the number of elements in the queue, locked or not (but not temporary).

Returns:
the number of elements in the queue
    public int count();

    
Purge the queue by removing unused intermediate directories, removing temporary elements that are too old and unlocking locked elements that are too old (a.k.a. stale locks). Note: this can take a long time on queues with many elements.

This variant uses the default values for maxTemp and maxLock.

Throws:
java.io.IOException if any file operation fails
    public void purge() throws IOException;

    
Purge the queue by removing unused intermediate directories, removing temporary elements that are too old and unlocking locked elements that are too old (a.k.a. stale locks). Note: this can take a long time on queues with many elements.

Parameters:
options map containing purge options; only the maxLock and maxTemp values are used, the others are ignored
Throws:
java.io.IOException if any file operation fails
    public void purge(Map<String, Integer> options) throws IOException;

    
Purge the queue by removing unused intermediate directories, removing temporary elements that are too old and unlocking locked elements that are too old (a.k.a. stale locks). Note: this can take a long time on queues with many elements.

Parameters:
maxLock maximum time for a locked element (in seconds, default 600); if set to 0, locked elements will not be unlocked
Throws:
java.io.IOException if any file operation fails
    public void purge(int maxLock) throws IOException;

    
Purge the queue by removing unused intermediate directories, removing temporary elements that are too old and unlocking locked elements that are too old (a.k.a. stale locks). Note: this can take a long time on queues with many elements.

Parameters:
maxTemp maximum time for a temporary element (in seconds, default 300); if set to 0, temporary elements will not be removed
maxLock maximum time for a locked element (in seconds, default 600); if set to 0, locked elements will not be unlocked
Throws:
java.io.IOException if any file operation fails
    public void purge(int maxTemp, int maxLock) throws IOException;
 }
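The maxTemp and maxLock semantics documented above (defaults of 300 and 600 seconds, 0 meaning "disabled") can be sketched as follows. PurgeOptionsSketch and its helpers are hypothetical names used only for illustration. The sketch also assumes that the option map keys are the strings "maxTemp" and "maxLock" and that an element counts as too old once its age reaches the limit; neither detail is confirmed by the documentation above.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: hypothetical helpers, not part of the documented interface.
final class PurgeOptionsSketch {
    static final int DEFAULT_MAX_TEMP = 300; // seconds, per the documented default
    static final int DEFAULT_MAX_LOCK = 600; // seconds, per the documented default

    /** Read one limit from the options map, falling back to the given default. */
    static int limit(Map<String, Integer> options, String key, int fallback) {
        Integer value = options.get(key);
        return value != null ? value : fallback;
    }

    /** A limit of 0 disables the check; otherwise the age must reach the limit (assumed). */
    static boolean isTooOld(long ageSeconds, int maxSeconds) {
        return maxSeconds > 0 && ageSeconds >= maxSeconds;
    }

    public static void main(String[] args) {
        Map<String, Integer> options = new HashMap<>();
        options.put("maxLock", 0);  // never unlock locked elements
        options.put("ignored", 42); // keys other than maxTemp and maxLock are ignored
        int maxTemp = limit(options, "maxTemp", DEFAULT_MAX_TEMP);
        int maxLock = limit(options, "maxLock", DEFAULT_MAX_LOCK);
        System.out.println("temporary element aged 301s too old? " + isTooOld(301, maxTemp));
        System.out.println("locked element aged 10000s too old? " + isTooOld(10_000, maxLock));
    }
}
```

With a real queue, the same map would be passed to purge(Map<String, Integer> options).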