  package com.rabbitmq.client.impl;
  
  import java.util.Collection;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.LinkedList;
  import java.util.Map;
  import java.util.Queue;
  import java.util.Set;

This is a generic implementation of the Channels specification in Channeling Work, Nov 2010 (channels.pdf).

Objects of type K must be registered, with registerKey(K), and then they become clients and a queue of items (type W) is stored for each client.

Each client has a state which is exactly one of dormant, in progress or ready. Immediately after registration a client is dormant.

Items may be (singly) added to (the end of) a client's queue with addWorkItem(K,W). If the client is dormant it becomes ready thereby. All other states remain unchanged.

The next ready client, together with a collection of its items, may be retrieved with nextWorkBlock(collection,max) (making that client in progress).

An in progress client can finish (processing a batch of items) with finishWorkBlock(K). It then becomes either dormant or ready, depending on whether its queue of work items is empty or not.

If a client has items queued, it is either in progress or ready but cannot be both. When work is finished it may be marked ready if there is further work, or dormant if there is not. There is never any work for a dormant client.

A client may be unregistered, with unregisterKey(K), which removes the client from all parts of the state, and any queue of items stored with it. All clients may be unregistered with unregisterAllKeys().
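
The lifecycle described above can be traced with a small stand-alone model. This is only a sketch under stated assumptions, not the class documented here: MiniWorkPool, MiniWorkPoolDemo and their method bodies are hypothetical, the model is single-threaded (no locking), and a LinkedHashSet stands in for the ordered ready queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Hypothetical, single-threaded model of the client lifecycle (not the real WorkPool). */
class MiniWorkPool<K, W> {
    private final Map<K, Queue<W>> pool = new HashMap<>();
    private final Set<K> inProgress = new HashSet<>();
    private final Set<K> ready = new LinkedHashSet<>(); // insertion-ordered, no duplicates

    /** Registers a client with an empty queue; it starts out dormant. */
    void registerKey(K key) {
        pool.putIfAbsent(key, new ArrayDeque<>());
    }

    /** Returns true iff the client went from dormant to ready. */
    boolean addWorkItem(K key, W item) {
        Queue<W> queue = pool.get(key);
        if (queue == null) {
            throw new IllegalArgumentException("Client " + key + " not registered");
        }
        queue.add(item);
        boolean dormant = !inProgress.contains(key) && !ready.contains(key);
        if (dormant) {
            ready.add(key);
        }
        return dormant;
    }

    /** Moves the oldest ready client to in progress and hands over up to max items. */
    K nextWorkBlock(Collection<W> to, int max) {
        Iterator<K> it = ready.iterator();
        if (!it.hasNext()) {
            return null;
        }
        K key = it.next();
        it.remove();
        inProgress.add(key);
        Queue<W> queue = pool.get(key);
        for (int n = 0; n < max && !queue.isEmpty(); n++) {
            to.add(queue.poll());
        }
        return key;
    }

    /** Returns true iff the client becomes ready again (items still queued). */
    boolean finishWorkBlock(K key) {
        if (!pool.containsKey(key)) {
            return false;
        }
        if (!inProgress.remove(key)) {
            throw new IllegalStateException("Client " + key + " not in progress");
        }
        boolean more = !pool.get(key).isEmpty();
        if (more) {
            ready.add(key);
        }
        return more;
    }
}

class MiniWorkPoolDemo {
    /** Walks one client through ready, in progress, ready, in progress and back to dormant. */
    static List<String> trace() {
        MiniWorkPool<String, Integer> pool = new MiniWorkPool<>();
        List<String> log = new ArrayList<>();
        pool.registerKey("a");                           // dormant
        log.add("add1=" + pool.addWorkItem("a", 1));     // dormant -> ready
        log.add("add2=" + pool.addWorkItem("a", 2));     // already ready, no change
        log.add("add3=" + pool.addWorkItem("a", 3));
        List<Integer> batch = new ArrayList<>();
        String key = pool.nextWorkBlock(batch, 2);       // ready -> in progress
        log.add("next=" + key + batch);
        log.add("finish=" + pool.finishWorkBlock("a"));  // one item left -> ready
        batch.clear();
        key = pool.nextWorkBlock(batch, 2);
        log.add("next=" + key + batch);
        log.add("finish=" + pool.finishWorkBlock("a"));  // queue empty -> dormant
        log.add("next=" + pool.nextWorkBlock(batch, 2)); // no ready client
        return log;
    }
}
```

In this model only the first addWorkItem call reports a transition, and the second finishWorkBlock call returns false because the queue has been emptied, which matches the rule that there is never any work for a dormant client.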

Concurrent Semantics
This implementation is thread-safe.

Implementation Notes
The state is, roughly, as follows:

 pool :: map(K, seq W)
 inProgress :: set K
 ready :: iseq K

where a seq is a sequence (queue or list) and an iseq (i for injective) is a sequence with no duplicates.
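
To make the iseq notion concrete, here is a minimal sketch of an injective sequence built on java.util.LinkedHashSet. The class name InjectiveQueue is hypothetical, and this is not the SetQueue used by the class below (its source is not shown here); the sketch only mirrors the method names that the code below calls on ready.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;

/** Hypothetical injective sequence: FIFO order, each element present at most once. */
class InjectiveQueue<T> {
    // LinkedHashSet keeps insertion order and ignores re-insertion of an existing element.
    private final LinkedHashSet<T> members = new LinkedHashSet<>();

    /** Appends item unless it is already queued; returns true iff it was added. */
    boolean addIfNotPresent(T item) {
        return members.add(item);
    }

    /** Removes and returns the head, or null if the queue is empty. */
    T poll() {
        Iterator<T> it = members.iterator();
        if (!it.hasNext()) {
            return null;
        }
        T head = it.next();
        it.remove();
        return head;
    }

    boolean contains(T item) { return members.contains(item); }

    boolean remove(T item) { return members.remove(item); }

    int size() { return members.size(); }
}
```

Adding an element that is already queued leaves both the contents and the order unchanged, which is the property the ready sequence relies on.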

State transitions

      finish(k)            -------------
             -----------> | (dormant)   |
            |              -------------
  -------------  next()        | add(item)
 | in progress | <---------    |
  -------------            |   V
            |              -------------
             -----------> | ready       |
      finish(k)            -------------
 
dormant is not represented in the implementation state, and adding items when the client is in progress or ready does not change its state.
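
Because dormant is not stored anywhere, a client's state has to be derived from membership in the other structures. The following sketch shows one way to write that derivation; the class ClientStates, the enum State and the parameter names are hypothetical and are not part of the class below.

```java
import java.util.Set;

/** Hypothetical helper that derives a client's state from set membership. */
class ClientStates {
    enum State { UNREGISTERED, DORMANT, READY, IN_PROGRESS }

    /**
     * registered plays the role of the key set of pool; inProgress and ready
     * play the roles of the structures with the same names in the notes above.
     */
    static <K> State stateOf(K key, Set<K> registered, Set<K> inProgress, Set<K> ready) {
        if (!registered.contains(key)) {
            return State.UNREGISTERED;
        }
        if (inProgress.contains(key)) {
            return State.IN_PROGRESS;
        }
        if (ready.contains(key)) {
            return State.READY;
        }
        // Registered, but neither in progress nor ready: dormant by elimination.
        return State.DORMANT;
    }
}
```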
 
 public class WorkPool<K, W> {

    
Monitor protecting ready, inProgress and pool.
 
     private final Object monitor = new Object();
        
An ordered queue of ready clients.
 
         private final SetQueue<K> ready = new SetQueue<K>();
        
The set of clients which have work in progress.
 
         private final Set<K> inProgress = new HashSet<K>();
        
The pool of registered clients, with their work queues.
 
         private final Map<K, LinkedList<W>> pool = new HashMap<K, LinkedList<W>>();

    
Add client key to pool of item queues, with an empty queue. A client is initially dormant.

No-op if key already present.

Parameters:
key client to add to pool
 
    public void registerKey(K key) {
        synchronized (this.monitor) {
            if (!this.pool.containsKey(key)) {
                this.pool.put(key, new LinkedList<W>());
            }
        }
    }

    
Remove client from pool and from any other state. Has no effect if client already absent.

Parameters:
key of client to unregister
 
    public void unregisterKey(K key) {
        synchronized (this.monitor) {
            this.pool.remove(key);
            this.ready.remove(key);
            this.inProgress.remove(key);
        }
    }

    
Remove all clients from pool and from any other state.
    public void unregisterAllKeys() {
        synchronized (this.monitor) {
            this.pool.clear();
            this.ready.clear();
            this.inProgress.clear();
        }
    }

    
Return the next ready client, and transfer a collection of that client's items to process. Mark client in progress.

If there is no ready client, return null.

Parameters:
to collection object in which to transfer items
size max number of items to transfer
Returns:
key of client to whom items belong, or null if there is none.
    public K nextWorkBlock(Collection<W> to, int size) {
        synchronized (this.monitor) {
            K nextKey = readyToInProgress();
            if (nextKey != null) {
                LinkedList<W> queue = this.pool.get(nextKey);
                drainTo(queue, to, size);
            }
            return nextKey;
        }
    }

    
Private implementation of drainTo (not implemented for LinkedList<W>s).

Parameters:
<W> element type
deList to take (poll) elements from
c to add elements to
maxElements to take from deList
Returns:
number of elements actually taken
    private static <W> int drainTo(LinkedList<W> deList, Collection<W> c, int maxElements) {
        int n = 0;
        while (n < maxElements) {
            W first = deList.poll();
            if (first == null)
                break;
            c.add(first);
            ++n;
        }
        return n;
    }

    
Add (enqueue) an item for a specific client. If the client is dormant, it will be marked ready.

Parameters:
key the client to add the work item to
item the work item to add to the client queue
Returns:
true if and only if the client is marked ready as a result of this work item
Throws:
java.lang.IllegalArgumentException if key not registered.
    public boolean addWorkItem(K key, W item) {
        synchronized (this.monitor) {
            Queue<W> queue = this.pool.get(key);
            if (queue == null) {
                throw new IllegalArgumentException("Client " + key + " not registered");
            }
            queue.offer(item);
            if (isDormant(key)) {
                dormantToReady(key);
                return true;
            }
            return false;
        }
    }

    
Set client no longer in progress. Ignore unknown clients (and return false).

Parameters:
key client that has finished work
Returns:
true if and only if client becomes ready
Throws:
java.lang.IllegalStateException if registered client not in progress
    public boolean finishWorkBlock(K key) {
        synchronized (this.monitor) {
            if (!this.isRegistered(key))
                return false;
            if (!this.inProgress.contains(key)) {
                throw new IllegalStateException("Client " + key + " not in progress");
            }
            if (moreWorkItems(key)) {
                inProgressToReady(key);
                return true;
            } else {
                inProgressToDormant(key);
                return false;
            }
        }
    }
    private boolean moreWorkItems(K key) {
        LinkedList<W> leList = this.pool.get(key);
        return leList != null && !leList.isEmpty();
    }
    
    /* State identification functions */
    private boolean isInProgress(K key){ return this.inProgress.contains(key); }
    private boolean isReady(K key){ return this.ready.contains(key); }
    private boolean isRegistered(K key) { return this.pool.containsKey(key); }
    private boolean isDormant(K key){ return !isInProgress(key) && !isReady(key) && isRegistered(key); }

    /* State transition methods - all assume key registered */
    private void inProgressToReady(K key){ this.inProgress.remove(key); this.ready.addIfNotPresent(key); }
    private void inProgressToDormant(K key){ this.inProgress.remove(key); }
    private void dormantToReady(K key){ this.ready.addIfNotPresent(key); }

    /* Basic work selector and state transition step */
    private K readyToInProgress() {
        K key = this.ready.poll();
        if (key != null) {
            this.inProgress.add(key);
        }
        return key;
    }
}