Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2011 Red Hat, Inc, and individual contributors.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *   http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 package org.projectodd.stilts.conduit.stomp;
 
 import java.util.Map;
 
 
 
 public class ConduitStompConnection implements StompConnection {
 
     private static Logger log = Logger.getLogger(ConduitStompConnection.class);
 
     public ConduitStompConnection(ConduitStompProvider stompProviderMessageConduit messageConduitVersion versionHeartbeat hbthrows StompException {
         .debugf"New connection: %s"messageConduit );
         this. = stompProvider;
         this. = messageConduit;
         this. = version;
         this. = hb;
     }
 
     public Heartbeat getHeartbeat() {
         return this.;
     }
 
     @Override
     public StompSession getSession() {
         return this..getSession();
     }
 
     public Version getVersion() {
         return this.;
     }
 
         return this.;
     }
 
     public MessageConduit getMessageConduit() {
         return this.;
     }
 
     public void send(StompMessage messageString transactionIdthrows StompException {
         if (transactionId != null) {
             getTransactiontransactionId ).sendmessage );
         } else {
             sendmessage );
         }
     }
 
     protected void send(StompMessage messagethrows StompException {
         try {
             this..sendmessage );
         } catch (Exception e) {
             .errorf(e"Cannot send message: %s"message);
             throw new StompExceptione );
         }
     }
 
     void ack(Acknowledger acknowledgerString transactionIdthrows StompException {
         if (transactionId != null) {
             getTransactiontransactionId ).ackacknowledger );
         } else {
             try {
                 acknowledger.ack();
             } catch (Exception e) {
                throw new StompExceptione );
            }
        }
    }
    void nack(Acknowledger acknowledgerString transactionIdthrows StompException {
        if (transactionId != null) {
            getTransactiontransactionId ).nackacknowledger );
        } else {
            try {
                acknowledger.nack();
            } catch (Exception e) {
                throw new StompExceptione );
            }
        }
    }
    synchronized ConduitStompTransaction getTransaction(String transactionIdthrows InvalidTransactionException {
        ConduitStompTransaction transaction = this..gettransactionId );
        if (transaction == null) {
            throw new InvalidTransactionExceptiontransactionId );
        }
        return transaction;
    }
    synchronized ConduitStompTransaction removeTransaction(String transactionId) {
        return this..removetransactionId );
    }
    @Override
    public synchronized void begin(String transactionIdHeaders headersthrows StompException {
        Transaction jtaTransaction = null;
        try {
            tm.begin();
            jtaTransaction = tm.getTransaction();
            tm.suspend();
        } catch (NotSupportedException e) {
            throw new StompExceptione );
        } catch (SystemException e) {
            throw new StompExceptione );
        }
        try {
            ConduitStompTransaction transaction = createTransactionjtaTransactiontransactionId );
            this..puttransactionIdtransaction );
        } catch (Exception e) {
            throw new StompExceptione );
        }
    }
    @Override
    public synchronized void commit(String transactionIdthrows StompException {
        StompTransaction transaction = removeTransactiontransactionId );
        if (transaction == null) {
            throw new InvalidTransactionExceptiontransactionId );
        }
        transaction.commit();
    }
    @Override
    public synchronized void abort(String transactionIdthrows StompException {
        StompTransaction transaction = removeTransactiontransactionId );
        if (transaction == null) {
            throw new InvalidTransactionExceptiontransactionId );
        }
        transaction.abort();
    }
    @Override
    public synchronized Subscription subscribe(String destinationString subscriptionIdHeaders headersthrows StompException {
        try {
            Subscription subscription = createSubscriptiondestinationsubscriptionIdheaders );
            if (subscription == null) {
                .debugf"unable to create subscription for destination %s"destination );
                return null;
            }
            this..putsubscription.getId(), subscription );
            return subscription;
        } catch (Exception e) {
            throw new StompExceptione );
        }
    }
    public Subscription createSubscription(String destinationString subscriptionIdHeaders headersthrows Exception {
        return this..subscribesubscriptionIddestinationheaders );
    }
    @Override
    public synchronized void unsubscribe(String idHeaders headersthrows StompException {
        Subscription subscription = this..removeid );
        if (subscription == null) {
            throw new InvalidSubscriptionExceptionid );
        }
        subscription.cancel();
    }
    @Override
    public synchronized void disconnect() throws NotConnectedException {
        for (StompTransaction each : this..values()) {
            try {
                each.abort();
            } catch (StompException e) {
                .errorf(e"Cannot disconnect");
            }
        }
        this..clear();
        for (Subscription each : this..values()) {
            try {
                each.cancel();
            } catch (StompException e) {
                .errorf(e"Cannot cancel subsrciption: %s"each);
            }
        }
        this..clear();
        this..unregisterthis );
    }
    protected ConduitStompTransaction createTransaction(Transaction jtaTransactionString transactionIdthrows Exception {
        return new ConduitStompTransactionthisjtaTransactiontransactionId );
    }
    private Map<StringSubscriptionsubscriptions = new HashMap<StringSubscription>();
    private Version version;
    private Heartbeat heartbeat;
New to GrepCode? Check out our FAQ X