Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
http://code.google.com/a/apache-extras.org/p/camel-extra This program is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. http://www.gnu.org/licenses/lgpl-3.0-standalone.html /
 
 
 package org.apacheextras.camel.component.couchbase;
 
 
 import static org.apacheextras.camel.component.couchbase.CouchbaseConstants.*;
 
 
     private final CouchbaseEndpoint endpoint;
     private final CouchbaseClient client;
     private final View view;
     private final Query query;
 
     public CouchbaseConsumer(CouchbaseEndpoint endpointCouchbaseClient clientProcessor processor) {
 
         super(endpointprocessor);
         this. = client;
         this. = endpoint;
         this. = client.getView(endpoint.getDesignDocumentName(), endpoint.getViewName());
         this. = new Query();
         init();
 
     }
 
     private void init() {
 
         .setIncludeDocs(true);
 
         int limit = .getLimit();
         if (limit > 0) {
             .setLimit(limit);
         }
 
         int skip = .getSkip();
         if (skip > 0) {
             .setSkip(skip);
         }
 
 
         String rangeStartKey = .getRangeStartKey();
         String rangeEndKey = .getRangeEndKey();
         if ("".equals(rangeStartKey) || "".equals(rangeEndKey)) {
             return;
         }
         .setRange(rangeStartKeyrangeEndKey);
 
     }
 
     @Override
     protected void doStart() throws Exception {
         .info("Starting Couchbase consumer");
         super.doStart();
     }
 
     @Override
     protected void doStop() throws Exception {
         .info("Stopping Couchbase consumer");
         super.doStop();
     }
 
     @Override
     protected synchronized int poll() throws Exception {
         ViewResponse result = .query();
         .info("Received result set from Couchbase");
 
         if (.isTraceEnabled()) {
             .trace("ViewResponse = {}"result);
         }
 
        String consumerProcessedStrategy = .getConsumerProcessedStrategy();
        for (ViewRow row : result) {
            String id = row.getId();
            Object doc = row.getDocument();
            String key = row.getKey();
            String designDocumentName = .getDesignDocumentName();
            String viewName = .getViewName();
            Exchange exchange = .createExchange();
            exchange.getIn().setBody(doc);
            exchange.getIn().setHeader(id);
            exchange.getIn().setHeader(key);
            exchange.getIn().setHeader(designDocumentName);
            exchange.getIn().setHeader(viewName);
            if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) {
                if (.isTraceEnabled()) {
                    .trace("Deleting doc with ID {}"id);
                }
                .delete(id);
            } else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
                if (.isTraceEnabled()) {
                    .trace("Filtering out ID {}"id);
                }
                // add filter for already processed docs
            } else {
                .trace("No strategy set for already processed docs, beware of duplicates!");
            }
            logDetails(iddockeydesignDocumentNameviewNameexchange);
            try {
                this.getProcessor().process(exchange);
            } catch (Exception e) {
                this.getExceptionHandler().handleException("Error processing exchange."exchangee);
            }
        }
        return result.size();
    }
    private void logDetails(String idObject docString keyString designDocumentNameString viewName,
                            Exchange exchange) {
        if (.isTraceEnabled()) {
            .trace("Created exchange = {}"exchange);
            .trace("Added Document in body = {}"doc);
            .trace("Adding to Header");
            .trace("ID = {}"id);
            .trace("Key = {}"key);
            .trace("Design Document Name = {}"designDocumentName);
            .trace("View Name = {}"viewName);
        }
    }
New to GrepCode? Check out our FAQ X