  /*
   * Copyright (c) 2010-2011. Axon Framework
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
 
 package org.axonframework.eventhandling;
 
 
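 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;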
 
 import static org.axonframework.eventhandling.YieldPolicy.DO_NOT_YIELD;

Scheduler that keeps track of (Event processing) tasks that need to be executed sequentially.

Parameters:
<T> The type of class representing the processing instruction for the event.
Author(s):
Allard Buijze
Since:
1.0
 
 public abstract class EventProcessingScheduler<T> implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(EventProcessingScheduler.class);
 
     private final ShutdownCallback shutDownCallback;
     private final TransactionManager transactionManager;
     private final Executor executor;
     // guarded by "this"
     private final Queue<T> eventQueue;
     private final List<T> currentBatch = new LinkedList<T>();
     // guarded by "this"
     private boolean isScheduled = false;
     private volatile boolean cleanedUp;
     private volatile long retryAfter;
     private volatile boolean transactionStarted;

    
Initialize a scheduler using the given executor. This scheduler uses an unbounded queue to schedule events.

Parameters:
transactionManager The transaction manager that manages underlying transactions
executor The executor service that will process the events
shutDownCallback The callback to notify when the scheduler finishes processing events
 
     public EventProcessingScheduler(TransactionManager transactionManager, Executor executor,
                                     ShutdownCallback shutDownCallback) {
         this(transactionManager, new LinkedList<T>(), executor, shutDownCallback);
     }

    
Initialize a scheduler using the given executor. The eventQueue is the queue from which the scheduler should obtain its events. This queue must be thread safe, as it can be used simultaneously by multiple threads.

Parameters:
transactionManager The transaction manager that manages underlying transactions
executor The executor service that will process the events
eventQueue The queue from which this scheduler gets events
shutDownCallback The callback to notify when the scheduler finishes processing events
 
     public EventProcessingScheduler(TransactionManager transactionManager, Queue<T> eventQueue, Executor executor,
                                     ShutdownCallback shutDownCallback) {
         this.transactionManager = transactionManager;
         this.eventQueue = eventQueue;
         this.shutDownCallback = shutDownCallback;
         this.executor = executor;
     }

    
Schedules an event for processing. Will schedule a new invoker task if none is currently active.

If the current scheduler is in the process of being shut down, this method will return false.

This method is thread safe

Parameters:
event the event to schedule
Returns:
true if the event was scheduled successfully, false if this scheduler is not available to process events
Throws:
java.lang.IllegalStateException if the queue in this scheduler does not have the capacity to add this event
 
     public synchronized boolean scheduleEvent(T event) {
         if (cleanedUp) {
            // this scheduler has been shut down; accept no more events
            return false;
        }
        // add the event to the queue which this scheduler processes
        eventQueue.add(event);
        scheduleIfNecessary();
        return true;
    }

    
Returns the next event in the queue, if available, and adds it to the current batch. It returns null if no further events are available for processing.

This method is thread safe

Returns:
the next event for processing, or null if none is available
    private synchronized T nextEvent() {
        T e = eventQueue.poll();
        if (e != null) {
            currentBatch.add(e);
        }
        return e;
    }

    
Tries to yield to other threads by rescheduling processing of any further queued events. If rescheduling fails, this call returns false, indicating that processing should continue in the current thread.

This method is thread safe

Returns:
true if yielding succeeded, false otherwise.
    private synchronized boolean yield() {
        if (eventQueue.size() > 0 || currentBatch.size() > 0) {
            try {
                if (retryAfter <= System.currentTimeMillis()) {
                    executor.execute(this);
                    logger.info("Processing of event listener yielded.");
                } else {
                    long waitTimeRemaining = retryAfter - System.currentTimeMillis();
                    boolean executionScheduled = scheduleDelayedExecution(waitTimeRemaining);
                    if (!executionScheduled) {
                        logger.warn("The provided executor does not seem to support delayed execution. Scheduling for "
                                            + "immediate processing and expecting processing to wait if scheduled too soon.");
                        executor.execute(this);
                    }
                }
            } catch (RejectedExecutionException e) {
                logger.info("Processing of event listener could not yield. Executor refused the task.");
                return false;
            }
        } else {
            cleanUp();
        }
        return true;
    }
    private boolean scheduleDelayedExecution(long waitTimeRemaining) {
        if (executor instanceof ScheduledExecutorService) {
            logger.info("Executor supports delayed executing. Rescheduling for processing in {} millis",
                        waitTimeRemaining);
            ((ScheduledExecutorService) executor).schedule(this, waitTimeRemaining, TimeUnit.MILLISECONDS);
            return true;
        }
        return false;
    }

    
Will look at the current scheduling status and schedule an EventHandlerInvokerTask if none is already active.

This method is thread safe

    private synchronized void scheduleIfNecessary() {
        if (!isScheduled) {
            isScheduled = true;
            executor.execute(this);
        }
    }

    
Returns the number of events currently queued for processing.

Returns:
the number of events currently queued for processing.
    private synchronized int queuedEventCount() {
        return eventQueue.size();
    }

    
    @Override
    public void run() {
        boolean mayContinue = true;
        final TransactionStatus status = new TransactionStatus();
        TransactionStatus.set(status);
        while (mayContinue) {
            processOrRetryBatch(status);
            boolean inRetryMode =
                    !status.isSuccessful() && status.getRetryPolicy() != RetryPolicy.SKIP_FAILED_EVENT;
            /*
             * Only continue processing in the current thread if:
             * - all of the following
             *   - not in retry mode
             *   - there are events waiting in the queue
             *   - the yielding policy is DO_NOT_YIELD
             * - or
             *   - yielding failed because the executor rejected the execution
             */
            mayContinue = (!inRetryMode && queuedEventCount() > 0 && DO_NOT_YIELD.equals(status.getYieldPolicy()))
                    || !yield();
            status.resetTransactionStatus();
        }
        TransactionStatus.clear();
    }
    private void waitUntilAllowedStartingTime() {
        long waitTimeRemaining = retryAfter - System.currentTimeMillis();
        if (waitTimeRemaining > 0) {
            try {
                logger.warn("Event processing started before delay expired. Forcing thread to sleep for {} millis.",
                            waitTimeRemaining);
                Thread.sleep(waitTimeRemaining);
            } catch (InterruptedException e) {
                logger.warn("Thread was interrupted while waiting for retry. Scheduling for immediate retry.");
                Thread.currentThread().interrupt();
            } finally {
                retryAfter = 0;
            }
        }
    }
    private void processOrRetryBatch(TransactionStatus status) {
        try {
            this.transactionStarted = false;
            if (currentBatch.isEmpty()) {
                handleEventBatch(status);
            } else {
                logger.warn("Retrying {} events from the previous failed transaction.", currentBatch.size());
                retryEventBatch(status);
                logger.warn("Continuing regular processing of events.");
                handleEventBatch(status);
            }
            if (transactionStarted) {
                transactionManager.afterTransaction(status);
            }
            currentBatch.clear();
        } catch (Exception e) {
            // the batch failed.
            prepareBatchRetry(status, e);
        }
    }
    private synchronized void prepareBatchRetry(TransactionStatus status, Exception e) {
        status.markFailed(e);
        tryAfterTransactionCall(status);
        switch (status.getRetryPolicy()) {
            case RETRY_LAST_EVENT:
                markLastEventForRetry();
                logger.warn("Transactional event processing batch failed. Rescheduling last event for retry.", e);
                break;
            case SKIP_FAILED_EVENT:
                logger.error("Transactional event processing batch failed. Ignoring failed event.", e);
                currentBatch.clear();
                status.setRetryInterval(0);
                break;
            case RETRY_TRANSACTION:
                logger.warn("Transactional event processing batch failed. ", e);
                logger.warn("Retrying entire batch of {} events, with {} more in queue.",
                            currentBatch.size(),
                            queuedEventCount());
                break;
        }
        this.retryAfter = System.currentTimeMillis() + status.getRetryInterval();
    }
    private void tryAfterTransactionCall(TransactionStatus status) {
        try {
            transactionManager.afterTransaction(status);
        } catch (Exception e) {
            logger.warn("Call to afterTransaction method of failed transaction resulted in an exception.", e);
        }
        transactionStarted = false;
    }
    private void markLastEventForRetry() {
        T lastEvent = currentBatch.get(currentBatch.size() - 1);
        currentBatch.clear();
        if (lastEvent != null) {
            currentBatch.add(lastEvent);
        }
    }
    private void retryEventBatch(TransactionStatus status) {
        for (T event : this.currentBatch) {
            startTransactionIfNecessary(status);
            doHandle(event);
            status.recordEventProcessed();
        }
        currentBatch.clear();
    }

    
Does the actual processing of the event. This method is invoked if the scheduler has decided this event is up next for execution. Implementations should not pass this scheduling to an asynchronous executor.

Parameters:
event The event to handle
    protected abstract void doHandle(T event);
    private void handleEventBatch(TransactionStatus status) {
        T event;
        while (!status.isTransactionSizeReached() && (event = nextEvent()) != null) {
            startTransactionIfNecessary(status);
            doHandle(event);
            status.recordEventProcessed();
        }
    }
    private void startTransactionIfNecessary(TransactionStatus status) {
        if (!transactionStarted) {
            transactionManager.beforeTransaction(status);
            transactionStarted = true;
        }
    }
    private synchronized void cleanUp() {
        isScheduled = false;
        cleanedUp = true;
        shutDownCallback.afterShutdown(this);
    }

    
Callback that allows the SequenceManager to receive a notification when this scheduler finishes processing events.
    interface ShutdownCallback {

        
Called when event processing is complete. This means that there are no more events waiting and the last transactional batch has been committed successfully.

Parameters:
scheduler the scheduler that completed processing.
        void afterShutdown(EventProcessingScheduler scheduler);
    }
 }
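
The central idea of the scheduler above is to queue items and keep at most one task submitted to the executor, so that items are handled one at a time and in order. That idea can be sketched in isolation. The following is a minimal illustration under stated assumptions, not Axon code: the class SequentialTaskSketch and its methods schedule, next and handled are hypothetical names, and transactions, batching, retry and yielding are deliberately left out.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified illustration of sequential processing on a shared executor. */
class SequentialTaskSketch<T> implements Runnable {

    private final Executor executor;
    // all three fields below are guarded by "this"
    private final Queue<T> queue = new LinkedList<T>();
    private final List<T> handled = new ArrayList<T>();
    private boolean scheduled = false;

    SequentialTaskSketch(Executor executor) {
        this.executor = executor;
    }

    /** Queues an item and submits this task to the executor unless it is already scheduled. */
    synchronized void schedule(T item) {
        queue.add(item);
        if (!scheduled) {
            scheduled = true;
            executor.execute(this);
        }
    }

    /** Returns the next item; when the queue is empty, clears the scheduled flag and returns null. */
    private synchronized T next() {
        T item = queue.poll();
        if (item == null) {
            scheduled = false;
        }
        return item;
    }

    /** Drains the queue in the current thread; "handling" here just records the item. */
    @Override
    public void run() {
        T item;
        while ((item = next()) != null) {
            synchronized (this) {
                handled.add(item);
            }
        }
    }

    /** Returns a snapshot of the items handled so far, in handling order. */
    synchronized List<T> handled() {
        return new ArrayList<T>(handled);
    }
}

class SequentialTaskSketchDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        SequentialTaskSketch<Integer> task = new SequentialTaskSketch<Integer>(pool);
        task.schedule(1);
        task.schedule(2);
        task.schedule(3);
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(task.handled()); // prints [1, 2, 3]
    }
}
```

Because the scheduled flag is only cleared under the same lock that guards the queue, a producer that arrives after the queue has drained submits a fresh task, while one that arrives earlier relies on the running task to pick the item up. This is a design choice of the sketch; the class above additionally tracks the current batch and a retry time before deciding whether to reschedule.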