Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * JBoss, Home of Professional Open Source
   * Copyright 2007, Red Hat Middleware LLC, and individual contributors
   * as indicated by the @author tags.
   * See the copyright.txt in the distribution for a
   * full listing of individual contributors.
   * This copyrighted material is made available to anyone wishing to use,
   * modify, copy, or redistribute it subject to the terms and conditions
   * of the GNU Lesser General Public License, v. 2.1.
  * This program is distributed in the hope that it will be useful, but WITHOUT A
  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  * You should have received a copy of the GNU Lesser General Public License,
  * v.2.1 along with this distribution; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  * MA  02110-1301, USA.
  *
  * (C) 2005-2007,
  * @author JBoss Inc.
  */
 /*
  * Copyright (C) 1998, 1999, 2000, 2001,
  *
  * Arjuna Solutions Limited,
  * Newcastle upon Tyne,
  * Tyne and Wear,
  * UK.
  *
  * $Id: TransactionReaper.java 2342 2006-03-30 13:06:17Z  $
  */
 
 package com.arjuna.ats.arjuna.coordinator;
 
 import java.util.List;
 
 /**
  * Class to record transactions with non-zero timeout values, and class to
  * implement a transaction reaper thread which terminates these transactions
  * once their timeout elapses.
  *
  * @author Mark Little (mark@arjuna.com)
  * @version $Id: TransactionReaper.java 2342 2006-03-30 13:06:17Z $
  * @since JTS 1.0.
  */
 
 
 public class TransactionReaper
 {
 
     public static final String NORMAL = "NORMAL";
 
     public static final String DYNAMIC = "DYNAMIC";
 
     public static final String PERIODIC = "PERIODIC"// the new name for 'NORMAL'
 
     // Private constructor: traces the requested check period and stores it.
     // NOTE(review): identifiers in this file appear to have been stripped by
     // the extraction tool. The bare "..isTraceEnabled()" / "..trace(...)"
     // calls are presumably logger calls and the bare " = checkPeriod" is
     // presumably an assignment to the _checkPeriod field -- confirm against
     // the original source.
     private TransactionReaper(long checkPeriod)
     {
         if (..isTraceEnabled()) {
             ..trace("TransactionReaper::TransactionReaper ( " + checkPeriod
                     + " )");
         }
 
          = checkPeriod;
     }
 
     /**
      * Computes a wait interval in milliseconds, relative to
      * {@link System#currentTimeMillis()}.
      *
      * NOTE(review): the branch condition, the receivers of get()/getFirst(),
      * the status constant and the fallback value were stripped in extraction.
      * Presumably the condition is the _dynamic flag, get() is called on
      * nextDynamicCheckTime, getFirst() on _reaperElements and the fallback is
      * _checkPeriod -- confirm against the original source.
      */
     public final long checkingPeriod()
     {
         if () {
             // the result may be zero or negative if the stored time has passed
             return .get() - System.currentTimeMillis();
         } else {
             // if we have a cancel in progress which needs
             // checking up on then we have to wake up in time
             // for it whether we are using a static or
             // dynamic model
 
             final ReaperElement head = .getFirst();
             if(head != null) {
                 if (head._status != .) {
                     // time remaining until the head element's absolute timeout
                     long waitTime = head.getAbsoluteTimeout() - System.currentTimeMillis();
                     if (waitTime < ) {
                         return waitTime;
                     }
                 }
             }
 
             return ;
         }
    }

    
    /**
     * Process all entries in the timeout queue which have expired. Entries for
     * newly expired transactions are passed to a worker thread for cancellation
     * and requeued for subsequent progress checks. The worker is given a kick
     * if such checks find it is wedged.
     *
     * Timeout is given in milliseconds.
     */

    public final void check()
    {
        // NOTE(review): many identifiers in this method appear to have been
        // stripped by the extraction tool (bare "." / ".." receivers, empty
        // "synchronized()" and "case .:" labels, "return" and call arguments
        // with missing separators). The comments describe only what the
        // surviving text shows; confirm against the original source.
        if (..isTraceEnabled()) {
            ..trace("TransactionReaper::check ()");
        }
        // handle one expired element per iteration; the loop is left only via
        // the break/return statements in the synchronized section below
        do {
            final ReaperElement reaperElement;
            synchronized(this) {
                final long now = System.currentTimeMillis();
                final long next = .get();
                if (..isTraceEnabled()) {
                    ..trace("TransactionReaper::check - comparing " + Long.toString(next));
                }
                // the recorded next check time has not been reached yet
                if (now < next) {
                    break;
                }
                reaperElement = .getFirst();
                // TODO close window where first can change - maybe record nextDynamicCheckTime before probing first,
                // then use compareAndSet? Although something will need to check before sleeping anyhow...
                if (reaperElement == null) {
                    .set(.);
                    return;
                } else {
                    final long nextTimeout = reaperElement.getAbsoluteTimeout();
                    if(nextTimeout > now) {
                        // head element has not expired: record its timeout as the next check time
                        .set(nextTimeout);
                        return// nothing to do yet.
                    }
                }
            }
            ..warn_coordinator_TransactionReaper_18(reaperElement._control.get_uid(), reaperElement.statusName());
            // if we have to synchronize on multiple objects we always
            // do so in a fixed order ReaperElement before Reaper and
            // ReaperElement before Reaper._cancelQueue in order to
            // ensure we don't deadlock. We never synchronize on the
            // reaper and the cancel queue at the same time.
            synchronized(reaperElement) {
                // NOTE(review): the case constants were lost in extraction; the
                // comment inside each arm describes the state it handles.
                switch (reaperElement._status) {
                    case .: {
                        // this tx has just timed out. remove it from the
                        // TX list, update the timeout to take account of
                        // cancellation period and reinsert as a cancelled
                        // TX. this ensures we process it again if it does
                        // not get cancelled in time
                        reaperElement._status = .;
                        reinsertElement(reaperElement);
                        if (..isTraceEnabled()) {
                            ..trace("Reaper scheduling TX for cancellation " + reaperElement._control.get_uid());
                        }
                        // insert into cancellation queue for a worker
                        // thread to process and then make sure a worker
                        // thread is awake
                        synchronized() {
                            .add(reaperElement);
                            .notifyAll();
                        }
                    }
                    break;
                    case .: {
                        // hmm, a worker is taking its time to
                        // start processing this scheduled entry.
                        // we may just be running slow ... but the
                        // worker may be wedged under a cancel for
                        // some other TX. add an extra delay to
                        // give the worker more time to complete
                        // its current task and progress this
                        // entry to the CANCEL state. if the
                        // worker *is* wedged then this will
                        // ensure the wedged TX entry comes to the
                        // front of the queue.
                        reinsertElement(reaperElement);
                        if (..isTraceEnabled()) {
                            ..trace("Reaper deferring interrupt for TX scheduled for cancel " + reaperElement._control.get_uid());
                        }
                    }
                    break;
                    case .: {
                        // ok, the worker must be wedged under a
                        // call to cancel() -- kick the thread and
                        // reschedule the element for a later
                        // check to ensure the thread responded to
                        // the kick
                        reaperElement._status = .;
                        reaperElement._worker.interrupt();
                        reinsertElement(reaperElement);
                        // log that we interrupted cancel()
                        if (..isTraceEnabled()) {
                            ..trace("TransactionReaper::check interrupting cancel in progress for " + reaperElement._control.get_uid());
                        }
                    }
                    break;
                    case .: {
                        // cancellation got truly wedged -- mark
                        // the element as a zombie so the worker
                        // exits when (if?) it wakes up and create
                        // a new worker thread to handle further
                        // cancellations. then mark the
                        // transaction as rollback only.
                        reaperElement._status = .;
                        synchronized(this) {
                            // NOTE(review): bare "++" -- presumably increments _zombieCount; confirm
                            ++;
                            if (..isTraceEnabled()) {
                                ..trace("Reaper " + Thread.currentThread() + " got a zombie " + reaperElement._worker + " (zombie count now " +  + ") cancelling " + reaperElement._control.get_uid());
                            }
                            if ( == ) {
                                // log zombie overflow error call()
                                ..error_coordinator_TransactionReaper_5(Integer.toString());
                            }
                        }
                        // replace the wedged worker with a fresh daemon worker thread
                         = new ReaperWorkerThread(.);
                        .setDaemon(true);
                        .start();
                        // log a failed cancel()
                        ..warn_coordinator_TransactionReaper_6(reaperElement._worker.toString(),
                                reaperElement._control.get_uid());
                        // ok, since the worker was wedged we need to
                        // remove the entry from the timeouts and
                        // transactions lists then mark this tx as
                        // rollback only. we have to log a message
                        // whether we succeed, fail or get interrupted
                        removeElementReaper(reaperElement);
                        try {
                            if (reaperElement._control.preventCommit()) {
                                // log a successful preventCommit()
                                ..warn_coordinator_TransactionReaper_10(reaperElement._control.get_uid());
                                notifyListeners(reaperElement._controlfalse);
                            } else {
                                // log a failed preventCommit()
                                ..warn_coordinator_TransactionReaper_11(reaperElement._control.get_uid());
                            }
                        }
                        catch (Exception e1) {
                            // log an exception under preventCommit()
                            ..warn_coordinator_TransactionReaper_12(reaperElement._control.get_uid(), e1);
                        }
                    }
                    break;
                    case .:
                    case .: {
                        // ok, the worker should remove the tx
                        // from the transactions queue very soon
                        // but we need to progress to the next
                        // entry so we will steal in and do it
                        // first
                        removeElementReaper(reaperElement);
                    }
                    break;
                }
            }
        } while (true);
    }

    
    /**
     * Called by check(), this method removes and reinserts an element in the
     * timeout-ordered set, recalculating the next wakeup time accordingly.
     */
    private void reinsertElement(ReaperElement elong delay)
    {
        // NOTE(review): extraction fused the parameter list and stripped the
        // receivers; presumably the signature is (ReaperElement e, long delay),
        // reorder(e, delay) is called on _reaperElements and set() on
        // nextDynamicCheckTime -- confirm against the original source.
        // The reorder and the store of its result happen under the reaper's
        // own monitor.
        synchronized (this) {
            long newWakeup = .reorder(edelay);
            .set(newWakeup); // TODO - set should be atomic with reorder?
        }
    }
    /**
     * Blocks the calling thread, under a monitor, for as long as isEmpty()
     * reports true, calling wait() between checks. Returns once isEmpty() is
     * false, or if the thread is interrupted while waiting.
     *
     * NOTE(review): the monitor/receiver name was stripped in extraction;
     * presumably it is the _workQueue list -- confirm.
     */
    public final void waitForCancellations()
    {
        synchronized () {
            try {
                while (.isEmpty()) {
                    .wait();
                }
            }
            catch (InterruptedException e) {
                // the interrupt is swallowed and the method simply returns;
                // the interrupt status is not restored
            }
        }
    }
    /**
     * Repeatedly removes the entry at index 0 of a list and tries to cancel the
     * associated transaction, logging the outcome and removing the entry via
     * removeElementReaper(). The loop ends when the list is empty (detected
     * via IndexOutOfBoundsException) or when the current worker finds its
     * element was marked as a zombie, in which case the worker is shut down.
     *
     * NOTE(review): field names, status constants and logger receivers were
     * stripped in extraction (the list is presumably _workQueue); confirm
     * against the original source.
     */
    public final void doCancellations()
    {
        for (; ;) {
            ReaperElement e;
            // see if we have any cancellations to process
            synchronized () {
                try {
                    e = .remove(0);
                }
                catch (IndexOutOfBoundsException ioobe) {
                    // nothing left to process: leave the for(;;) loop
                    break;
                }
            }
            // ok, current status must be SCHEDULE_CANCEL.
            // progress state to CANCEL and call cancel()
            if (..isTraceEnabled()) {
                ..trace("Reaper Worker " + Thread.currentThread() + " attempting to cancel " + e._control.get_uid());
            }
            boolean cancelled = false;
            Exception exception = null;
            synchronized (e) {
                // record which thread is handling this element so that the
                // reaper can interrupt it (see check())
                e._worker = Thread.currentThread();
                e._status = .;
                e.notifyAll();
            }
            // we are now exposed to at most one interrupt from
            // the reaper. test for running and try the cancel if
            // required
            try {
                if (e._control.running()) {
                    // try to cancel the transaction
                    if (e._control.cancel() == .) {
                        cancelled = true;
                        if (TxStats.enabled()) {
                            // note that we also count timeouts as application rollbacks via
                            // the stats update in the TwoPhaseCoordinator cancel() method.
                            TxStats.getInstance().incrementTimeouts();
                        }
                        notifyListeners(e._controltrue);
                    }
                }
            }
            catch (Exception e1) {
                // remembered so the failure can be logged further down
                exception = e1;
            }
            // ok, close the interrupt window by resetting the
            // state -- unless we have been told to go away by
            // being set to ZOMBIE
            synchronized (e) {
                if (e._status == .) {
                    // we need to decrement the zombie count and
                    // force an immediate thread exit. the reaper
                    // will have removed the entry from the
                    // transactions list and started another
                    // worker thread.
                    ReaperWorkerThread worker = (ReaperWorkerThread) Thread.currentThread();
                    worker.shutdown();
                    synchronized(this) {
                        // NOTE(review): bare "--" -- presumably decrements _zombieCount; confirm
                        --;
                    }
                    ..warn_coordinator_TransactionReaper_13(Thread.currentThread().toString(),
                            e._control.get_uid(), Integer.toString());
                    // this gets us out of the for(;;) loop and
                    // the shutdown call above makes sure we exit
                    // after returning
                    break;
                } else if (cancelled &&
                        e._status == .) {
                    // ok the call to cancel() returned true but
                    // we cannot trust it because the reaper sent
                    // the thread an interrupt
                    cancelled = false;
                    e._status = .;
                    e.notifyAll();
                } else {
                    e._status = (cancelled
                            ? .
                            : .);
                    e.notifyAll();
                }
            }
            // log a message notifying success, failure or
            // exception during cancel(), remove the element from
            // the transactions queue and mark TX as rollback only
            if (cancelled) {
                // NOTE(review): the start of the statement on the next line
                // (presumably a logger call) was lost in extraction
                        e._control.get_uid());
            } else if (e._control.running()) {
                if (exception != null) {
                    ..warn_coordinator_TransactionReaper_9(Thread.currentThread().toString(), e._control.get_uid(), exception);
                } else {
                    ..warn_coordinator_TransactionReaper_8(Thread.currentThread().toString(),
                            e._control.get_uid());
                }
                try {
                    if (e._control.preventCommit()) {
                        // log a successful preventCommit()
                        ..warn_coordinator_TransactionReaper_14(Thread.currentThread().toString(),
                                e._control.get_uid());
                        notifyListeners(e._controlfalse);
                    } else {
                        // log a failed preventCommit()
                        ..warn_coordinator_TransactionReaper_15(Thread.currentThread().toString(),
                                e._control.get_uid());
                    }
                }
                catch (Exception e1) {
                    // log an exception under preventCommit()
                    ..warn_coordinator_TransactionReaper_16(Thread.currentThread().toString(), e._control.get_uid(), e1);
                }
            }
            removeElementReaper(e);
        }
    }

    

    /**
     * Note: this is a) expensive and b) an approximation. Should be called
     * only by test code.
     *
     * @return the number of items in the reaper's list.
     * @since JTS 2.2
     */
    public final long numberOfTransactions()
    {
        // NOTE(review): the receiver of size() was stripped in extraction;
        // presumably _reaperElements -- confirm against the original source.
        return .size();
    }

    
    /**
     * Return the number of timeouts registered. Note: this is a) expensive and
     * b) an approximation. Should be called only by test code.
     *
     * @return the number of timeouts registered.
     */
    public final long numberOfTimeouts()
    {
        // NOTE(review): the receiver of size() was stripped in extraction;
        // presumably the _timeouts map mentioned in the comments of insert()
        // -- confirm against the original source.
        return .size();
    }
    /**
     * Registers a listener by adding it to a collection.
     *
     * NOTE(review): the receiver of add() was stripped in extraction;
     * presumably the _listeners vector -- confirm.
     *
     * @param listener the monitor to add.
     */
    public final void addListener(ReaperMonitor listener)
    {
        .add(listener);
    }
    /**
     * Removes a listener from a collection.
     *
     * NOTE(review): the receiver of remove() was stripped in extraction;
     * presumably the _listeners vector -- confirm.
     *
     * @param listener the monitor to remove.
     * @return the result of the underlying remove() call.
     */
    public final boolean removeListener(ReaperMonitor listener)
    {
        return .remove(listener);
    }

    
    /**
     * Timeout is given in seconds, but we work in milliseconds. Attempting to
     * insert an element that is already present is an error
     * (IllegalStateException).
     */
    public final void insert(Reapable controlint timeout)
    {
        // NOTE(review): extraction fused the parameter list (presumably
        // "Reapable control, int timeout") and stripped field/logger names
        // throughout; confirm against the original source.
        if (..isTraceEnabled()) {
            ..trace("TransactionReaper::insert ( " + control + ", " + timeout
                    + " )");
        }
        /*
         * Ignore if the timeout is zero, since this means the transaction
         * should never timeout.
         */
        if (timeout == 0)
            return;
        ReaperElement reaperElement = new ReaperElement(controltimeout);
        // NOTE(review): this accumulation (presumably on _lifetime) happens
        // before the duplicate check below, so it is applied even when the
        // element turns out to be already present
        .addAndGet(timeout);
        // insert the element only if it's not already present. We check _timeouts first, as elements
        // maybe temporarily removed and reinserted in _reaperElements, so that is not as good a check.
        // We use lazy eval to ensure we insert to _reaperElements only if we inserted to _timeouts.
        // Note: removal works in reverse order i.e. _reaperElements then _timeouts.
        if ((.putIfAbsent(reaperElement._controlreaperElement) == null)) {
            .add(reaperElement);
        } else {
            // NOTE(review): this branch is empty in this copy; the statement
            // handling an already-present element appears to be missing
        }
        // wake the reaper earlier if the new element expires before the
        // currently recorded check time
        if ( && reaperElement.getAbsoluteTimeout() < .get()) {
            updateCheckTimeForEarlierInsert(reaperElement.getAbsoluteTimeout());
        }
    }

    
    /**
     * Reset the next wakeup time, when a new element has a timeout earlier
     * than the currently scheduled wakeup.
     *
     * @param newCheckTime absolute time in ms.
     */
    private void updateCheckTimeForEarlierInsert(long newCheckTime)
    {
        // NOTE(review): the receiver of get()/compareAndSet() was stripped in
        // extraction (presumably nextDynamicCheckTime) and the compareAndSet
        // arguments were fused (presumably "oldCheckTime, newCheckTime");
        // confirm against the original source.
        synchronized (this) {
            long oldCheckTime = .get();
            // retry until the stored time is no later than newCheckTime. Note
            // that oldCheckTime is only refreshed in the else branch, so after
            // a successful swap the condition is evaluated once more with the
            // stale value before the loop can end.
            while (newCheckTime < oldCheckTime) {
                if (.compareAndSet(oldCheckTimenewCheckTime)) {
                    notifyAll(); // force recalc of next wakeup time, taking into account the newly inserted element(s)
                } else {
                    oldCheckTime = .get();
                }
            }
        }
    }
    // Removes the entry registered for the given control, unless the entry is
    // already being cancelled (see the status test below) or is not known.
    // takes an Object because OTSManager.destroyControl(Control|ControlImple) uses PseudoControlWrapper not Reapable
    // NOTE(review): logger, map and status-constant names were stripped in
    // extraction; confirm against the original source.
    public final void remove(Object control)
    {
        if (..isTraceEnabled()) {
            ..trace("TransactionReaper::remove ( " + control + " )");
        }
        if (control == null)
            return;
        // look up the element registered for this control; nothing to do if absent
        ReaperElement key = .get(control);
        if (key == null) {
            return;
        }
        // if a cancellation is in progress then we have to
        // see it through as we have to ensure that the worker
        // thread does not get wedged. so we have to tell the
        // control has gone away. in order to test the status
        // we need to synchronize on the element before we
        // synchronize on this so we can ensure that we don't
        // deadlock ourselves.
        synchronized (key) {
            if (key._status != .) {
                // we are cancelling this TX anyway and need
                // to track the progress of the cancellation
                // using this entry so we cannot remove it
                return;
            }
            removeElementClient(key);
        }
    }

    
    /**
     * Given the transaction instance, this will return the time left before
     * the transaction is automatically rolled back if it has not been
     * terminated.
     *
     * @param control the transaction instance.
     * @return the remaining time in milliseconds.
     */
    public final long getRemainingTimeoutMills(Object control)
    {
        // NOTE(review): logger and map names were stripped in extraction
        // (the map is presumably _timeouts); confirm against the original.
        // arg is an Object because ArjunaTransactionImple.propagationContext does not have a Reapable
        if ((.isEmpty()) || (control == null)) {
            if (..isTraceEnabled()) {
                ..trace("TransactionReaper::getRemainingTimeout for " + control
                        + " returning 0");
            }
            return 0;
        }
        final ReaperElement reaperElement = .get(control);
        long timeout = 0;
        if (reaperElement == null) {
            // unknown control: report 0
            timeout = 0;
        } else {
            // units are in milliseconds at this stage.
            // the difference is negative if the absolute timeout has already passed
            timeout = reaperElement.getAbsoluteTimeout() - System.currentTimeMillis();
        }
        if (..isTraceEnabled()) {
            ..trace("TransactionReaper::getRemainingTimeoutMillis for "+control+" returning "+timeout);
        }
        return timeout;
    }

    
    /**
     * Given a Control, return the associated timeout, or 0 if we do not know
     * about it.
     *
     * The value is returned in seconds. Takes an Object because
     * TransactionFactoryImple.getTransactionInfo and
     * ArjunaTransactionImple.propagationContext use it and don't have a
     * Reapable.
     */

    public final int getTimeout(Object control)
    {
        // NOTE(review): logger and map names were stripped in extraction
        // (the map is presumably _timeouts); confirm against the original.
        if ((.isEmpty()) || (control == null)) {
            if (..isTraceEnabled()) {
                ..trace("TransactionReaper::getTimeout for " + control
                        + " returning 0");
            }
            return 0;
        }
        final ReaperElement reaperElement = .get(control);
        // an unknown control yields 0, otherwise the element's stored _timeout
        int timeout = (reaperElement == null ? 0 : reaperElement._timeout);
        // NOTE(review): unlike the other trace calls in this file, this one is
        // not guarded by isTraceEnabled(), so the message string is always built
        ..trace("TransactionReaper::getTimeout for "+control+" returning "+timeout);
        return timeout;
    }
    /*
    * Terminate the transaction reaper. This is a synchronous operation
    * and will only return once the reaper has been shutdown cleanly.
    *
    * Note, this method assumes that the transaction system has been
    * shutdown already so no new transactions can be created, or we
    * could be here for a long time!
    *
    * @param waitForTransactions if <code>true</code> then the reaper will
    * wait until all transactions have terminated (or been terminated by it).
    * If <code>false</code> then the reaper will call setRollbackOnly on all
    * the transactions.
    *
    * NOTE(review): field names were stripped in extraction. Presumably the
    * flag set first is _inShutdown, the collection is _reaperElements and the
    * two shutdown()/join() sequences address _reaperThread and then
    * _reaperWorkerThread -- confirm against the original source.
    */
    private final void shutdown(boolean waitForTransactions)
    {
        // the reaper thread synchronizes and waits on this
        synchronized (this) {
             = true;
            /*
                * If the caller does not want to wait for the normal transaction timeout
                * periods to elapse before terminating, then we first start by enabling
                * our time machine!
                */
            if (!waitForTransactions) {
                .setAllTimeoutsToZero();
            }
            /*
                * Wait for all of the transactions to
                * terminate normally.
                */
            while (!.isEmpty()) {
                try {
                    this.wait();
                }
                catch (final Exception ex) {
                    // ignored: the loop re-tests isEmpty() and waits again
                }
            }
            .shutdown();
            notifyAll();
        }
        try {
            .join();
        }
        catch (final Exception ex) {
            // ignored: shutdown continues even if the join fails
        }
         = null;
        // the reaper worker thread synchronizes and waits on the work queue
        synchronized () {
            .shutdown();
            .notifyAll();
            // hmm, not sure we really need to do this but . . .
            .interrupt();
        }
        try {
            .join();
        }
        catch (final Exception ex) {
            // ignored: shutdown continues even if the join fails
        }
         = null;
    }
    // called (indirectly) by user code doing removals on e.g. commit/rollback
    // does not reset the wakeup time - we prefer leaving an unnecessary wakeup as it's
    // cheaper than locking to recalculate the new time here.
    // NOTE(review): the receivers of the two remove() calls and the condition
    // of the final "if" were stripped in extraction. Going by the comment in
    // insert() ("removal works in reverse order i.e. _reaperElements then
    // _timeouts") they are presumably _reaperElements and _timeouts, and the
    // condition is presumably _inShutdown -- confirm.
    private final void removeElementClient(ReaperElement reaperElement)
    {
        .remove(reaperElement);        
        .remove(reaperElement._control);
        // don't recalc time, just wake up as planned
        if() {
            synchronized (this) {
                this.notifyAll(); // TODO: use different lock for shutdown?
            }
        }
    }
    /*
      * Remove element from list and trigger waiter if we are
      * being shutdown.
      *
      */
    // called internally by the reaper when removing elements - note the different
    // behaviour with regard to check time recalculation. Here we need to ensure the
    // new time is correct.
    // NOTE(review): field names were stripped in extraction. Presumably the
    // remove() calls target _reaperElements and then _timeouts, set() targets
    // nextDynamicCheckTime and the inner condition is _inShutdown -- confirm.
    private final void removeElementReaper(ReaperElement reaperElement)
    {
        .remove(reaperElement);
        .remove(reaperElement._control);
        synchronized (this) {
            // TODO set needs to be atomic to getFirst?
            ReaperElement first = .getFirst();
            if(first != null) {
                // the next check time becomes the timeout of the new head element
                .set(first.getAbsoluteTimeout());
            } else {
                // nothing left: the stored value was stripped in extraction
                .set(.);
                if() {
                    this.notifyAll(); // TODO: use different lock for shutdown?
                }
            }
        }
    }
    // Calls rolledBack(uid) on every registered listener when rollback is
    // true, otherwise markedRollbackOnly(uid). Anything thrown by a listener
    // is caught and ignored so that the remaining listeners are still called.
    // NOTE(review): extraction fused the parameter list (presumably
    // "Reapable element, boolean rollback") and stripped the collection name
    // (presumably _listeners) -- confirm against the original source.
    private final void notifyListeners(Reapable elementboolean rollback)
    {
        // notify listeners. Ignore errors.
        for (int i = 0; i < .size(); i++) {
            try {
                if (rollback)
                    .get(i).rolledBack(element.get_uid());
                else
                    .get(i).markedRollbackOnly(element.get_uid());
            }
            catch (final Throwable ex) {
                // ignore
            }
        }
    }

    
    /**
     * Currently we let the reaper thread run at the same priority as other
     * threads. Could get priority from environment.
     */
    public static synchronized void instantiate()
    {
        // NOTE(review): almost every field, constant and logger name in this
        // method was stripped in extraction. The comments describe only what
        // the surviving text shows; confirm against the original source.
        // Nothing is done when the null test below fails (presumably
        // "_theReaper == null", i.e. the singleton already exists).
        if (. == null)
        {
            if (..isTraceEnabled()) {
                ..trace("TransactionReaper::instantiate()");
            }
            // default to dynamic mode
            . = true;
            String mode = arjPropertyManager.getCoordinatorEnvironmentBean().getTxReaperMode();
            // two configured mode values switch the flag off; presumably these
            // compare against the PERIODIC and NORMAL constants -- confirm
            if (mode.compareTo(.) == 0) {
                . = false;
            }
            if (mode.compareTo(.) == 0) {
                . = false;
            }
            long checkPeriod = .;
            // when the flag is off the check period is read from configuration
            if (!.) {
                checkPeriod = arjPropertyManager.getCoordinatorEnvironmentBean().getTxReaperTimeout();
            }
            . = new TransactionReaper(checkPeriod);
            // NOTE(review): the next two clamps are textually identical after
            // extraction; presumably they apply to _cancelWaitPeriod and
            // _cancelFailWaitPeriod respectively -- confirm
            // must give TX at least 10 millisecs to
            // respond to cancel
            if (.. < 10) {
                .. = 10;
            }
            // must give TX at least 10 millisecs to
            // respond to cancel
            if (.. < 10) {
                .. = 10;
            }
            // we start bleating if the zombie count
            // reaches zombieMax so it has to be at
            // least 1
            if (.. <= 0) {
                .. = 1;
            }
             = new ReaperThread(.);
            // _reaperThread.setPriority(Thread.MIN_PRIORITY);
            // NOTE(review): the paired setDaemon/start calls presumably address
            // the reaper thread and the worker thread; the statement creating
            // the worker thread is not present in this copy
            .setDaemon(true);
            .setDaemon(true);
            .start();
            .start();
        }
    }

    
    /**
     * Starting with 4.8, this method will always return an instance and will
     * never return null. This causes the reaper to be instantiated
     * unnecessarily in some cases, but that's cheaper than the alternatives.
     *
     * @return a TransactionReaper singleton.
     */
    public static TransactionReaper transactionReaper() {
        // NOTE(review): the field name was stripped in extraction; presumably
        // the static _theReaper field -- confirm. The null test here is not
        // synchronized; instantiate() is a static synchronized method and
        // performs its own null test before creating anything.
        if( == null) {
            instantiate();
        }
        return ;
    }

    
    /**
     * Terminate the transaction reaper. This is a synchronous operation and
     * will only return once the reaper has been shutdown cleanly.
     *
     * Note, this method assumes that the transaction system has been shutdown
     * already so no new transactions can be created, or we could be here for a
     * long time!
     *
     * @param waitForTransactions if true then the reaper will wait until all
     * transactions have terminated (or been terminated by it). If false then
     * the reaper will call setRollbackOnly on all the transactions.
     */
    public static synchronized void terminate(boolean waitForTransactions)
    {
        // NOTE(review): the field name was stripped in extraction; presumably
        // the static _theReaper field -- confirm. Does nothing when the
        // reference is null; otherwise shuts the instance down and clears
        // the reference.
        if ( != null) {
            .shutdown(waitForTransactions);
             = null;
        }
    }
    // NOTE(review): the returned expression was stripped in extraction;
    // presumably the static _dynamic flag -- confirm against the original.
    public static boolean isDynamic()
    {
        return ;
    }
    // NOTE(review): the receiver of get() was stripped in extraction;
    // presumably the static _lifetime counter -- confirm against the original.
    public static synchronized long transactionLifetime()
    {
        return .get();
    }
    public static final long defaultCheckPeriod = 120000; // in milliseconds
    public static final long defaultCancelWaitPeriod = 500; // in milliseconds
    public static final long defaultCancelFailWaitPeriod = 500; // in milliseconds
    public static final int defaultZombieMax = 8;
    // Clears a static reference by assigning null to it.
    // NOTE(review): the assigned field's name was stripped in extraction;
    // presumably _theReaper -- confirm against the original source.
    static final synchronized void reset()
    {
         = null;
    }
    // Collection of reaper elements managed by the reaper.
    private final ReaperElementManager _reaperElements = new ReaperElementManager();
    // The keys are actually Reapable, as that's what insert takes. However, some functions use get(Object)
    // and rely on clever hashcode/equals behaviour, especially for the JTS. Thus the generics key type is Object.
    // NOTE(review): the map declaration that the comment above describes
    // (presumably the _timeouts map named in insert()) is missing from this
    // copy, and the two declarations below have lost the closing '>' and the
    // space before the field name (presumably "List<ReaperElement> _workQueue"
    // and "Vector<ReaperMonitor> _listeners") -- confirm against the original.
    private final List<ReaperElement_workQueue = new LinkedList<ReaperElement>();
    private final Vector<ReaperMonitor_listeners = new Vector<ReaperMonitor>(); // TODO sync properly
    private long _checkPeriod = 0;
    // Although it is atomic, writes (but not reads) need to be synchronized(this) i.e. on the TransactionReaper instance
    // in order to ensure proper timing with respect to wait/notify and wakeups on the _reaperElements queue.
    // NOTE(review): the initial value passed to the constructor was stripped in extraction.
    private final AtomicLong nextDynamicCheckTime = new AtomicLong(.);

    
    /**
     * Number of milliseconds of delay after a cancel() is scheduled before the
     * reaper tries to interrupt the worker thread executing the cancel().
     */
    private long _cancelWaitPeriod = 0;

    
    /**
     * Number of milliseconds of delay after a worker thread is interrupted
     * before the reaper writes it off as a zombie and starts a new thread.
     */
    private long _cancelFailWaitPeriod = 0;

    
    /**
     * Threshold for the count of non-exited zombies at which the system starts
     * logging error messages.
     */
    private int _zombieMax = 0;
    // the reaper instance; volatile, initially null
    private static volatile TransactionReaper _theReaper = null;
    // thread references; initially null
    private static ReaperThread _reaperThread = null;
    private static ReaperWorkerThread _reaperWorkerThread = null;
    // mode flag; initialised to true
    private static boolean _dynamic = true;
    // atomic counter, initially 0
    private static AtomicLong _lifetime = new AtomicLong(0);
    // zombie counter, initially 0
    private static int _zombieCount = 0;
    // shutdown flag, initially false
	private boolean _inShutdown = false;
New to GrepCode? Check out our FAQ X