Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Druid - a distributed column store.
   * Copyright (C) 2012, 2013  Metamarkets Group Inc.
   *
   * This program is free software; you can redistribute it and/or
   * modify it under the terms of the GNU General Public License
   * as published by the Free Software Foundation; either version 2
   * of the License, or (at your option) any later version.
   *
  * This program is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
  * You should have received a copy of the GNU General Public License
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
  */
 
 package io.druid.examples.web;
 
 
 import java.util.Map;
 
 public class InputSupplierUpdateStream implements UpdateStream
 {
   private static final EmittingLogger log = new EmittingLogger(InputSupplierUpdateStream.class);
   private static final long queueWaitTime = 15L;
   private final TypeReference<HashMap<StringObject>> typeRef;
   private final InputSupplier<BufferedReadersupplier;
   private final int QUEUE_SIZE = 10000;
   private final ObjectMapper mapper = new DefaultObjectMapper();
   private final String timeDimension;
   private final Thread addToQueueThread;
 
       final InputSupplier<BufferedReadersupplier,
       final String timeDimension
   )
   {
      = new Thread()
     {
       public void run()
       {
         while (!isInterrupted()) {
           try {
             BufferedReader reader = supplier.getInput();
             String line;
             while ((line = reader.readLine()) != null) {
               if (isValid(line)) {
                 HashMap<StringObjectmap = .readValue(line);
                 if (map.get(timeDimension) != null) {
                   .offer(map.);
                   .debug("Successfully added to queue");
                 } else {
                   .info("missing timestamp");
                 }
               }
             }
           }
 
           catch (InterruptedException e){
             .info(e"Thread adding events to the queue interrupted");
             return;
           }
           catch (JsonMappingException e) {
             .info(e"Error in converting json to map");
           }
           catch (JsonParseException e) {
             .info(e"Error in parsing json");
           }
           catch (IOException e) {
             .info(e"Error in connecting to InputStream");
           }
         }
       }
     };
     .setDaemon(true);
 
     this. = supplier;
     this. = new TypeReference<HashMap<StringObject>>()
     {
     };
     this. = timeDimension;
   }
  private boolean isValid(String s)
  {
    return !(s.isEmpty());
  }
  public void start()
  {
  }
  public void stop()
  {
  }
  public Map<StringObjectpollFromQueue(long waitTimeTimeUnit unitthrows InterruptedException
  {
    return .poll(waitTimeunit);
  }
  public int getQueueSize()
  {
    return .size();
  }
  {
    return ;
  }
New to GrepCode? Check out our FAQ X