Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
http://code.google.com/a/apache-extras.org/p/camel-extra This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. http://www.gnu.org/licenses/gpl-2.0-standalone.html /
 
 
 package org.apacheextras.camel.component.couchbase;
 
 
 import java.util.Map;
 
 import static org.apacheextras.camel.component.couchbase.CouchbaseConstants.*;

Couchbase producer generates various type of operations. PUT, GET, and DELETE are currently supported
 
 
 public class CouchbaseProducer extends DefaultProducer {
 
     private CouchbaseEndpoint endpoint;
     private CouchbaseClientIF client;
     private long startId;
     private PersistTo persistTo;
     private ReplicateTo replicateTo;
     private int producerRetryAttempts;
     private int producerRetryPause;
 
     public CouchbaseProducer(CouchbaseEndpoint endpointCouchbaseClientIF clientint persistToint replicateTothrows Exception {
         super(endpoint);
         this. = endpoint;
         this. = client;
         if (endpoint.isAutoStartIdForInserts()) {
             this. = endpoint.getStartingIdForInsertsFrom();
         }
         this. = endpoint.getProducerRetryAttempts();
         this. = endpoint.getProducerRetryPause();
 
         switch (persistTo) {
             case 0:
                 this. = .;
                 break;
             case 1:
                 this. = .;
                 break;
             case 2:
                 this. = .;
                 break;
             case 3:
                 this. = .;
                 break;
             case 4:
                 this. = .;
                 break;
             default:
                 throw new IllegalArgumentException("Unsupported persistTo parameter. Supported values are 0 to 4. Currently provided: " + persistTo);
         }
 
         switch (replicateTo) {
             case 0:
                 this. = .;
                 break;
             case 1:
                 this. = .;
                 break;
             case 2:
                 this. = .;
                 break;
             case 3:
                 this. = .;
                 break;
             default:
                 throw new IllegalArgumentException("Unsupported replicateTo parameter. Supported values are 0 to 3. Currently provided: " + replicateTo);
         }
 
     }
 
    public void process(Exchange exchangethrows Exception {
        Map<StringObjectheaders = exchange.getIn().getHeaders();
        String id = (headers.containsKey())
                ? exchange.getIn().getHeader(String.class)
                : .getId();
        int ttl = (headers.containsKey())
                ? Integer.parseInt(exchange.getIn().getHeader(String.class))
                : ;
        if (.isAutoStartIdForInserts()) {
            id = Long.toString();
            ++;
        } else if (id == null) {
            throw new CouchbaseException( + " is not specified in message header or endpoint URL.",
                    exchange);
        }
        if (.getOperation().equals()) {
            .info("Type of operation: PUT");
            Object obj = exchange.getIn().getBody();
            exchange.getOut().setBody(setDocument(idttlobj));
        } else if (.getOperation().equals()) {
            .info("Type of operation: GET");
            Object result = .get(id);
            exchange.getOut().setBody(result);
        } else if (.getOperation().equals()) {
            .info("Type of operation: DELETE");
            Future<Booleanresult = .delete(id);
            exchange.getOut().setBody(result.get());
        }
        //cleanup the cache headers
        exchange.getIn().removeHeader();
    }
    private Boolean setDocument(String idint expiryObject objPersistTo persistToReplicateTo replicateTothrows Exception {
        return setDocument(idexpiryobjpersistToreplicateTo);
    }
    private Boolean setDocument(String idint expiryObject objint retryAttemptsPersistTo persistToReplicateTo replicateTothrows Exception {
        OperationFuture<Booleanresult = .set(idexpiryobjpersistToreplicateTo);
        try {
            if (!result.get()) {
                throw new Exception("Unable to save Document. " + id);
            }
            return true;
        } catch (Exception e) {
            if (retryAttempts <= 0) {
                throw e;
            } else {
                .info("Unable to save Document, retrying in " +  + "ms (" + retryAttempts + ")");
                Thread.sleep();
                return setDocument(idexpiryobj, (retryAttempts - 1), persistToreplicateTo);
            }
        }
    }
New to GrepCode? Check out our FAQ X