Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.arjuna.wst11.messaging.engines;
  
  import  org.jboss.ws.api.addressing.MAP;
 
The participant state engine

Author(s):
kevin
 
 public class ParticipantEngine implements ParticipantInboundEvents
 {
    
The associated participant
 
     private final Participant participant ;
    
The participant id.
 
     private final String id ;
    
The coordinator endpoint reference.
 
     private final W3CEndpointReference coordinator ;
    
The current state.
 
     private State state ;
    
The associated timer task or null.
 
     private TimerTask timerTask ;

    
the time which will elapse before the next message resend. this is incrementally increased until it reaches RESEND_PERIOD_MAX
 
     private long resendPeriod;

    
the initial period we will allow between resends.
 
     private long initialResendPeriod;

    
the maximum period we will allow between resends. n.b. the coordinator uses the value returned by getTransportTimeout as the limit for how long it waits for a response. however, we can still employ a max resend period in excess of this value. if a message comes in after the coordinator has given up it will catch it on the next retry.
 
     private long maxResendPeriod;

    
true if this participant has been recovered otherwise false
 
     private boolean recovered;

    
true if this participant's recovery details have been logged to disk otherwise false
 
     private boolean persisted;

    
Construct the initial engine for the participant.

Parameters:
participant The participant.
id The participant id.
coordinator The coordinator endpoint reference.
 
     public ParticipantEngine(final Participant participantfinal String idfinal W3CEndpointReference coordinator)
     {
         this(participantid.coordinatorfalse) ;
     }

    
Construct the engine for the participant in a specified state.

Parameters:
participant The participant.
id The participant id.
state The initial state.
coordinator The coordinator endpoint reference.
    public ParticipantEngine(final Participant participantfinal String idfinal State statefinal W3CEndpointReference coordinatorfinal boolean recovered)
    {
        this. = participant ;
        this. = id ;
        this. = state ;
        this. = coordinator ;
        this. = recovered;
        this. = recovered;
        this. = TransportTimer.getTransportPeriod();
        this. = TransportTimer.getMaximumTransportPeriod();
        this. = this.;
    }

    
Handle the commit event.

Parameters:
commit The commit notification.
map The addressing context.
arjunaContext The arjuna context. None -> None (send committed) Active -> Aborting (do nothing) Preparing -> Aborting (do nothing) PreparedSuccess -> Committing (initiate commit) Committing -> Committing (do nothing) Aborting -> Aborting (do nothing)
    public void commit(final Notification commitfinal MAP mapfinal ArjunaContext arjunaContext)
    {
        final State current ;
        synchronized(this)
        {
            current =  ;
            if (current == .)
            {
                 = . ;
                if ( != null)
                {
                    .cancel() ;
                }
            }
            else if ((current == .) || (current == .))
            {
                 = . ;
            }
        }
        if (current == .)
        {
            executeCommit() ;
        }
        else if (current == null)
        {
            sendCommitted() ;
        }
    }

    
Handle the prepare event.

Parameters:
prepare The prepare notification.
map The addressing context.
arjunaContext The arjuna context. None -> None (send aborted) Active -> Preparing (execute prepare) Preparing -> Preparing (do nothing) PreparedSuccess -> PreparedSuccess (resend prepared) Committing -> Committing (ignore) Aborting -> Aborting (send aborted and forget)
    public void prepare(final Notification preparefinal MAP mapfinal ArjunaContext arjunaContext)
    {
        final State current ;
        synchronized(this)
        {
            current =  ;
            if (current == .)
            {
                 = . ;
            }
        }
        if (current == .)
        {
            executePrepare() ;
        }
        else if (current == .)
        {
            sendPrepared() ;
        }
        else if ((current == .) || (current == null))
        {
            sendAborted() ;
            forget() ;
        }
    }

    
Handle the rollback event.

Parameters:
rollback The rollback notification.
map The addressing context.
arjunaContext The arjuna context. None -> None (send aborted) Active -> Aborting (execute rollback, send aborted and forget) Preparing -> Aborting (execute rollback, send aborted and forget) PreparedSuccess -> Aborting (execute rollback, send aborted and forget) Committing -> Committing (ignore) Aborting -> Aborting (send aborted and forget)
    public void rollback(final Notification rollbackfinal MAP mapfinal ArjunaContext arjunaContext)
    {
        final State current ;
        synchronized(this)
        {
            current =  ;
            if ((current == .) || (current == .) ||
                (current == .))
            {
                 = . ;
            }
        }
        if ((current == .) || (current == .) ||
            (current == .))
        {
            // n.b. if state is PREPARING the participant may still be in the middle
            // of prepare or may even be told to prepare after this is called. according
            // to the spec that is not our lookout. however, rollback should only get
            // called once here.
            if (!executeRollback())
            {
                synchronized (this)
                {
                     = current;
                }
                return ;
            }
            // if the participant managed to persist the log record then we should try
            // to delete it. note that persisted can only be set to true by the PREPARING
            // thread. if it detects a transtiion to ABORTING while it is doing the log write
            // it will clear up itself.
            if ( &&  instanceof Durable2PCParticipant) {
                // if we cannot delete the participant we effectively drop the rollback message
                // here in the hope that we have better luck next time..
                if (!XTSATRecoveryManager.getRecoveryManager().deleteParticipantRecoveryRecord()) {
                    // hmm, could not delete entry -- leave it so we can maybe retry later
                }
            }
            sendAborted() ;
            forget() ;
        }
        else if (current != .)
        {
            sendAborted();
        }
    }

    
Handle the early rollback event. None -> None Active -> Aborting (execute rollback, send aborted and forget) Preparing -> Aborting (execute rollback, send aborted and forget) PreparedSuccess -> PreparedSuccess Committing -> Committing Aborting -> Aborting
    public void earlyRollback()
    {
        rollbackDecision() ;
    }

    
Handle the early readonly event. None -> None Active -> None (send ReadOnly) Preparing -> None (send ReadOnly) PreparedSuccess -> PreparedSuccess Committing -> Committing Aborting -> Aborting
    public void earlyReadonly()
    {
        readOnlyDecision() ;
    }

    
Handle the recovery event. None -> None Active -> Active Preparing -> Preparing Committing -> Committing PreparedSuccess -> PreparedSuccess (resend Prepared) Aborting -> Aborting
    public void recovery()
    {
        final State current ;
        synchronized(this)
        {
            current =  ;
        }
        if (current == .)
        {
            sendPrepared() ;
        }
    }

    
Handle the soap fault event.

Parameters:
soapFault The soap fault.
map The addressing context.
arjunaContext The arjuna context.
    public void soapFault(final SoapFault soapFaultfinal MAP mapfinal ArjunaContext arjunaContext)
    {
        if (..isTraceEnabled())
        {
            final InstanceIdentifier instanceIdentifier = arjunaContext.getInstanceIdentifier() ;
            final SoapFaultType soapFaultType = soapFault.getSoapFaultType() ;
            final QName subCode = soapFault.getSubcode() ;
            ..tracev("Unexpected SOAP fault for participant {0}: {1} {2}"new Object[] {instanceIdentifiersoapFaultTypesubCode}) ;
        }
        QName subcode = soapFault.getSubcode();
        {
            final SoapFaultType soapFaultType = soapFault.getSoapFaultType() ;
            final QName subCode = soapFault.getSubcode() ;
            // unrecoverable error -- forget this participant and delete any persistent
            //  record of it
            final State current ;
            synchronized(this)
            {
                current = ;
                 = null;
            }
            if (current == . &&
                    ..equals(subcode)) {
                // we need to tell this participant to roll back
                executeRollback();
            }
            if ( &&  instanceof Durable2PCParticipant) {
                // remove any durable participant recovery record from the persistent store
                Durable2PCParticipant durableParticipant =(Durable2PCParticipant;
                // if we cannot delete the participant we record an error here
                if (!XTSATRecoveryManager.getRecoveryManager().deleteParticipantRecoveryRecord()) {
                    // hmm, could not delete entry -- log an error
                }
            }
            forget() ;
        }
    }

    
Handle the commit decision event. Preparing -> PreparedSuccess (send Prepared) Committing -> null (send committed and forget)
    private void commitDecision()
    {
        State current ;
        boolean rollbackRequired  = false;
        boolean deleteRequired  = false;
        synchronized(this)
        {
            current =  ;
        }
        if (current == .)
        {
            // ok, we need to write the recovery details to log and send prepared.
            // if we cannot write the log then we have to rollback the participant
            //  and send aborted.
            if ( instanceof Durable2PCParticipant) {
                // write a durable participant recovery record to the persistent store
                Durable2PCParticipant durableParticipant =(Durable2PCParticipant;
                ATParticipantRecoveryRecord recoveryRecord = new ATParticipantRecoveryRecord(durableParticipant);
                if (!XTSATRecoveryManager.getRecoveryManager().writeParticipantRecoveryRecord(recoveryRecord)) {
                    // we need to rollback and send aborted unless some other thread
                    //gets there first
                    rollbackRequired = true;
                }
            }
            // recheck state in case a rollback or readonly came in while we were writing the
            // log record
            synchronized (this) {
                current = ;
                if (current == .) {
                    if (rollbackRequired) {
                        // if we change state to aborting then we are responsible for
                        // calling rollback and sending aborted but we have no log record
                        // to delete
                         = .;
                    } else {
                         = .;
                        // this ensures any subsequent commit or rollback deletes the log record
                        // so we still have no log record to delete here
                         = true;
                    }
                } else if (!rollbackRequired) {
                    // an incoming rollback or readonly changed the state to aborted or null so
                    // it will already have performed a rollback if required but we need to
                    // delete the log record since the rollback/readonly thread did not know
                    // about it
                    deleteRequired = true;
                }
            }
            if (rollbackRequired)
            {
                // we need to do the rollback and send aborted
                executeRollback();
                sendAborted();
                forget();
            } else if (deleteRequired) {
                // just try to delete the log entry -- any required aborted has already been sent
                if (!XTSATRecoveryManager.getRecoveryManager().deleteParticipantRecoveryRecord()) {
                    // hmm, could not delete entry log warning
                }
            } else {
                // whew got through -- send a prepared
                sendPrepared() ;
            }
        }
        else if (current == .)
        {
            if ( &&  instanceof Durable2PCParticipant) {
                // remove any durable participant recovery record from the persistent store
                Durable2PCParticipant durableParticipant =(Durable2PCParticipant;
                // if we cannot delete the participant we effectively drop the commit message
                // here in the hope that we have better luck next time.
                if (!XTSATRecoveryManager.getRecoveryManager().deleteParticipantRecoveryRecord()) {
                    // hmm, could not delete entry -- log a warning
                    // now revert back to PREPARED_SUCCESS and drop message awaiting a retry
                    synchronized(this) {
                         = .;
                    }
                    return;
                }
            }
            sendCommitted() ;
            forget() ;
        }
    }

    
Handle the readOnly decision event. Active -> None (send ReadOnly) Preparing -> None (send ReadOnly)
    private void readOnlyDecision()
    {
        final State current ;
        synchronized(this)
        {
            current =  ;
            if ((current == .) || (current == .))
            {
        	 = null ;
            }
        }
        if ((current == .) || (current == .))
        {
            sendReadOnly() ;
            forget() ;
        }
    }

    
Handle the rollback decision event. Active -> Aborting (send aborted) Preparing -> Aborting (send aborted)
    private void rollbackDecision()
    {
        final State current ;
        synchronized(this)
        {
            current =  ;
            if ((current == .) || (current == .))
            {
                 = . ;
            }
        }
        if ((current == .) || (current == .))
        {
            sendAborted() ;
            forget() ;
        }
    }

    
Handle the comms timeout event. PreparedSuccess -> PreparedSuccess (resend Prepared)
    private void commsTimeout(TimerTask caller)
    {
        final State current ;
        synchronized(this)
        {
            if (!.equals(caller)) {
                // the timer was cancelled but it went off before it could be cancelled
                return;
            }
            // double the resend period up to our maximum limit
            if ( < ) {
                 =  * 14 / 10; // approximately doubles every two resends
                if ( > ) {
                     = ;
                }
            }
            current =  ;
        }
        if (current == .)
        {
            sendPrepared() ;
        }
    }

    
Execute the commit transition.
    private void executeCommit()
    {
        try
        {
            .commit() ;
            commitDecision() ;
        }
        catch (final Throwable th)
        {
            synchronized(this)
            {
                if ( == .)
                {
            	     = . ;
                }
            }
            if (..isTraceEnabled())
            {
                ..tracev("Unexpected exception from participant commit"th) ;
            }
        }
    }

    
Execute the rollback transition.
    private boolean executeRollback()
    {
        try
        {
            .rollback() ;
        }
        catch (final SystemException se)
        {
            return false ;
        }
        catch (final Throwable th)
        {
            if (..isTraceEnabled())
            {
                ..tracev("Unexpected exception from participant rollback"th) ;
            }
        }
        return true ;
    }

    
Execute the prepare transition.
    private void executePrepare()
    {
        final Vote vote ;
        try
        {
            vote = .prepare();
        }
        catch (final SystemException se)
        {
            if (..isTraceEnabled())
            {
                ..tracev("Unexpected exception from participant prepare"se) ;
            }
            return ;
        }
        catch (final Throwable th)
        {
            if (..isTraceEnabled())
            {
                ..tracev("Unexpected exception from participant prepare"th) ;
            }
            rollbackDecision() ;
            return ;
        }
        if (vote instanceof Prepared)
        {
            commitDecision() ;
        }
        else if (vote instanceof ReadOnly)
        {
            readOnlyDecision() ;
        }
        else if (vote instanceof Aborted)
        {
            rollbackDecision() ;
        }
        else
        {
            if (..isTraceEnabled())
            {
                ..tracev("Unexpected result from participant prepare: {0}"new Object[] {(vote == null ? "null" : vote.getClass().getName())});
            }
            rollbackDecision() ;
        }
    }

    
Forget the current participant.
    private void forget()
    {
        synchronized(this)
        {
             = null ;
        }
        ParticipantProcessor.getProcessor().deactivateParticipant(this) ;
    }

    
Send the committed message.
    private void sendCommitted()
    {
        final MAP responseAddressingContext = createContext() ;
        final InstanceIdentifier instanceIdentifier = new InstanceIdentifier() ;
        try
        {
            CoordinatorClient.getClient().sendCommitted(responseAddressingContextinstanceIdentifier) ;
        }
        catch (final Throwable th)
        {
            if (..isTraceEnabled())
            {
                ..tracev("Unexpected exception while sending Committed"th) ;
            }
        }
    }

    
Send the prepared message.
    private void sendPrepared()
    {
        sendPrepared(false);
    }

    
Send the prepared message.

Parameters:
timedOut true if this is in response to a comms timeout
    private void sendPrepared(boolean timedOut)
    {
        final MAP responseAddressingContext = createContext() ;
        final InstanceIdentifier instanceIdentifier = new InstanceIdentifier() ;
        try
        {
            CoordinatorClient.getClient().sendPrepared(responseAddressingContextinstanceIdentifier) ;
        }
        catch (final Throwable th)
        {
            if (..isTraceEnabled())
            {
                ..tracev("Unexpected exception while sending Prepared"th) ;
            }
        }
        updateResendPeriod(timedOut);
        initiateTimer() ;
    }
    private synchronized void updateResendPeriod(boolean timedOut)
    {
        // if we timed out then we multiply the resend period by ~= sqrt(2) up to the maximum
        // if not we make sure it is reset to the initial period
        if (timedOut) {
            if ( < ) {
                long newPeriod  =  * 14 / 10;  // approximately doubles every two resends
                if (newPeriod > ) {
                    newPeriod = ;
                }
                 = newPeriod;
            }
        } else {
            if ( > ) {
                 = ;
            }
        }
    }

    
Send the aborted message.
    private void sendAborted()
    {
        final MAP responseAddressingContext = createContext() ;
        final InstanceIdentifier instanceIdentifier = new InstanceIdentifier() ;
        try
        {
            CoordinatorClient.getClient().sendAborted(responseAddressingContextinstanceIdentifier) ;
        }
        catch (final Throwable th)
        {
            if (..isTraceEnabled())
            {
                ..tracev("Unexpected exception while sending Aborted"th) ;
            }
        }
    }

    
Send the read only message.
    private void sendReadOnly()
    {
        final MAP responseAddressingContext = createContext() ;
        final InstanceIdentifier instanceIdentifier = new InstanceIdentifier() ;
        try
        {
            CoordinatorClient.getClient().sendReadOnly(responseAddressingContextinstanceIdentifier) ;
        }
        catch (final Throwable th)
        {
            if (..isTraceEnabled())
            {
                ..tracev("Unexpected exception while sending ReadOnly"th) ;
            }
        }
    }

    
Initiate the timer.
    private synchronized void initiateTimer()
    {
        if ( != null)
        {
            .cancel() ;
        }
        if ( == .)
        {
             = new TimerTask() {
                public void run() {
                    commsTimeout(this) ;
                }
            } ;
            TransportTimer.getTimer().schedule() ;
        }
        else
        {
             = null ;
        }
    }

    
Create a response context from the incoming context.

Returns:
The addressing context.
    private MAP createContext()
    {
        final String messageId = MessageId.getMessageId() ;
        return AddressingHelper.createNotificationContext(messageId) ;
    }
    
    
Get the coordinator id.

Returns:
The coordinator id.
    public String getId()
    {
        return  ;
    }
    {
        return ;
    }

    
Is the participant persisted to disk?

Returns:
true if persisted, false otherwise.
    public boolean isPersisted()
    {
        return  ;
    }

    
Is the participant recovered?

Returns:
true if recovered, false otherwise.
    public boolean isRecovered()
    {
        return  ;
    }
New to GrepCode? Check out our FAQ X