Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright © 2014 Cask Data, Inc.
   *
   * Licensed under the Apache License, Version 2.0 (the "License"); you may not
   * use this file except in compliance with the License. You may obtain a copy of
   * the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  * License for the specific language governing permissions and limitations under
  * the License.
  */
 
 package co.cask.cdap.examples.webanalytics;
 
 
 import java.util.List;

A co.cask.cdap.api.dataset.Dataset that stores IP to total visit counts. It uses a co.cask.cdap.api.dataset.lib.KeyValueTable underneath to hold the data. It also implements the co.cask.cdap.api.data.batch.RecordScannable interface so that it can be queried using ad-hoc SQL.
 
 public class UniqueVisitCount extends AbstractDataset implements RecordScannable<KeyValue<StringLong>> {
 
   private final KeyValueTable keyValueTable;

  
Constructor for the Dataset. Request to use a co.cask.cdap.api.dataset.lib.KeyValueTable.

Parameters:
spec The specification of the Dataset instance.
keyValueTable The underlying table
 
   public UniqueVisitCount(DatasetSpecification spec, @EmbeddedDataset("kv"KeyValueTable keyValueTable) {
     super(spec.getName(), keyValueTable);
     this. = keyValueTable;
   }

  
Performs increments of visit count of the given IP.

Parameters:
ip The IP to increment
amount The amount to increment
 
   public void increment(String iplong amount) {
     // Delegates to the system KeyValueTable for actual storage operation
     .increment(Bytes.toBytes(ip), amount);
   }

  
Reads the visit count for a given IP.

Parameters:
ip The IP to lookup
Returns:
the number of visits
 
   public long getCount(String ip) {
     byte[] value = .read(Bytes.toBytes(ip));
     return (value == null) ? 0L : Bytes.toLong(value);
   }
 
   @Override
   public Type getRecordType() {
     return new TypeToken<KeyValue<StringLong>>() { }.getType();
   }
 
   @Override
   public List<SplitgetSplits() {
     return .getSplits();
   }
 
   @Override
     // When scanning records, simply convert the type from the underlying KeyValueTable into <String, Long> pair.
     final RecordScanner<KeyValue<byte[], byte[]>> scanner = .createSplitRecordScanner(split);
     return new RecordScanner<KeyValue<StringLong>>() {
 
       @Override
       public void initialize(Split splitthrows InterruptedException {
         scanner.initialize(split);
       }
 
       @Override
       public boolean nextRecord() throws InterruptedException {
         return scanner.nextRecord();
      }
      @Override
      public KeyValue<StringLonggetCurrentRecord() throws InterruptedException {
        KeyValue<byte[], byte[]> record = scanner.getCurrentRecord();
        return new KeyValue<StringLong>(Bytes.toString(record.getKey()), Bytes.toLong(record.getValue()));
      }
      @Override
      public void close() {
        scanner.close();
      }
    };
  }
New to GrepCode? Check out our FAQ X