Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.gh.bmd.jrt.runner;
 
 
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.WeakHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Class providing ordering of executions based on priority.
  * <p/>
  * Each class instance wraps a supporting runner and then provides different runner instances,
  * each one enqueuing executions with a specific priority.
  * <p/>
  * Each enqueued execution ages every time a higher priority one takes precedence, so that older
  * executions slowly increase their priority. This mechanism has been implemented to avoid
  * starvation of low priority executions. Hence, when assigning priorities to different runners,
  * it is important to keep in mind that the difference between two priorities corresponds to the
  * maximum age the lower priority execution will reach before getting precedence over the higher
  * priority one.
  * <p/>
  * Note that the queue is not shared between different instances of this class.
  * <p/>
  * Created by davide-maestroni on 28/04/15.
  */

 
 public class PriorityRunner {
 
     private static final PriorityExecutionComparator PRIORITY_EXECUTION_COMPARATOR =
             new PriorityExecutionComparator();
 
     private static final WeakIdentityHashMap<RunnerPriorityRunnersRunnerMap =
             new WeakIdentityHashMap<RunnerPriorityRunner>();
 
     private final AtomicLong mAge = new AtomicLong(. - .);
 
             Collections.synchronizedMap(
                     new WeakIdentityHashMap<PriorityExecutionDelayedExecution>());
 
             new WeakIdentityHashMap<ExecutionWeakHashMap<PriorityExecutionVoid>>();
 
     private final PriorityBlockingQueue<PriorityExecutionmQueue;
 
     private final TemplateExecution mExecution = new TemplateExecution() {
 
         public void run() {
 
             final PriorityExecution execution = .poll();
 
             if (execution != null) {
 
                 execution.run();
             }
         }
     };
 
     private final Runner mRunner;
 
     private final WeakHashMap<QueuingRunnerVoidmRunnerMap =
             new WeakHashMap<QueuingRunnerVoid>();

    
Constructor.

Parameters:
wrapped the wrapped instance.
 
     @SuppressWarnings("ConstantConditions")
     private PriorityRunner(@Nonnull final Runner wrapped) {
 
         if (wrapped == null) {
 
             throw new NullPointerException("the wrapped runner must not be null");
         }
 
          = wrapped;
     }
 
     @Nonnull
    static PriorityRunner getInstance(@Nonnull final Runner wrapped) {
        if (wrapped instanceof QueuingRunner) {
            return ((QueuingRunnerwrapped).enclosingRunner();
        }
        synchronized () {
            final WeakIdentityHashMap<RunnerPriorityRunnerrunnerMap = ;
            PriorityRunner runner = runnerMap.get(wrapped);
            if (runner == null) {
                runner = new PriorityRunner(wrapped);
                runnerMap.put(wrappedrunner);
            }
            return runner;
        }
    }
    private static int compareLong(long xlong y) {
        return (x < y) ? -1 : ((x == y) ? 0 : 1);
    }

    
Returns a runner enqueuing executions with the specified priority.

Parameters:
priority the execution priority.
Returns:
the runner instance.
    @Nonnull
    public Runner getRunner(final int priority) {
        synchronized () {
            final WeakHashMap<QueuingRunnerVoidrunnerMap = ;
            for (final QueuingRunner runner : runnerMap.keySet()) {
                if (runner.mPriority == priority) {
                    return runner;
                }
            }
            final QueuingRunner runner = new QueuingRunner(priority);
            runnerMap.put(runnernull);
            return runner;
        }
    }

    
Execution implementation delaying the enqueuing of the priority execution.
    private static class DelayedExecution implements Execution {
        private final PriorityExecution mExecution;
        private final PriorityBlockingQueue<PriorityExecutionmQueue;

        
Constructor.

Parameters:
queue the queue.
execution the priority execution.
        private DelayedExecution(@Nonnull final PriorityBlockingQueue<PriorityExecutionqueue,
                @Nonnull final PriorityExecution execution) {
             = queue;
             = execution;
        }
        public boolean mayBeCanceled() {
            return .mayBeCanceled();
        }
        public void run() {
            final PriorityBlockingQueue<PriorityExecutionqueue = ;
            queue.put();
            final PriorityExecution execution = queue.poll();
            if (execution != null) {
                execution.run();
            }
        }
    }

    
Execution implementation providing a comparison based on priority and the wrapped execution age.
    private static class PriorityExecution implements Execution {
        private final long mAge;
        private final Execution mExecution;
        private final int mPriority;

        
Constructor.

Parameters:
execution the wrapped execution.
priority the execution priority.
age the execution age.
        private PriorityExecution(@Nonnull final Execution executionfinal int priority,
                final long age) {
             = execution;
             = priority;
             = age;
        }
        public boolean mayBeCanceled() {
            return .mayBeCanceled();
        }
        public void run() {
            .run();
        }
    }

    
Comparator of priority execution instances.
    private static class PriorityExecutionComparator
            implements Comparator<PriorityExecution>, Serializable {
        // just don't care...
        private static final long serialVersionUID = -1;
        public int compare(final PriorityExecution o1final PriorityExecution o2) {
            final int thisPriority = o1.mPriority;
            final long thisAge = o1.mAge;
            final int thatPriority = o2.mPriority;
            final long thatAge = o2.mAge;
            final int compare = compareLong(thatAge + thatPrioritythisAge + thisPriority);
            return (compare == 0) ? compareLong(thatAgethisAge) : compare;
        }
    }

    
Enqueuing runner implementation.
    private class QueuingRunner implements Runner {
        private final int mPriority;

        
Constructor.

Parameters:
priority the execution priority.
        private QueuingRunner(final int priority) {
             = priority;
        }
        public void cancel(@Nonnull final Execution execution) {
            synchronized () {
                final WeakHashMap<PriorityExecutionVoidpriorityExecutions =
                        .remove(execution);
                if (priorityExecutions != null) {
                    final Runner runner = ;
                    final PriorityBlockingQueue<PriorityExecutionqueue = ;
                    final Map<PriorityExecutionDelayedExecutiondelayedExecutions =
                            ;
                    for (final PriorityExecution priorityExecution : priorityExecutions.keySet()) {
                        if (!queue.remove(priorityExecution)) {
                            runner.cancel(delayedExecutions.remove(priorityExecution));
                        }
                    }
                }
            }
        }
        public boolean isExecutionThread() {
            return .isExecutionThread();
        }
        public void run(@Nonnull final Execution executionfinal long delay,
                @Nonnull final TimeUnit timeUnit) {
            final boolean mayBeCanceled = execution.mayBeCanceled();
            final PriorityExecution priorityExecution =
                    new PriorityExecution(execution.getAndDecrement());
            if (mayBeCanceled) {
                synchronized () {
                    final WeakIdentityHashMap<ExecutionWeakHashMap<PriorityExecutionVoid>>
                            executions = ;
                    WeakHashMap<PriorityExecutionVoidpriorityExecutions =
                            executions.get(execution);
                    if (priorityExecutions == null) {
                        priorityExecutions = new WeakHashMap<PriorityExecutionVoid>();
                        executions.put(executionpriorityExecutions);
                    }
                    priorityExecutions.put(priorityExecutionnull);
                }
            }
            if (delay == 0) {
                .put(priorityExecution);
                .run(, 0, timeUnit);
            } else {
                final DelayedExecution delayedExecution =
                        new DelayedExecution(priorityExecution);
                if (mayBeCanceled) {
                    .put(priorityExecutiondelayedExecution);
                }
                .run(delayedExecutiondelaytimeUnit);
            }
        }
        private PriorityRunner enclosingRunner() {
            return PriorityRunner.this;
        }
    }
New to GrepCode? Check out our FAQ X