Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Druid - a distributed column store.
   * Copyright (C) 2012, 2013  Metamarkets Group Inc.
   *
   * This program is free software; you can redistribute it and/or
   * modify it under the terms of the GNU General Public License
   * as published by the Free Software Foundation; either version 2
   * of the License, or (at your option) any later version.
   *
  * This program is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
  * You should have received a copy of the GNU General Public License
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
  */
 
 package io.druid.examples.web;
 
 
 import java.util.Map;
 
 @JsonTypeName("webstream")
 public class WebFirehoseFactory implements FirehoseFactory
 {
   private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class);
   private final String timeFormat;
   private final UpdateStreamFactory factory;
   private final long queueWaitTime = 15L;
 
   public WebFirehoseFactory(
       @JsonProperty("url"String url,
       @JsonProperty("renamedDimensions"Map<StringStringrenamedDimensions,
       @JsonProperty("timeDimension"String timeDimension,
       @JsonProperty("timeFormat"String timeFormat
   )
   {
     this(
         new RenamingKeysUpdateStreamFactory(
             new InputSupplierUpdateStreamFactory(new WebJsonSupplier(url), timeDimension),
             renamedDimensions
         ), timeFormat
     );
   }
 
   public WebFirehoseFactory(UpdateStreamFactory factoryString timeFormat)
   {
     this. = factory;
     if (timeFormat == null) {
       this. = "auto";
     } else {
       this. = timeFormat;
     }
   }
 
   @Override
   public Firehose connect() throws IOException
   {
 
     final UpdateStream updateStream = .build();
     updateStream.start();
 
     return new Firehose()
     {
       Map<StringObjectmap;
       private final Runnable doNothingRunnable = Runnables.getNoopRunnable();
 
       @Override
       public boolean hasMore()
       {
         try {
            = updateStream.pollFromQueue(.);
           return  != null;
         }
         catch (InterruptedException e) {
           throw Throwables.propagate(e);
         }
       }
 
 
       @Override
      public InputRow nextRow()
      {
        try {
          DateTime date = TimestampParser.createTimestampParser()
                                         .apply(.get(updateStream.getTimeDimension()).toString());
          return new MapBasedInputRow(
              date.getMillis(),
              new ArrayList(.keySet()),
              
          );
        }
        catch (Exception e) {
          throw Throwables.propagate(e);
        }
        finally {
           = null;
        }
      }
      @Override
      public Runnable commit()
      {
        // ephemera in, ephemera out.
        return // reuse the same object each time
      }
      @Override
      public void close() throws IOException
      {
        updateStream.stop();
      }
    };
  }
New to GrepCode? Check out our FAQ X