Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * JBoss, Home of Professional Open Source
   * Copyright 2006, Red Hat Middleware LLC, and individual contributors
   * as indicated by the @author tags.
   * See the copyright.txt in the distribution for a
   * full listing of individual contributors.
   * This copyrighted material is made available to anyone wishing to use,
   * modify, copy, or redistribute it subject to the terms and conditions
   * of the GNU Lesser General Public License, v. 2.1.
  * This program is distributed in the hope that it will be useful, but WITHOUT A
  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  * You should have received a copy of the GNU Lesser General Public License,
  * v.2.1 along with this distribution; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  * MA  02110-1301, USA.
  *
  * (C) 2005-2006,
  * @author JBoss Inc.
  */
 /*
  * Copyright (C) 1999-2001 by HP Bluestone Software, Inc. All rights Reserved.
  *
  * HP Arjuna Labs,
  * Newcastle upon Tyne,
  * Tyne and Wear,
  * UK.
  *
  * $Id: PeriodicRecovery.java 2342 2006-03-30 13:06:17Z  $
  */
 
 package com.arjuna.ats.internal.arjuna.recovery;
 
 import java.util.Date;
 
Threaded object to perform the periodic recovery. Instantiated in the RecoveryManager. The work is actually completed by the recovery modules. These modules are dynamically loaded. The modules to load are specified by properties beginning with "RecoveryExtension"

n.b. recovery scans may be performed by this object (it is a thread and may be started as a background task) and by other ad hoc threads

Author(s):
Version:
$Id: PeriodicRecovery.java 2342 2006-03-30 13:06:17Z $
 
 
 public class PeriodicRecovery extends Thread
 {
   
public API ***
 

    
state values indicating whether or not some thread is currently scanning. used to define values of field PeriodicRecovery._currentStatus
 
    public static enum Status
    {
       
state value indicating that no thread is scanning
 
        INACTIVE,
       
state value indicating that some thread is scanning. n.b. the scanning thread may not be the singleton PeriodicRecovery thread instance
 
        SCANNING
    }

    
state values indicating operating mode of scanning process for ad hoc threads and controlling behaviour of singleton periodic recovery thread. used to define values of field PeriodicRecovery._currentMode n.b. PeriodicRecovery._currentStatus may not transition to state SCANNING when PeriodicRecovery._currentStatus is in state SUSPENDED or TERMINATED. However, if a scan is in progress when PeriodicRecovery._currentMode transitions to state SUSPENDED or TERMINATED PeriodicRecovery._currentStatus may (temporarily) remain in state SCANNING before transitioning to state INACTIVE.
 
    public static enum Mode
    {
       
state value indicating that new scans may proceed
 
        ENABLED,
       
state value indicating that new scans may not proceed and the periodic recovery thread should suspend
 
        SUSPENDED,
       
state value indicating that new scans may not proceed and that the singleton PeriodicRecovery thread instance should exit if it is still running
       TERMINATED
   }

    

Parameters:
threaded
useListener if true, start a socket based listener.
    public PeriodicRecovery (boolean threadedboolean useListener)
    {
        super("Periodic Recovery");
        
        initialise();
        // Load the recovery modules that actually do the work.
        loadModules();
        if (useListener)
        {
            try
            {
                 = new WorkerService(this);
                 = new Listener(getServerSocket(), );
                .setDaemon(true);
                            Integer.toString(.getLocalPort()));
            }
            catch (Exception ex) {
                ..warn_recovery_PeriodicRecovery_9(ex);
            }
        }
        if (threaded)
        {
            if (..isDebugEnabled()) {
                ..debug("PeriodicRecovery: starting background scanner thread");
            }
            start();
        }
        if(useListener &&  != null)
        {
            if (..isDebugEnabled()) {
                ..debug("PeriodicRecovery: starting listener worker thread");
            }
            .start();
        }
    }

    
initiate termination of the periodic recovery thread and stop any subsequent scan requests from proceeding. this switches the recovery operation mode to TERMINATED. if a scan is in progress when this method is called and has not yet started phase 2 of its scan it will be forced to return before completing phase 2.

Parameters:
async false if the calling thread should wait for any in-progress scan to complete before returning
   public void shutdown (boolean async)
   {
       // stop the lsitener from adding threads which can exercise the worker
       if ( != null) {
           .stopListener();
       }
       synchronized () {
           if (getMode() != .) {
               if (..isDebugEnabled()) {
                   ..debug("PeriodicRecovery: Mode <== TERMINATED");
               }
               setMode(.);
               .notifyAll();
           }
           if (!async) {
               // synchronous, so we keep waiting until the currently active scan stops or scanning
               // changes to TERMINATED
               while (getStatus() == .) {
                   try {
                       if (..isDebugEnabled()) {
                           ..debug("PeriodicRecovery: shutdown waiting for scan to end");
                       }
                       .wait();
                   } catch(InterruptedException ie) {
                       // just ignore and retest condition
                   }
               }
               if (..isDebugEnabled()) {
                   ..debug("PeriodicRecovery: shutdown scan wait complete");
               }
           }
       }
       // if the shutdown is synchronous then make sure the periodic recovery thread really has stopped running
       if (!async && this.isAlive()) {
           try {
               this.join();
           } catch (InterruptedException e) {
               // ignore
           }
       }
   }

    
make all scanning operations suspend. This switches the recovery operation mode to SUSPENDED. Any attempt to start a new scan either by an ad hoc threads or by the periodic recovery thread will suspend its thread until the mode changes. If a scan is in progress when this method is called it will complete its scan without suspending.

Parameters:
async false if the calling thread should wait for any in-progress scan to complete before returning
Returns:
the previous mode before attempting the suspend
   public Mode suspendScan (boolean async)
   {
       synchronized ()
       {
           // only switch and kick everyone if we are currently ENABLED
           Mode currentMode = getMode();
           if (currentMode == .) {
               if (..isDebugEnabled()) {
                   ..debug("PeriodicRecovery: Mode <== SUSPENDED");
               }
               setMode(.);
               .notifyAll();
           }
           if (!async) {
               // synchronous, so we keep waiting until the currently active scan stops
               while (getStatus() == .) {
                   try {
                       if (..isDebugEnabled()) {
                           ..debug("PeriodicRecovery: suspendScan waiting for scan to end");
                       }
                       .wait();
                   } catch(InterruptedException ie) {
                       // just ignore and retest condition
                   }
                   if (..isDebugEnabled()) {
                       ..debug("PeriodicRecovery: suspendScan scan wait compelete");
                   }
               }
           }
           return currentMode;
       }
   }

    
resume scanning operations This switches the recovery operation mode from SUSPENDED to RESUMED. Any threads which suspended when they tried to start a scan will be woken up by this transition.
   public void resumeScan ()
   {
       synchronized ()
       {
           if (getMode() == .) {
               if (..isDebugEnabled()) {
                   ..debug("PeriodicRecovery: Mode <== ENABLED");
               }
               setMode(.);
               .notifyAll();
           }
       }
   }

    

Returns:
a bound server socket corresponding to the recovery manager
Throws:
java.io.IOException if the host name is unknown or the endpoint has already been bound
    public ServerSocket getServerSocket () throws IOException
    {
        synchronized ()
        {
            if ( == null)
            {
                 = new ServerSocket(RecoveryManager.getRecoveryManagerPort(), ., RecoveryManager.getRecoveryManagerHost());
                
                recoveryPropertyManager.getRecoveryEnvironmentBean().setRecoveryPort(.getLocalPort());
            }
            return ;
        }
    }

   
Implements the background thread which performs the periodic recovery
   public void run ()
   {
       boolean finished = false;
       do
       {
           boolean workToDo = false;
           // ok, get to the point where we are ready to start a scan
           synchronized() {
               if (getStatus() == .) {
                   // need to wait for some other scan to finish
                   if (..isDebugEnabled()) {
                       ..debug("PeriodicRecovery: background thread waiting on other scan");
                   }
                   doScanningWait();
                   // we don't wait around if a worker scan request has just come in
                   if (getMode() == . && !) {
                       // the last guy just finished scanning so we ought to wait a bit rather than just
                       // pile straight in to do some work
                       if (..isDebugEnabled()) {
                           ..debug("PeriodicRecovery: background thread backing off");
                       }
                       doPeriodicWait();
                       // if we got told to stop then do so
                       finished = (getMode() == .);
                   }
               } else {
                   // status == INACTIVE so we can go ahead and scan if scanning is enabled
                   switch (getMode()) {
                       case :
                           // ok grab our chance to be the scanning thread
                           if (..isDebugEnabled()) {
                               ..debug("PeriodicRecovery: background thread Status <== SCANNING");
                           }
                           setStatus(.);
                           // must kick any other waiting threads
                           .notifyAll();
                           workToDo = true;
                           break;
                       case :
                           // we need to wait while we are suspended
                           if (..isDebugEnabled()) {
                               ..debug("PeriodicRecovery: background thread wait while SUSPENDED");
                           }
                           doSuspendedWait();
                           // we come out of here with the lock and either ENABLED or TERMINATED
                           finished = (getMode() == .);
                           break;
                       case :
                           finished = true;
                           break;
                   }
               }
           }
           // its ok to start work if requested -- we cannot be stopped now by a mode change to SUSPEND
           // or TERMINATE until we get through phase 1 and maybe phase 2 if we are lucky
           if (workToDo) {
               // ok it is now this thread's turn to run a scan. before starting we check if there is a
               // worker waiting and reset the waiting flag. we will check again after the scan has
               // completed to see if a worker request has come in after starting this scan.
               // if so we avoid notifying the worker ensuring a requst is only confirmed when a
               // full scan has happened afetr the request was made
               boolean notifyRequired;
               synchronized() {
                   notifyRequired = ;
                    = false;
               }
               // we are in state SCANNING so actually do the scan
               if (..isDebugEnabled()) {
                   ..debug("PeriodicRecovery: background thread scanning");
               }
               doWorkInternal();
               // clear the SCANNING state now we have done
               synchronized() {
                   if (..isDebugEnabled()) {
                       ..debug("PeriodicRecovery: background thread Status <== INACTIVE");
                   }
                   setStatus(.);
                   // must kick any other waiting threads
                   .notifyAll();
                   
                   // check if we need to notify a listener worker that we just finished  a scan
                   if (notifyRequired && !) {
                       notifyWorker();
                   }
                   if (getMode() == . && !) {
                       // we managed a full scan and scanning is still enabled
                       // so wait a bit before the next attempt
                       if (..isDebugEnabled()) {
                           ..debug("PeriodicRecovery: background thread backing off");
                       }
                       doPeriodicWait();
                   }
                   finished = (getMode() == .);
               }
           }
       } while (!finished);
       // make sure the worker thread is not wedged waiting for a scan to complete
       synchronized() {
           if () {
               notifyWorker();
           }
       }
       if (..isDebugEnabled()) {
           ..debug("PeriodicRecovery: background thread exiting");
       }
   }

    
Perform a recovery scan on all registered modules. Caveats: if a scan is already in progress this method will wait for it to complete otherwise it will perform its own scan before returning. If scanning is suspended this will require waiting for scanning to resume.
    public final void doWork ()
    {
        boolean workToDo = false;
        synchronized() {
            if (getMode() == .) {
                if (..isDebugEnabled()) {
                    ..debug("PeriodicRecovery: ad hoc thread wait while SUSPENDED");
                }
                doSuspendedWait();
            }
            // no longer SUSPENDED --  retest in case we got TERMINATED
            if (getMode() == .) {
                if (..isDebugEnabled()) {
                    ..debug("PeriodicRecovery: ad hoc thread scan TERMINATED");
                }
            } else {
                // ok scanning must be enabled -- see if we can start a scan or whether we have to wait on another one
                if (getStatus() == .) {
                    // just wait for the other scan to finish
                    if (..isDebugEnabled()) {
                        ..debug("PeriodicRecovery: ad hoc thread waiting on other scan");
                    }
                    doScanningWait();
                } else {
                    // ok grab our chance to start a scan
                    setStatus(.);
                    // must kick any other waiting threads
                    .notifyAll();
                    if (..isDebugEnabled()) {
                        ..debug("PeriodicRecovery: ad hoc thread Status <== SCANNING");
                    }
                    workToDo = true;
                }
            }
        }
        if (workToDo) {
            // ok it is now this thread's turn to run a scan. before starting we check if there is a
            // worker waiting and reset the waiting flag. we will check again after the scan has
            // completed to see if a worker request has come in after starting this scan.
            // if so we avoid notifying the worker ensuring a request is only confirmed when a
            // full scan has happened after the request was made
            boolean notifyRequired;
            synchronized() {
                notifyRequired = ;
                 = false;
            }
            // ok to start work -- we cannot be stopped now by a mode change to SUSPEND or TERMINATE
            // until we get through phase 1 and maybe phase 2 if we are lucky
            if (..isDebugEnabled()) {
                ..debug("PeriodicRecovery: ad hoc thread scanning");
            }
            doWorkInternal();
            // clear the scan for some other thread to have a go
            synchronized() {
                if (..isDebugEnabled()) {
                    ..debug("PeriodicRecovery: ad hoc thread Status <== INACTIVE");
                }
                setStatus(.);
                // must kick any other waiting threads
                .notifyAll();
                // notify the worker if it was waiting before we started the scan otherwise just leave it to
                // be notified when the next scan finishes.
                if (notifyRequired && !) {
                    notifyWorker();
                }
            }
        }
    }

    
called by the listener worker to wake the periodic recovery thread and get it to start a scan if one is not already in progress
    public void wakeUp()
    {
        synchronized () {
             = true;
            // wake up the periodic recovery thread if no scan is in progress
            if (getStatus() != .) {
                if (..isDebugEnabled()) {
                    ..debug("PeriodicRecovery: listener worker interrupts background thread");
                }
                .notifyAll();
            }
        }
    }

    
Add the specified module to the end of the recovery module list. There is no way to specify relative ordering of recovery modules with respect to modules loaded via the property file.

Parameters:
module The module to append.
    public final void addModule (RecoveryModule module)
    {
        if (..isDebugEnabled()) {
            ..debug("PeriodicRecovery: adding module " + module.getClass().getName());
        }
        .add(module);
    }

    
remove a recovery module from the recovery modules list

Parameters:
module the module to be removed
waitOnScan true if the remove operation should wait for any in-progress scan to complete
    public final void removeModule (RecoveryModule moduleboolean waitOnScan)
    {
        if (..isDebugEnabled()) {
            ..debug("PeriodicRecovery: removing module " + module.getClass().getName());
        }
        if (waitOnScan) {
            // make sure any scan which might be using the module has completed
            synchronized () {
                    doScanningWait();
            }
        }
        
        // now remove it.
        
        .remove(module);
    }
    
    
Remove all modules.

Parameters:
waitOnScan true if the remove operation should wait for any in-progress scan to complete.
    
    public final void removeAllModules (boolean waitOnScan)
    {
        if (..isDebugEnabled()) {
            ..debug("PeriodicRecovery: removing all modules.");
        }
        
        if (waitOnScan) {
            // make sure any scan which might be using the module has completed
            synchronized () {
                    doScanningWait();
            }
        }
        
        .clear();
    }

    
return a copy of the current recovery modules list

Returns:
a copy of the the recovery modules list.
    public final Vector<RecoveryModulegetModules ()
    {
        // return a copy of the modules list so that clients are not affected by dynamic modifications to the list
        // synchronize so that we don't copy in the middle of an add or remove
        synchronized () {
            return new Vector<RecoveryModule>();
        }
    }
    /*
     * debugging aid
     */
    public Listener getListener()
    {
        return ;
    }

    
private implementation ***


    
fetch the current activity status either INACTIVE or SCANNING Caveats: must only be called while synchronized on _stateLock

Returns:
INACTIVE if no scan is in progress or SCANNING if some thread is performing a scan
    private Status getStatus ()
    {
        return ;
    }

    
fetch the current recovery operation mode either ENABLED, SUSPENDED or TERMINATED Caveats: must only be called while synchronized on _stateLock

Returns:
the current recovery operation mode
    public Mode getMode ()
    {
        return ;
    }

    
set the current activity status

Parameters:
status the new status to be used
    private void setStatus (Status status)
    {
         = status;
    }

    
set the current recovery operation mode

Parameters:
mode the new mode to be used
    private void setMode (Mode mode)
    {
         = mode;
    }

    
wait for the required backoff period or less if the scanning status or scan mode changes Caveats: this must only be called when synchronized on _stateLock and when _currentStatus is SCANNING and _currentMode is ENABLED
    private void doBackoffWait()
    {
        try {
            .wait( * 1000L);
        } catch (InterruptedException e) {
            // we can ignore this exception
        }
    }

    
wait for the required recovery period or less if the scanning status or scan mode changes Caveats: this must only be called when synchronized on _stateLock and when _currentStatus is INACTIVE and _currentMode is ENABLED
    private void doPeriodicWait()
    {
        try {
            .wait( * 1000L);
        } catch (InterruptedException e) {
            // we can ignore this exception
        }
    }

    
wait until the we move out of SUSPENDED mode Caveats: this must only be called when synchronized on _stateLock
    private void doSuspendedWait()
    {
        while (getMode() == .) {
            try {
                .wait();
            } catch (InterruptedException e) {
                // we can ignore this exception
            }
        }
    }

    
wait until some other thread stops scanning Caveats: this must only be called when synchronized on _stateLock and when _currentStatus is SCANNING
    private void doScanningWait()
    {
        while (getStatus() == .) {
            try {
                .wait();
            } catch (InterruptedException e) {
                // we can ignore this exception
            }
        }
    }

    
start performing a scan continuing to completion unless we are terminating Caveats: this must only be called when _currentStatus is SCANNING. on return _currentStatus is always still SCANNING
    private void doWorkInternal()
    {
        // n.b. we only get here if status is SCANNING
        if (..isDebugEnabled()) {
            ..debug("Periodic recovery first pass at "+.format(new Date()));
        }
        // n.b. this works on a copy of the modules list so it is not affected by
        // dynamic updates in the middle of a scan, ensuring first+second pass happen
        // for the same stable set of modules.
        Vector copyOfModules = getModules();
        
        Enumeration modules = copyOfModules.elements();
        while (modules.hasMoreElements())
        {
            RecoveryModule m = (RecoveryModulemodules.nextElement();
            // we need to ensure we use the class loader context of the recovery module while we are executing
            // its methods
            ClassLoader cl = switchClassLoader(m);
            try {
            m.periodicWorkFirstPass();
            } finally {
                restoreClassLoader(cl);
            }
            if (..isDebugEnabled()) {
                ..debug(" ");
            }
        }
        // take the lock again so we can do a backoff wait on it
        synchronized () {
            // we have to wait for a bit to avoid catching (too many)
            // transactions etc. that are really progressing quite happily
            doBackoffWait();
            // we carry on scanning even if scanning is SUSPENDED because the suspending thread
            // might be waiting on us to complete and we don't want to risk deadlocking it by waiting
            // here for a resume.
            // if we have been TERMINATED we bail out now
            // n.b. if we give up here the caller is responsible for clearing the active scan
            if (getMode() == .) {
                if (..isDebugEnabled()) {
                    ..debug("PeriodicRecovery: scan TERMINATED at phase 1");
                }
                return;
            }
        }
        // move on to phase 2
        if (..isDebugEnabled()) {
            ..debug("Periodic recovery second pass at "+.format(new Date()));
        }
        modules = copyOfModules.elements();
        while (modules.hasMoreElements())
        {
            RecoveryModule m = (RecoveryModulemodules.nextElement();
            ClassLoader cl = switchClassLoader(m);
            try {
            m.periodicWorkSecondPass();
            } finally {
                restoreClassLoader(cl);
            }
            if (..isDebugEnabled()) {
                ..debug(" ");
            }
        }
        // n.b. the caller is responsible for clearing the active scan
    }

    
notify the listener worker that a scan has completed Caveats: this must only be called when synchronized on _stateLock at the point where Status transitions from SCANNING to INACTIVE
    private void notifyWorker()
    {
        if (..isDebugEnabled()) {
            ..debug("PeriodicRecovery: scan thread signals listener worker");
        }
        if( != null)
        {
            .notifyDone();
        }
         = false;
    }

    
install the classloader associated with some specific recovery module as the current thread's class loader this avoids a problem where the background periodic recovery thread can see the same class as the recovery module's class loader, specifically where a the recovery module resides in a sar (e.g. the XTS code). If class with name "A" is loaded via the background thread class loader as A' and used to create instance a' then a cast expression in th erecovery code of the form (A)a' will try to resolve a' against version A'' loaded via the sar loader and get a class cast exception.

Parameters:
rm the recovery module whose class loader is to be installed as the new thread class loader
Returns:
the class loader currently installed as the thread class loader
    {
        Thread currentThread = Thread.currentThread();
        ClassLoader cl = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(rm.getClass().getClassLoader());
        return cl;
    }

    
restore the current thread's classloader

Parameters:
cl the class loader to be set as the current thread class loader
    private void restoreClassLoader(ClassLoader cl)
    {
        Thread currentThread = Thread.currentThread();
        currentThread.setContextClassLoader(cl);
    }

    
Load recovery modules prior to starting to recovery. These are loaded in list iteration order.
    private void loadModules ()
    {
        .addAll(recoveryPropertyManager.getRecoveryEnvironmentBean().getRecoveryModules());
    }

    
initialise the periodic recovery instance to a suitable initial state
   private void initialise ()
    {
        setStatus(.);
        setMode(.);
         = recoveryPropertyManager.getRecoveryEnvironmentBean().getPeriodicRecoveryPeriod();
        if ( !=  &&  ..isDebugEnabled()) {
            ..debug("com.arjuna.ats.arjuna.recovery.PeriodicRecovery" +
                    ": Recovery period set to " +  + " seconds");
        }
         = recoveryPropertyManager.getRecoveryEnvironmentBean().getRecoveryBackoffPeriod();
        if ( !=  && ..isDebugEnabled()) {
            ..debug("PeriodicRecovery" +
                    ": Backoff period set to " +  + " seconds");
        }
    }
   // this refers to the modules specified in the recovery manager
   // property file which are dynamically loaded.
   
list of instances of RecoiveryModule either loaded during startup as specified in the recovery manager property file or added dynamically by calls to addModule
   private final Vector<RecoveryModule_recoveryModules = new Vector<RecoveryModule>();

   
time in seconds between the first and second pass in any given scan
   private int _backoffPeriod = 0;

    
time in seconds for which the periodic recovery thread waits between scan attempts
   private int _recoveryPeriod = 0;

    
default value for _backoffPeriod if not specified via RecoveryEnvironmentBean
    public static final int _defaultBackoffPeriod = 10;

    
default value for _recoveryPeriod if not specified via RecoveryEnvironmentBean
    public static final int _defaultRecoveryPeriod = 120;

    
lock controlling access to _currentStatus, _currentMode and _workerScanRequested
   private final Object _stateLock = new Object();

    
activity status indicating whether we IDLING or some thread is SCANNING
   private Status _currentStatus;

    
operating mode indicating whether scanning is ENABLED, SUSPENDED or TERMINATED
   private Mode _currentMode;

    
flag indicating whether the listener has prodded the recovery thread
    private boolean _workerScanRequested = false;

    
format for printing dates in log messages
    private SimpleDateFormat _theTimestamper = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss");

    
socket used by listener worker thread
    private ServerSocket _socket = null;
    private final Object _socketLock = new Object();

    
listener thread running worker service
    private Listener _listener = null;

    
the worker service which handles requests via the listener socket
    private WorkerService _workerService = null;
   /*
    * Read the system properties to set the configurable options
    *
    * Note: if we start and stop the service then changes to the timeouts
    * won't be reflected. We will need to modify this eventually.
    */
   static
   {
   }
New to GrepCode? Check out our FAQ X