Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright (c) 2011. Axon Framework
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 package org.axonframework.eventhandling;
 
 
Abstract implementation that schedules tasks for execution. This implementation allows for certain tasks to be executed sequentially, while other (groups of) tasks are processed in parallel.

Parameters:
<T> The type of object defining the task
Author(s):
Allard Buijze
Since:
1.0
 
 public abstract class AsynchronousExecutionWrapper<T> {
 
     private static final Logger logger = LoggerFactory.getLogger(AsynchronousExecutionWrapper.class);
     private final Executor executor;
     private final ConcurrentMap<ObjectEventProcessingScheduler<T>> transactions =
             new ConcurrentHashMap<ObjectEventProcessingScheduler<T>>();
     private final SequencingPolicy<? super T> sequencingPolicy;
     private final BlockingQueue<T> concurrentEventQueue = new LinkedBlockingQueue<T>();
     private final TransactionManager transactionManager;

    
Initialize the AsynchronousExecutionWrapper using the given executor and transactionManager. The transaction manager is used to start and stop any underlying transactions necessary for task processing.

Parameters:
transactionManager The transaction manager that will manage underlying transactions for this task
sequencingPolicy The sequencing policy for concurrent execution of tasks
executor The executor that processes the tasks
 
     public AsynchronousExecutionWrapper(Executor executorTransactionManager transactionManager,
                                         SequencingPolicy<? super T> sequencingPolicy) {
         this. = executor;
         this. = transactionManager;
         this. = sequencingPolicy;
     }

    
Initialize the AsynchronousExecutionWrapper using the given executor.

Note that the underlying bean will not be notified of any transactions.

Parameters:
sequencingPolicy The sequencing policy for concurrent execution of tasks
executor The executor that processes the tasks
See also:
AsynchronousExecutionWrapper(java.util.concurrent.Executor,org.axonframework.eventhandling.TransactionManager,org.axonframework.eventhandling.SequencingPolicy)
 
     public AsynchronousExecutionWrapper(Executor executorSequencingPolicy<? super T> sequencingPolicy) {
         this(executornew NoTransactionManager(), sequencingPolicy);
     }

    
Does the actual processing of the task. This method is invoked if the scheduler has decided this task is up next for execution. Implementation should not pass this scheduling to an asynchronous executor.

Parameters:
task The task to handle
 
     protected abstract void doHandle(T task);

    
Schedules this task for execution when all pre-conditions have been met.

Parameters:
task The task to schedule for processing.
 
     protected void schedule(T task) {
         final Object sequenceIdentifier = .getSequenceIdentifierFor(task);
         if (sequenceIdentifier == null) {
             .debug("Scheduling task of type [{}] for full concurrent processing",
                          task.getClass().getSimpleName());
             EventProcessingScheduler<T> scheduler = newProcessingScheduler(new NoActionCallback(),
                                                                            this.);
             scheduler.scheduleEvent(task);
         } else {
             .debug("Scheduling task of type [{}] for sequential processing in group [{}]",
                         task.getClass().getSimpleName(),
                         sequenceIdentifier.toString());
            assignEventToScheduler(tasksequenceIdentifier);
        }
    }
    private void assignEventToScheduler(T taskObject sequenceIdentifier) {
        boolean taskScheduled = false;
        while (!taskScheduled) {
            EventProcessingScheduler<T> currentScheduler = .get(sequenceIdentifier);
            if (currentScheduler == null) {
                .putIfAbsent(sequenceIdentifier,
                                         newProcessingScheduler(new TransactionCleanUp(sequenceIdentifier)));
            } else {
                taskScheduled = currentScheduler.scheduleEvent(task);
                if (!taskScheduled) {
                    // we know it can be cleaned up.
                    .remove(sequenceIdentifiercurrentScheduler);
                }
            }
        }
    }

    
Creates a new scheduler instance that schedules tasks on the executor service for the managed EventListener.

Parameters:
shutDownCallback The callback that needs to be notified when the scheduler stops processing.
Returns:
a new scheduler instance
            EventProcessingScheduler.ShutdownCallback shutDownCallback) {
        .debug("Initializing new processing scheduler.");
        return newProcessingScheduler(shutDownCallbacknew LinkedList<T>());
    }

    
Creates a new scheduler instance schedules tasks on the executor service for the managed EventListener. The Scheduler must get tasks from the given taskQueue.

Parameters:
shutDownCallback The callback that needs to be notified when the scheduler stops processing.
taskQueue The queue from which this scheduler should store and get tasks
Returns:
a new scheduler instance
            EventProcessingScheduler.ShutdownCallback shutDownCallback,
            Queue<T> taskQueue) {
        return new EventProcessingScheduler<T>(taskQueueshutDownCallback) {
            @Override
            protected void doHandle(T task) {
                AsynchronousExecutionWrapper.this.doHandle(task);
            }
        };
    }
    private static class NoActionCallback implements EventProcessingScheduler.ShutdownCallback {

        
        @Override
        public void afterShutdown(EventProcessingScheduler scheduler) {
        }
    }
    private final class TransactionCleanUp implements EventProcessingScheduler.ShutdownCallback {
        private final Object sequenceIdentifier;
        private TransactionCleanUp(Object sequenceIdentifier) {
            this. = sequenceIdentifier;
        }

        
        @Override
        public void afterShutdown(EventProcessingScheduler scheduler) {
            .debug("Cleaning up processing scheduler for sequence [{}]".toString());
            .remove(scheduler);
        }
    }
New to GrepCode? Check out our FAQ X