Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.fasterxml.transistore.service;
  
  import java.util.Random;
Helper class used by BasicTSOperationThrottler to implement throttling logic for contested read/write operations (mostly for file system, although theoretically also for DBs if necessary).

Basic idea is simple: when things are going smoothly, we will allow a reasonable number of concurrent reads and/or writes to proceed without throttling. But if limit on either is reached, queuing is used to apply specific ratio to try to avoid starving of either reads or writes, by basically fixing ratio in which queue is drained.

 
 public final class ReadWriteOperationPrioritizer
 {
 	// By default, we will use 2:1 ratio between allowing queued reads vs writes
     private final static double READ_RATIO = 2.0 / 3.0;
 
     private final static double WRITE_RATIO = 1.0 - ;
 
     protected final Lease READ_LEASE;
 
     protected final Lease WRITE_LEASE;
     
     {
         final Operation reads = new Operation("Read", 3, 6, );
         final Operation writes = new Operation("Write", 2, 5, );
 
         final int _maxConcurrentThreads = 8;
 
         // We will use a global lock for updating state of currently
         // active entries; it is shared by this class and {@link Lease}.
         final Object SCHEDULE_LOCK = new Object();
         
          = new Lease(SCHEDULE_LOCK_maxConcurrentThreadsreadswrites, 1);
          = new Lease(SCHEDULE_LOCK_maxConcurrentThreadswritesreads, 2);
     }
 
     public final Lease obtainReadLease() throws InterruptedException {
         return .obtainLease();
     }
 
     public final Lease obtainWriteLease() throws InterruptedException {
         return .obtainLease();
     }
     
     /*
     /**********************************************************************
     /* Helper classes: lease handling
     /**********************************************************************
      */
    
    
Objects used for returning leases.
 
     final static class Lease
     {
         protected final Object _lock;
 
         protected final int _maxConcurrentThreads;
        
        
To get statistical distribution based on ratios, we need to use (pseudo)random numbers.
 
         protected final Random _rnd;
         
         protected final Operation _primary;
 
         protected final Operation _secondary;
 
         public Lease(Object lockint maxConcOperation primOperation seclong rndSeed)
         {
              = lock;
              = maxConc;
              = new Random(rndSeed);
              = prim;
              = sec;
         }
 
         public Lease obtainLease() throws InterruptedException
         {
             CountDownLatch latch;
 
             synchronized () {
                 // First: perhaps we have uncontested operations?
                 if (.canProceedWithoutQueueing()) {
                     return this;
                 }
                 // If not, queue it up...
                 latch = .queueOperation();
             }
             // ... and wait for it to obtain the lease that way...
            latch.await();
            return this;
        }
        
        public void returnLease() {
            synchronized () {
            	int primaryCount = .markCompleted();
            	boolean q1 = .couldReleaseQueued(primaryCount);
                boolean q2 = .couldReleaseQueued();
            	
            	/* 23-Jul-2013, tatu: I don't think there's strict need to loop here;
            	 *  but let's play it safe and do that.
            	 */
            	do {
	                if (q1) { // has an entry in primary queue
	                	if (q2) { // and in secondary -- choose one, with bias:
	                		double rnd = .nextDouble();
	                		if (rnd < .getWeight()) {
	                    		.releaseFromQueue();
	                		} else {
	                			.releaseFromQueue();
	                		}
	                	} else { // no, just primary
	                		.releaseFromQueue();
	                	}
	                } else if (q2) { // none in primary, but got secondary
	                	.releaseFromQueue();
	                } else {
	                	break;
	                }
	                q1 = .couldReleaseQueued();
	                q2 = .couldReleaseQueued();
            	} while (q1 | q2);
            }
        }
    }
    
    /*
    /**********************************************************************
    /* Helper classes: operation modelling
    /**********************************************************************
     */
    
    final static class Operation
    {
        
Just need to have enough room for any number of elements ever.
        private final static int MAX_QUEUED = 600;
        public final String _desc;
        
        public final int _guaranteedOperations;
        
        public final int _maxOperations;

        
Relative weight of operations of this type, normalized to be within range of 0.0 and 1.0.
        public final double _weight;
        
        
Number of operations being actively executed currently.
        protected final AtomicInteger _activeCount = new AtomicInteger(0);
        protected final ArrayBlockingQueue<CountDownLatch_queued;
        
        public Operation(String descint guarint maxdouble w)
        {
             = desc;
             = guar;
             = max;
             = w;
             = new ArrayBlockingQueue<CountDownLatch>();
        }
        public double getWeight() {
        	return ;
        }
        public int getActive() {
            return .get();
        }
        public CountDownLatch queueOperation()
        {
            CountDownLatch latch = new CountDownLatch(1);
            if (!.offer(latch)) { // should never occur
                throw new IllegalStateException("INTERNAL ERROR: Can not queue more "
                        ++" operations, queue full: "+);
            }
            return latch;
        }

        
Method called when Operation of this type completed; just needs to subtract counter.
        public int markCompleted() {
        	return .addAndGet(-1);
        }
        
        public boolean isQueueEmpty() {
        	return .isEmpty();
        }
        public boolean couldReleaseQueued(int currentCount) {
        	if (isQueueEmpty()) { // nothing queued, nothing to release
        		return false;
        	}
        	// but also need to have room for one more:
        	return currentCount < ;
        }
        public boolean couldReleaseQueued()
        {
        	if (isQueueEmpty()) { // nothing queued, nothing to release
        		return false;
        	}
        	// but also need to have room for one more:
        	return .get() < ;
        }
        public boolean canProceedWithoutQueueing(int maxConcurrent,
                Operation otherQueue)
        {
        	// First rule: both queues must be empty, before proceeding
            if (isQueueEmpty() && otherQueue.isQueueEmpty()) {
                int count = .get();
                // guaranteed slots are free for taking
                if (count >= ) {
	                // otherwise, perhaps we can just use "at-large" slots?
	                // but not beyond max per operation
	                if (count >= ) {
	                    return false;
	                }
	                int total = count + otherQueue.getActive();
	                if (total >= maxConcurrent) {
	                    return false;
	                }
                }
                // Either way, yes, we are ready to proceed:
                .addAndGet(1);
                return true;
            }
            return false;
        }
        public int releaseFromQueue()
        {
        	CountDownLatch l = .poll();
        	if (l == null) { // sanity check; should never occur
                throw new IllegalStateException("INTERNAL ERROR: failed to release from queue of "
                        ++" operations, queue empty");
        	}
        	int count = .addAndGet(1);
        	l.countDown();
        	return count;
        }
    }
New to GrepCode? Check out our FAQ X