Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.arjuna.wst11.messaging.engines;
  
  import  org.jboss.ws.api.addressing.MAP;
 
The coordinator state engine

Author(s):
kevin
 
 public class CoordinatorEngine implements CoordinatorInboundEvents
 {
    
Flag indicating this is a coordinator for a durable participant.
 
     // Immutable after construction (final); the constructor argument of the same name is stored here.
     private final boolean durable ;
    
The coordinator id.
 
     // Immutable after construction (final); the same id is also used to build the instance identifier
     // and is passed to CoordinatorProcessor.activateCoordinator(...) by the constructor.
     private final String id ;
    
The instance identifier.
 
     // Immutable after construction (final); the constructor creates it with new InstanceIdentifier(id).
     private final InstanceIdentifier instanceIdentifier ;
    
The participant endpoint reference.
 
     // Immutable after construction (final); the constructor argument of the same name is stored here.
     private final W3CEndpointReference participant ;
    
The current state.
 
     // Mutable. forget() passes null to changeState(...), so null appears to mean "completed".
     // NOTE(review): presumably the field that changeState() writes and waitForState() polls; the
     // field names inside those methods were lost in extraction - confirm against upstream.
     private State state ;
    
The flag indicating that this coordinator has been recovered from the log.
 
     // Mutable (not final).
     // NOTE(review): presumably the flag that commit() sets to true when the participant is still
     // uncommitted after the wait (see the comment there) - the assignment target was lost in extraction.
     private boolean recovered ;
    
The flag indicating a read only response.
 
     // Mutable; not assigned by the constructor, so it starts out false.
     // NOTE(review): presumably set to true by the readOnly(...) handler - the assignment target
     // there was lost in extraction.
     private boolean readOnly ;
    
The associated timer task or null.
 
     // Mutable and nullable ("or null" per the description above); not assigned by the constructor.
     // NOTE(review): presumably the field that sendPrepare()/sendCommit() install and cancel under
     // the lock - the field names in those methods were lost in extraction.
     private TimerTask timerTask ;

    
Construct the initial engine for the coordinator.

Parameters:
id The coordinator id.
durable true if the participant is durable, false if volatile.
participant The participant endpoint reference.
 
     /**
      * Convenience constructor: delegates to the five-argument constructor.
      *
      * NOTE(review): the commas between parameters/arguments and the last argument of the
      * this(...) call were lost when this listing was extracted. The visible "false" is
      * presumably the 'recovered' flag and the missing trailing token the initial State
      * constant - confirm against the upstream source.
      *
      * @param id The coordinator id.
      * @param durable true if the participant is durable, false if volatile.
      * @param participant The participant endpoint reference.
      */
     public CoordinatorEngine(final String idfinal boolean durablefinal W3CEndpointReference participant)
     {
         this(iddurableparticipantfalse.) ;
     }

    
Construct the engine for the coordinator in a specified state.

Parameters:
id The coordinator id.
durable true if the participant is durable, false if volatile.
participant The participant endpoint reference.
state The initial state.
 
     /**
      * Builds the engine in a caller-supplied state and, when appropriate, registers it with
      * the CoordinatorProcessor.
      *
      * NOTE(review): the commas between parameters and the names of the fields being assigned
      * (the "this. =" lines) were lost when this listing was extracted; each assignment
      * presumably targets the field named like its right-hand side - confirm against upstream.
      *
      * @param id The coordinator id.
      * @param durable true if the participant is durable, false if volatile.
      * @param participant The participant endpoint reference.
      * @param recovered true if this coordinator has been recovered from the log.
      * @param state The initial state.
      */
     public CoordinatorEngine(final String idfinal boolean durablefinal W3CEndpointReference participantboolean recoveredfinal State state)
     {
         this. = id ;
         // the instance identifier is derived from the coordinator id
         this. = new InstanceIdentifier(id) ;
         this. = durable ;
         this. = participant ;
         this. = state ;
         this. = recovered;
 
         // unrecovered participants are always activated
         // we only need to reactivate recovered participants which were successfully prepared
         // any others will only have been saved because of a heuristic outcome e.g. a comms
         // timeout at prepare will write a heuristic record for an ABORTED TX including a
         // participant in state PREPARING. we can safely drop it since we implement presumed abort.
 
         // NOTE(review): the State constant compared here was stripped; per the comment above it
         // is presumably the "successfully prepared" state - confirm.
         // Note also that 'this' is handed to the processor from inside the constructor.
         if (!recovered || state == .) {
             CoordinatorProcessor.getProcessor().activateCoordinator(thisid) ;
         }
     }

    
Handle the aborted event.

Parameters:
aborted The aborted notification.
map The addressing context.
arjunaContext The arjuna context.

State transitions:
None -> None (ignore)
Active -> Aborting (forget)
Preparing -> Aborting (forget)
PreparedSuccess -> PreparedSuccess (invalid state)
Committing -> Committing (invalid state)
Aborting -> Aborting (forget)
    /**
     * Handles an Aborted notification from the participant.
     *
     * Runs entirely under this object's monitor (the method is synchronized). One state leads
     * to a plain changeState(...) call, either of two other states leads to forget(), and
     * every remaining state is ignored.
     *
     * NOTE(review): the value read into 'current' and every State constant compared below were
     * stripped when this listing was extracted, so the concrete states cannot be read off the
     * code. The first branch only changes state and never calls forget(), whereas the
     * transition table preceding this method lists "(forget)" for each handled state -
     * confirm against the upstream source which is intended.
     *
     * @param aborted The aborted notification (not referenced in the visible body).
     * @param map The addressing context (not referenced in the visible body).
     * @param arjunaContext The arjuna context (not referenced in the visible body).
     */
    public synchronized void aborted(final Notification abortedfinal MAP mapfinal ArjunaContext arjunaContext)
    {
        // snapshot taken once so that all comparisons below see the same value
        final State current =  ;
        if (current == .)
        {
            // state transition only; the coordinator is not forgotten on this path
            changeState(.) ;
        }
        else if ((current == .) || (current == .))
        {
            // forget() moves the state to null and may deactivate this coordinator
            forget() ;
        }
    }

    
Handle the committed event.

Parameters:
committed The committed notification.
map The addressing context.
arjunaContext The arjuna context.

State transitions:
None -> None (ignore)
Active -> Aborting (invalid state)
Preparing -> Aborting (invalid state)
PreparedSuccess -> PreparedSuccess (invalid state)
Committing -> Committing (forget)
Aborting -> Aborting (invalid state)
    /**
     * Handles a Committed notification from the participant.
     *
     * Runs entirely under this object's monitor (the method is synchronized). One state leads
     * to a plain changeState(...) call, either of two other states leads to forget(), and
     * every remaining state is ignored.
     *
     * NOTE(review): the value read into 'current' and every State constant compared below were
     * stripped when this listing was extracted; map the branches onto the transition table
     * preceding this method only after checking the upstream source.
     *
     * @param committed The committed notification (not referenced in the visible body).
     * @param map The addressing context (not referenced in the visible body).
     * @param arjunaContext The arjuna context (not referenced in the visible body).
     */
    public synchronized void committed(final Notification committedfinal MAP mapfinal ArjunaContext arjunaContext)
    {
        // snapshot taken once so that all comparisons below see the same value
        final State current =  ;
        if (current == .)
        {
            // state transition only; the coordinator is not forgotten on this path
            changeState(.) ;
        }
        else if ((current == .) || (current == .))
        {
            // forget() moves the state to null and may deactivate this coordinator
            forget() ;
        }
    }

    
Handle the prepared event.

Parameters:
prepared The prepared notification.
map The addressing context.
arjunaContext The arjuna context.

State transitions:
None -> Durable: (send rollback), Volatile: Invalid state: none
Active -> Aborting (invalid state)
Preparing -> PreparedSuccess (Record Vote)
PreparedSuccess -> PreparedSuccess (ignore)
Committing -> Committing (resend Commit)
Aborting -> Aborting (resend Rollback and forget)
    /**
     * Handles a Prepared notification from the participant.
     *
     * This method is not declared synchronized: the state is snapshotted and, for two of the
     * states, changed inside a synchronized(this) block; the outgoing messages are sent only
     * after that block has been left, so no message is sent while holding the lock. All
     * decisions after the block are based on the snapshot, not on the possibly updated state.
     *
     * NOTE(review): the State constants and several boolean conditions (the empty "if ()" and
     * the operand of "!") were stripped when this listing was extracted. The transition table
     * preceding this method suggests the empty conditions test whether the participant is
     * durable - confirm against the upstream source.
     *
     * @param prepared The prepared notification (not referenced in the visible body).
     * @param map The addressing context; passed on when an UnknownTransaction fault is sent.
     * @param arjunaContext The arjuna context; passed on when an UnknownTransaction fault is sent.
     */
    public void prepared(final Notification preparedfinal MAP mapfinal ArjunaContext arjunaContext)
    {
        final State current ;
        synchronized(this)
        {
            current =  ;
            // two distinct states each trigger a state change; all others leave the state alone
            if (current == .)
            {
                changeState(.) ;
            }
            else if (current == .)
            {
                changeState(.) ;
            }
        }
        // from here on the lock is no longer held
        if (current == .)
        {
            sendCommit() ;
        }
        else if ((current == .))
        {
            // answer with either a rollback or an UnknownTransaction fault, then forget
            if () {
                sendRollback();
            } else {
                sendUnknownTransaction(maparjunaContext) ;
            }
            forget();
        }
        // a null snapshot means this coordinator had already completed (see forget())
        else if ((current == null) && !)
        {
            // same choice of reply as above, but without calling forget() again
            if ()
            {
                sendRollback() ;
            }
            else
            {
        	sendUnknownTransaction(maparjunaContext) ;
            }
        }
    }

    
Handle the readOnly event.

Parameters:
readOnly The readOnly notification.
map The addressing context.
arjunaContext The arjuna context.

State transitions:
None -> None (ignore)
Active -> Active (forget)
Preparing -> Preparing (forget)
PreparedSuccess -> PreparedSuccess (invalid state)
Committing -> Committing (invalid state)
Aborting -> Aborting (forget)
    /**
     * Handles a ReadOnly notification from the participant.
     *
     * Runs entirely under this object's monitor (the method is synchronized). For any of three
     * states the coordinator is forgotten; for all but one of those three a boolean field is
     * set to true first. Every other state is ignored.
     *
     * NOTE(review): the State constants and the name of the field being set were stripped when
     * this listing was extracted. The field is presumably the read-only flag reported by
     * isReadOnly(), and the excluded state presumably the aborting one - confirm against the
     * upstream source.
     *
     * @param readOnly The readOnly notification (not referenced in the visible body).
     * @param map The addressing context (not referenced in the visible body).
     * @param arjunaContext The arjuna context (not referenced in the visible body).
     */
    public synchronized void readOnly(final Notification readOnlyfinal MAP mapfinal ArjunaContext arjunaContext)
    {
        // snapshot taken once so that all comparisons below see the same value
        final State current =  ;
        if ((current == .) || (current == .) ||
            (current == .))
        {
            // one of the three accepted states does not count as a read-only outcome
            if (current != .)
            {
                this. = true ;
            }
            forget() ;
        }
    }

    
Handle the soap fault event.

Parameters:
soapFault The soap fault.
map The addressing context.
arjunaContext The arjuna context.
    /**
     * Handles a SOAP fault received for this coordinator.
     *
     * The visible body only emits a trace message (when tracing is enabled); it performs no
     * state change and sends no reply.
     *
     * NOTE(review): the logger reference in front of isTraceEnabled()/tracev(...) and the
     * commas between arguments were stripped when this listing was extracted.
     *
     * @param soapFault The soap fault; its fault type and subcode are included in the trace.
     * @param map The addressing context (not referenced in the visible body).
     * @param arjunaContext The arjuna context; supplies the instance identifier for the trace.
     */
    public void soapFault(final SoapFault soapFaultfinal MAP mapfinal ArjunaContext arjunaContext)
    {
        if (..isTraceEnabled())
        {
            // this local shadows the instanceIdentifier field; the traced value comes from the
            // incoming message context, not from this engine
            final InstanceIdentifier instanceIdentifier = arjunaContext.getInstanceIdentifier() ;
            final SoapFaultType soapFaultType = soapFault.getSoapFaultType() ;
            final QName subCode = soapFault.getSubcode() ;
            ..tracev("Unexpected SOAP fault for coordinator {0}: {1} {2}"new Object[] {instanceIdentifiersoapFaultTypesubCode}) ;
        }
    }

    
Handle the prepare event.

State transitions:
None -> None (invalid state)
Active -> Preparing (send prepare)
Preparing -> Preparing (resend prepare)
PreparedSuccess -> PreparedSuccess (do nothing)
Committing -> Committing (invalid state)
Aborting -> Aborting (invalid state)
    /**
     * Drives the prepare phase for this participant and waits for the outcome.
     *
     * Steps, as far as the visible code shows:
     * 1. under the lock, snapshot the state and, for one particular state, change it;
     * 2. outside the lock, call sendPrepare() if the snapshot was one of two states;
     * 3. block in waitForState(...) for at most TransportTimer.getTransportTimeout();
     * 4. under the lock again, return early if a comparison against one state fails,
     *    otherwise cancel and clear a non-null reference (presumably the pending timer
     *    task) and return.
     *
     * NOTE(review): the State constants, the field names and both return expressions were
     * stripped when this listing was extracted; the returned value is presumably the current
     * state - confirm against the upstream source.
     *
     * @return a State; see the note above regarding which value.
     */
    public State prepare()
    {
        final State current ;
        synchronized(this)
        {
            current =  ;
            if (current == .)
            {
                changeState(.) ;
            }
        }
        // the message is sent without holding the lock
        if ((current == .) || (current == .))
        {
            sendPrepare() ;
        }
        // blocks until the state differs from the given one or the transport timeout elapses
        waitForState(., TransportTimer.getTransportTimeout()) ;
        synchronized(this)
        {
            if ( != .)
            {
                return  ;
            }
            // still waiting after the timeout: stop any further resends
            if ( != null)
            {
        	.cancel() ;
                 = null;
            }
            // ok, we leave the participant stub active because the coordinator will attempt
            // to roll it back when it notices that this has failed
            return  ;
        }
    }

    
Handle the commit event.

State transitions:
None -> None (invalid state)
Active -> Active (invalid state)
Preparing -> Preparing (invalid state)
PreparedSuccess -> Committing (send commit)
Committing -> Committing (resend commit)
Aborting -> Aborting (invalid state)
    /**
     * Drives the commit phase for this participant and waits for the outcome.
     *
     * Steps, as far as the visible code shows:
     * 1. under the lock, snapshot the state and, for one particular state, change it;
     * 2. outside the lock, call sendCommit() if the snapshot was one of two states;
     * 3. block in waitForState(...) for at most TransportTimer.getTransportTimeout();
     * 4. under the lock again, either return after optionally deactivating this coordinator,
     *    or set a boolean field to true and return a State constant.
     *
     * NOTE(review): the State constants, the field names, the condition guarding the
     * deactivation and the return expressions were stripped when this listing was extracted.
     * Going by the inline comments, the guard and the field set to true are presumably the
     * 'recovered' flag - confirm against the upstream source.
     *
     * @return a State; see the note above regarding which value.
     */
    public State commit()
    {
        final State current ;
        synchronized(this)
        {
            current =  ;
            if (current == .)
            {
                changeState(.) ;
            }
        }
        // the message is sent without holding the lock
        if ((current == .) || (current == .))
        {
            sendCommit() ;
        }
        // blocks until the state differs from the given one or the transport timeout elapses
        waitForState(., TransportTimer.getTransportTimeout()) ;
        synchronized(this)
        {
            if ( != .)
            {
                // if this is a recovered participant then forget will not have
                // deactivated the entry so that this (recovery) thread can
                // detect it and update its log entry. so we need to deactivate
                // the entry here.
                if () {
                    CoordinatorProcessor.getProcessor().deactivateCoordinator(this) ;
                }
                return  ;
            }
            // the participant is still uncommitted so it will be rewritten to the log.
            // it remains activated in case a committed message comes in between now and
            // the next scan. the recovery code will detect this active participant when
            // rescanning the log and use it instead of recreating a new one.
            // we need to mark this one as recovered so it does not get deleted until
            // the next scan
             = true;
            return .;
        }
    }

    
Handle the rollback event.

State transitions:
None -> None (invalid state)
Active -> Aborting (send rollback)
Preparing -> Aborting (send rollback)
PreparedSuccess -> Aborting (send rollback)
Committing -> Committing (invalid state)
Aborting -> Aborting (do nothing)
    /**
     * Drives the rollback of this participant and waits for the outcome.
     *
     * Steps, as far as the visible code shows:
     * 1. under the lock, snapshot the state and, for any of three states, change it;
     * 2. outside the lock, call sendRollback() for any of three states of the snapshot, or
     *    forget() for one further state;
     * 3. block in waitForState(...) for at most TransportTimer.getTransportTimeout();
     * 4. under the lock again, return early if a comparison against one state fails,
     *    otherwise call forget() and return a State constant.
     *
     * NOTE(review): the State constants, the field names and the return expressions were
     * stripped when this listing was extracted - confirm the concrete states against the
     * upstream source. The transition table preceding this method says "Aborting (do
     * nothing)", yet the visible code has a separate single-state branch that calls
     * forget(); check which state that branch tests.
     *
     * @return a State; see the note above regarding which value.
     */
    public State rollback()
    {
        final State current ;
        synchronized(this)
        {
            current =  ;
            if ((current == .) || (current == .) ||
                (current == .))
            {
                changeState(.) ;
            }
        }
        // the message is sent without holding the lock
        if ((current == .) || (current == .) ||
            (current == .))
        {
            sendRollback() ;
        }
        else if (current == .)
        {
            forget() ;
        }
        // blocks until the state differs from the given one or the transport timeout elapses
        waitForState(., TransportTimer.getTransportTimeout()) ;
        synchronized(this)
        {
            if ( != .)
            {
                // means state must be null and the participant has already been deactivated
                return  ;
            }
            // the participant has not confirmed that it is aborted so it will be written to the
            // log in the transaction's heuristic list. it needs to be deactivated here
            // so that subsequent ABORTED messages are handled correctly, either by sending
            // an UnknownTransaction fault or a rollback depending upon whether it is
            // volatile or durable, respectively
            forget();
            return .;
        }
    }

    
Handle the comms timeout event.

State transitions:
Preparing -> Preparing (resend Prepare)
Committing -> Committing (resend Commit)
    /**
     * Timer callback: resends the outstanding Prepare or Commit message.
     *
     * Invoked from the run() method of the TimerTask instances created in this class, which
     * pass themselves as 'caller'. Under the lock the caller is compared by identity with a
     * stored reference and the call is dropped if they differ; otherwise the state is
     * snapshotted and, outside the lock, the matching message is sent again.
     *
     * NOTE(review): the stored reference (presumably the current timer task field) and the
     * State constants were stripped when this listing was extracted.
     *
     * @param caller the timer task whose expiry triggered this call.
     */
    private void commsTimeout(TimerTask caller)
    {
        final State current ;
        synchronized(this)
        {
            if ( != caller) {
                // the timer was cancelled but it went off before it could be cancelled
                
                return;
            }
            current =  ;
        }
        // resend outside the lock; the send methods take care of installing a new timer task
        if (current == .)
        {
            sendPrepare() ;
        }
        else if (current == .)
        {
            sendCommit() ;
        }
    }

    
Get the coordinator id.

Returns:
The coordinator id.
    /**
     * Accessor for the coordinator id.
     *
     * NOTE(review): the returned expression was stripped when this listing was extracted;
     * presumably the final 'id' field.
     *
     * @return The coordinator id.
     */
    public String getId()
    {
        return  ;
    }

    
Get the participant endpoint reference

Returns:
The participant endpoint reference
    // NOTE(review): the method signature that should precede this body is missing from the
    // listing (lost in extraction), and so is the returned expression. Judging by the
    // description above, this is the accessor for the participant endpoint reference;
    // restore the signature from the upstream source.
    {
        return  ;
    }

    
Is the participant durable?

Returns:
true if durable, false otherwise.
    /**
     * Reports whether the participant is durable.
     *
     * NOTE(review): the returned expression was stripped when this listing was extracted;
     * presumably the final 'durable' field.
     *
     * @return true if durable, false otherwise.
     */
    public boolean isDurable()
    {
        return  ;
    }

    
Is the participant recovered?

Returns:
true if recovered, false otherwise.
    /**
     * Reports whether the participant is marked as recovered.
     *
     * Unlike isReadOnly() and getState(), this accessor is not synchronized.
     *
     * NOTE(review): the returned expression was stripped when this listing was extracted;
     * presumably the non-final 'recovered' field. If so, and if commit() updates that field
     * under the lock as its comments suggest, an unsynchronized read here may observe a stale
     * value - confirm whether that matters to callers.
     *
     * @return true if recovered, false otherwise.
     */
    public boolean isRecovered()
    {
        return  ;
    }

    
Was this a read only response?

Returns:
true if a read only response, false otherwise.
    /**
     * Reports whether the participant answered with a read-only response.
     *
     * Synchronized on this object, like the readOnly(...) handler.
     *
     * NOTE(review): the returned expression was stripped when this listing was extracted;
     * presumably the 'readOnly' field.
     *
     * @return true if a read only response, false otherwise.
     */
    public synchronized boolean isReadOnly()
    {
        return  ;
    }

    
Retrieve the current state of this participant

Returns:
the current state.
    /**
     * Retrieves the current state of this participant.
     *
     * Synchronized on this object, the same monitor changeState(...) uses.
     *
     * NOTE(review): the returned expression was stripped when this listing was extracted;
     * presumably the 'state' field, which is null once forget() has run.
     *
     * @return the current state.
     */
    public synchronized State getState()
    {
        return ;
    }

    
Change the state and notify any listeners.

Parameters:
state The new state.
    /**
     * Stores a new state and wakes up every thread waiting on this object's monitor.
     *
     * Nothing happens when the new value is the same reference as the stored one, so waiters
     * are only notified on an actual change. forget() passes null as the new state.
     *
     * NOTE(review): the name of the field compared and assigned was stripped when this listing
     * was extracted; presumably the 'state' field.
     *
     * @param state The new state (may be null).
     */
    private synchronized void changeState(final State state)
    {
        if (this. != state)
        {
            this. = state ;
            // wakes threads blocked in waitForState(...)
            notifyAll() ;
        }
    }

    
Wait for the state to change from the specified state.

Parameters:
origState The original state.
delay The maximum time to wait for (in milliseconds).
Returns:
The current state.
    /**
     * Blocks until a stored value no longer equals the given state or the delay has elapsed.
     *
     * The deadline is computed once from System.currentTimeMillis(); the loop re-checks the
     * condition after every wake-up and waits only for the time remaining, so early or
     * spurious wake-ups neither end the wait prematurely nor extend it.
     *
     * NOTE(review): the value compared in the loop and the returned expression were stripped
     * when this listing was extracted; presumably both are the 'state' field.
     * NOTE(review): InterruptedException is swallowed and the thread's interrupt status is not
     * restored, so callers cannot tell that the thread was interrupted while waiting.
     *
     * @param origState The original state.
     * @param delay The maximum time to wait for (in milliseconds).
     * @return The current state.
     */
    private State waitForState(final State origStatefinal long delay)
    {
        final long end = System.currentTimeMillis() + delay ;
        synchronized(this)
        {
            while( == origState)
            {
                final long remaining = end - System.currentTimeMillis() ;
                // deadline reached: stop waiting even though the state has not changed
                if (remaining <= 0)
                {
                    break ;
                }
                try
                {
                    // releases the monitor while waiting; woken by notifyAll() in changeState
                    wait(remaining) ;
                }
                catch (final InterruptedException ie) {} // ignore
            }
            return  ;
        }
    }

    
Forget the current coordinator.
    /**
     * Marks this coordinator as completed and, conditionally, deactivates it.
     *
     * The state is first changed to null via changeState(null), which also notifies waiting
     * threads. The coordinator is then deactivated in the CoordinatorProcessor unless a
     * boolean condition holds.
     *
     * NOTE(review): the operand of "!" was stripped when this listing was extracted; going by
     * the inline comment it is presumably the 'recovered' flag. The method itself is not
     * synchronized, so if that is a field it is read here without the lock unless the caller
     * already holds it - confirm against the upstream source.
     */
    private void forget()
    {
        // first, change state to null to indicate that the participant has completed.
        changeState(null) ;
        // participants which have not been recovered from the log can be deactivated now.
        // participants which have been recovered are left for the recovery thread to deactivate.
        // this is because the recovery thread may have timed out waiting for a response to
        // the commit message and gone on to complete its scan and suspend. the next scan
        // will detect this activated participant and note that it has completed. if a crash
        // happens in between the recovery thread can safely recreate and reactivate the
        // participant and resend the commit since the commit/committed exchange is idempotent.
        if (!) {
            CoordinatorProcessor.getProcessor().deactivateCoordinator(this) ;
        }
    }

    
Send the prepare message.
    /**
     * Sends a Prepare message to the participant and arranges for a resend on timeout.
     *
     * A fresh timer task is created up front and installed under the lock (cancelling any
     * previous one). The message is sent without holding the lock. Afterwards the lock is
     * re-acquired and the new task is either scheduled or discarded, depending on a state
     * comparison. Any Throwable from the send is caught and only traced.
     *
     * NOTE(review): the field names, the State constant, the logger reference and the second
     * argument of sendPrepare(...) were stripped when this listing was extracted.
     * NOTE(review): the receiver of ".equals(newTimerTask)" is presumably the timer task
     * field. The inline comment says another thread may reassign that field to null while the
     * lock is dropped; in that case this call would throw NullPointerException - confirm
     * against the upstream source and guard if so.
     */
    private void sendPrepare()
    {
        TimerTask newTimerTask = createTimerTask();
        synchronized (this) {
            // cancel any existing timer task
            if ( != null) {
                .cancel();
            }
            // install the new timer task. this signals our intention to post a prepare which may need
            // rescheduling later but allows us to drop the lock on this while we are in the comms layer.
            // our intention can be revised by another thread by reassigning the field to a new task
            // or null
             = newTimerTask;
        }
        // ok now try the prepare
        try
        {
            ParticipantClient.getClient().sendPrepare(createContext(), ) ;
        }
        catch (final Throwable th)
        {
            // best effort: a failed send is only traced; the timer task drives the retry
            if (..isTraceEnabled())
            {
                ..tracev("Unexpecting exception while sending Prepare"th) ;
            }
        }
        // reobtain the lock before deciding whether to schedule the timer
        synchronized (this) {
            if (.equals(newTimerTask)) {
                // the timer task has not been cancelled so schedule it if appropriate
                if ( == .) {
                    scheduleTimer(newTimerTask);
                } else {
                    // no need to schedule it so get rid of it
                     = null;
                }
            }
        }
    }

    
Send the commit message.
    /**
     * Sends a Commit message to the participant and arranges for a resend on timeout.
     *
     * A fresh timer task is created up front and installed under the lock (cancelling any
     * previous one). The message is sent without holding the lock. Afterwards the lock is
     * re-acquired and the new task is either scheduled or discarded, depending on a state
     * comparison. Any Throwable from the send is caught and only traced.
     *
     * NOTE(review): the field names, the State constant, the logger reference and the second
     * argument of sendCommit(...) were stripped when this listing was extracted.
     * NOTE(review): the receiver of ".equals(newTimerTask)" is presumably the timer task
     * field. The inline comment says another thread may reassign that field to null while the
     * lock is dropped; in that case this call would throw NullPointerException - confirm
     * against the upstream source and guard if so.
     */
    private void sendCommit()
    {
        TimerTask newTimerTask = createTimerTask();
        synchronized (this) {
            // cancel any existing timer task
            if ( != null) {
                .cancel();
            }
            // install the new timer task. this signals our intention to post a commit which may need
            // rescheduling later but allows us to drop the lock on this while we are in the comms layer.
            // our intention can be revised by another thread by reassigning the field to a new task
            // or null
             = newTimerTask;
        }
        // ok now try the commit
        try
        {
            ParticipantClient.getClient().sendCommit(createContext(), ) ;
        }
        catch (final Throwable th)
        {
            // best effort: a failed send is only traced; the timer task drives the retry
            if (..isTraceEnabled())
            {
                ..tracev("Unexpecting exception while sending Commit"th) ;
            }
        }
        // reobtain the lock before deciding whether to schedule the timer
        synchronized (this) {
            if (.equals(newTimerTask)) {
                // the timer task has not been cancelled so schedule it if appropriate
                if ( == .) {
                    scheduleTimer(newTimerTask);
                } else {
                    // no need to schedule it so get rid of it
                     = null;
                }
            }
        }
    }

    
Send the rollback message.
    /**
     * Sends a Rollback message to the participant.
     *
     * In contrast to sendPrepare() and sendCommit(), no timer task is created or scheduled
     * here, so this method itself arranges no resend. Any Throwable from the send is caught
     * and only traced.
     *
     * NOTE(review): the logger reference and the second argument of sendRollback(...) were
     * stripped when this listing was extracted.
     */
    private void sendRollback()
    {
        try
        {
            ParticipantClient.getClient().sendRollback(createContext(), ) ;
        }
        catch (final Throwable th)
        {
            // best effort: a failed send is only traced
            if (..isTraceEnabled())
            {
                ..tracev("Unexpecting exception while sending Rollback"th) ;
            }
        }
    }

    
Send the UnknownTransaction message.
    /**
     * Replies to the sender of an incoming message with a SOAP fault.
     *
     * The fault's addressing context is derived from the incoming one with a fresh message
     * id, and the instance identifier is taken from the incoming arjuna context rather than
     * from this engine.
     *
     * NOTE(review): 'soapFault' is used in the sendSoapFault(...) call but is not declared
     * anywhere in the visible body; the statement that builds it appears to have been lost
     * when this listing was extracted, along with the commas between arguments.
     * NOTE(review): the empty catch block discards every Throwable without even a trace
     * message (the other send methods at least trace), so a failed reply is invisible.
     *
     * @param map The addressing context of the incoming message.
     * @param arjunaContext The arjuna context of the incoming message.
     */
    private void sendUnknownTransaction(final MAP mapfinal ArjunaContext arjunaContext)
    {
        try
        {
            final MAP faultMAP = AddressingHelper.createFaultContext(map, MessageId.getMessageId()) ;
            final InstanceIdentifier instanceIdentifier = arjunaContext.getInstanceIdentifier() ;
            ParticipantClient.getClient().sendSoapFault(faultMAPsoapFaultinstanceIdentifier) ;
        }
        catch (final Throwable th)
        {
        }
    }

    
create a timer task to handle a comms timeout

Returns:
the timer task
    /**
     * Creates (but does not schedule) a timer task for handling a comms timeout.
     *
     * @return a new anonymous TimerTask whose run() calls commsTimeout(...) with itself as
     *         the argument, which lets commsTimeout recognise a stale task.
     */
    private TimerTask createTimerTask()
    {
        return new TimerTask() {
            public void run() {
                // inside the anonymous class 'this' is the TimerTask, not the engine
                commsTimeout(this) ;
            }
        } ;
    }

    
Schedule a timer task to handle a comms timeout.

Parameters:
timerTask the timer task to be scheduled
    /**
     * Hands the given task to the timer obtained from TransportTimer.getTimer(), passing
     * TransportTimer.getTransportPeriod() as the second argument of schedule(...).
     *
     * The parameter shadows the field of the same name; only the parameter is used here.
     *
     * @param timerTask the timer task to be scheduled
     */
    private void scheduleTimer(TimerTask timerTask)
    {
        TransportTimer.getTimer().schedule(timerTask, TransportTimer.getTransportPeriod()) ;
    }

    
Initiate the timer.
    /**
     * Cancels a stored timer task, if any, and then either creates and schedules a new one
     * or clears the reference, depending on a two-way state comparison.
     *
     * Runs under this object's monitor (the method is synchronized). The new task behaves
     * like the one built by createTimerTask(): its run() calls commsTimeout(...) with itself.
     *
     * NOTE(review): the field names and State constants were stripped when this listing was
     * extracted; the two states are presumably the ones commsTimeout(...) resends for.
     * NOTE(review): no call to this method is visible in this listing; it may be dead code
     * superseded by createTimerTask()/scheduleTimer(...) - confirm before relying on it.
     */
    private synchronized void initiateTimer()
    {
        if ( != null)
        {
            .cancel() ;
        }
        if (( == .) || ( == .))
        {
             = new TimerTask() {
                public void run() {
                    commsTimeout(this) ;
                }
            } ;
            TransportTimer.getTimer().schedule(, TransportTimer.getTransportPeriod()) ;
        }
        else
        {
             = null ;
        }
    }

    
Create a context for the outgoing message.

Returns:
The addressing context.
    /**
     * Creates the addressing context for an outgoing notification.
     *
     * A new message id is obtained from MessageId.getMessageId() on every call and passed to
     * AddressingHelper.createNotificationContext(...).
     *
     * @return The addressing context.
     */
    private MAP createContext()
    {
        final String messageId = MessageId.getMessageId() ;
        return AddressingHelper.createNotificationContext(messageId) ;
    }
New to GrepCode? Check out our FAQ X