Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
  * Copyright © 2014-2015 Cask Data, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy of
  * the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
 
 package co.cask.cdap.notifications.service.kafka;
 
 
 import java.util.Map;
 
   // Logger obtained from LoggerFactory for KafkaNotificationService.
   private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationService.class);
   // Shared Gson instance; no builder options are set in this view of the file.
   private static final Gson GSON = new GsonBuilder()
     .create();
 
   // NOTE(review): in this copy of the file the constructor's assignment targets are missing, so which
   // constructor argument initializes which of the fields below must be confirmed against the upstream source.
   private final KafkaClient kafkaClient;
   private final NotificationFeedManager feedManager;
   // NOTE(review): no assignment to this final field is visible in this view - confirm where it is initialized.
   private final KafkaPublisher.Ack ack;
 
   // NOTE(review): presumably set from the "kafka.num.partitions" configuration value read in the
   // constructor - confirm.
   private final int nbPartitions;
 
   // Executor to publish notifications to Kafka
   // NOTE(review): the field declaration this comment describes is not present in this view of the file.
 
   /**
    * Creates the notification service from injected collaborators.
    *
    * <p>The visible statements call the superclass constructor, store the Kafka client and the feed manager,
    * read the int configuration value {@code "kafka.num.partitions"}, and create an empty hash map.
    *
    * <p>NOTE(review): this copy of the file has lost tokens. The separators between parameters, the
    * separators between the {@code super(...)} arguments, and the field names after {@code this.} are
    * missing. Restore them from the upstream source rather than guessing; in particular the field that
    * receives the new hash map is not declared anywhere in this view.
    */
   @Inject
   public KafkaNotificationService(CConfiguration cConfKafkaClient kafkaClientDatasetFramework dsFramework,
                                   TransactionSystemClient transactionSystemClient,
                                   NotificationFeedManager feedManager) {
     // NOTE(review): argument separators are missing here.
     super(dsFrameworktransactionSystemClientfeedManager);
     // NOTE(review): the assignment targets below are missing.
     this. = kafkaClient;
     this. = feedManager;
     this. = cConf.getInt("kafka.num.partitions");
 
     this. = Maps.newHashMap();
   }
 
   /**
    * Start-up hook. Builds a single-thread executor using the thread factory returned by
    * {@code Threads.createDaemonThreadFactory("notification-publisher-%d")} and wraps it with
    * {@code MoreExecutors.listeningDecorator}.
    *
    * <p>NOTE(review): the variable that receives the decorated executor is missing in this copy of the
    * file, and no matching field declaration is visible. Restore both from the upstream source.
    */
   @Override
   protected void startUp() throws Exception {
      = MoreExecutors.listeningDecorator(
       Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("notification-publisher-%d")));
   }
 
   /**
    * Shut-down hook. The body is empty in this view of the file.
    *
    * <p>NOTE(review): nothing here stops the executor created in {@code startUp()}. Confirm against the
    * upstream source whether a statement was lost from this copy.
    */
   @Override
   protected void shutDown() throws Exception {
  }
  /**
   * Publishes a notification on a feed asynchronously.
   *
   * <p>The submitted task builds a {@code KafkaMessage} from the feed's message key and the JSON tree of the
   * notification, encodes it to a {@code ByteBuffer}, prepares a send for the topic of the feed's
   * topic-partition, adds the encoded message under its message key, and blocks on the result of
   * {@code send()}. On success the task returns the notification it was given.
   *
   * <p>Inside the task, an {@code ExecutionException} from the send is rethrown as a
   * {@code NotificationException} wrapping its cause, and an {@code IOException} is rethrown as a
   * {@code NotificationException} wrapping it.
   *
   * <p>NOTE(review): this copy of the file has lost tokens. The receivers of {@code .trace},
   * {@code .submit}, {@code .toJsonTree} and {@code .prepare}, and several argument separators, are missing.
   * The objects that {@code .submit} and {@code .prepare} are called on are not declared in this view.
   */
  public <N> ListenableFuture<N> publish(final Id.NotificationFeed feedfinal N notification,
                                         final Type notificationType)
    throws NotificationException {
    .trace("Publishing on notification feed [{}]: {}"feednotification);
    return .submit(new Callable<N>() {
      @Override
      public N call() throws Exception {
        try {
          KafkaMessage message = new KafkaMessage(KafkaNotificationUtils.getMessageKey(feed),
                                                  .toJsonTree(notificationnotificationType));
          ByteBuffer bb = KafkaMessageCodec.encode(message);
          TopicPartition topicPartition = KafkaNotificationUtils.getKafkaTopicPartition(feed);
          KafkaPublisher.Preparer preparer = .prepare(topicPartition.getTopic());
          preparer.add(bbmessage.getMessageKey());
          try {
            // Wait for the send to complete so that a failure surfaces through the returned future.
            preparer.send().get();
            return notification;
          } catch (ExecutionException e) {
            throw new NotificationException(e.getCause());
          }
        } catch (IOException e) {
          throw new NotificationException(e);
        }
      }
    });
  }
  /**
   * Subscribes a handler to a notification feed.
   *
   * <p>After a {@code getFeed(feed)} call, the feed is mapped to a topic-partition. While holding this
   * object's monitor, the {@code KafkaNotificationsCallback} for that topic-partition is looked up, created
   * and stored if none exists yet, and the subscription is delegated to it.
   *
   * <p>NOTE(review): this copy of the file has lost tokens. The parameter and argument separators, the
   * receivers of {@code .getFeed}, {@code .get}, {@code .debug} and {@code .put}, and the end of the method
   * header (any throws clause and the opening brace) are missing. The map that {@code .get}/{@code .put}
   * operate on is not declared in this view.
   *
   * @return the {@code Cancellable} returned by the callback's {@code subscribe}
   */
  public <N> Cancellable subscribe(Id.NotificationFeed feedNotificationHandler<N> handler,
                                   Executor executor)
    // This call will make sure that the feed exists
    .getFeed(feed);
    final TopicPartition topicPartition = KafkaNotificationUtils.getKafkaTopicPartition(feed);
    synchronized (this) {
      KafkaNotificationsCallback kafkaCallback = .get(topicPartition);
      if (kafkaCallback == null) {
        .debug("Creating new Kafka notification callback for topic-partition {}"topicPartition);
        kafkaCallback = new KafkaNotificationsCallback(topicPartition);
        .put(topicPartitionkafkaCallback);
      }
      return kafkaCallback.subscribe(feedhandlerexecutor);
    }
  }

  
  /**
   * Callback class called when a Kafka message is received. The {@link #onReceived(java.util.Iterator)} method
   * extracts the feed ID of the message received, and passes the notification encoded in the message to all
   * handlers that are interested in that feed, using the delegate in-memory notification service. One callback
   * is created per {@link TopicPartition}. It is created by the first subscription to a feed which maps to that
   * {@link TopicPartition}.
   */
  private final class KafkaNotificationsCallback implements KafkaConsumer.MessageCallback {
    // Topic-partition this callback was created for (passed to the constructor).
    private final TopicPartition topicPartition;
    // NOTE(review): presumably the count of active subscriptions that the operand-less "== 0", "++" and "--"
    // in subscribe() refer to - confirm against the upstream source. The field that should hold the result
    // of preparer.consume(this) is not declared in this view.
    private int subscriptions;
    private KafkaNotificationsCallback(TopicPartition topicPartition) {
      this. = topicPartition;
    }
    /**
     * Registers a handler through the enclosing service's superclass {@code subscribe}. In the branch
     * guarded by the {@code == 0} check, it also prepares a Kafka consumer and starts consuming with this
     * callback.
     *
     * <p>The returned {@code Cancellable} first cancels the delegate subscription. Then, while holding the
     * enclosing service's monitor, it runs the decrement statement and, in the branch guarded by the
     * {@code == 0} check, calls {@code cancel()} and {@code remove()}.
     *
     * <p>NOTE(review): this copy of the file has lost tokens. The parameter and argument separators, the
     * end of the method header, the operands of {@code == 0}, {@code ++} and {@code --}, the loop bound and
     * the second {@code ;} of the {@code for} header, the receivers of {@code .getConsumer()},
     * {@code .getTopic()}, {@code .cancel()} and {@code .remove()}, the argument of {@code .remove()}, and the
     * variable assigned from {@code preparer.consume(this)} are missing. Several of these refer to fields
     * that are not declared in this view; restore them from the upstream source.
     */
    public <N> Cancellable subscribe(Id.NotificationFeed feedNotificationHandler<N> handler,
                                     Executor executor)
      final Cancellable cancellable = KafkaNotificationService.super.subscribe(feedhandlerexecutor);
      // Both this block and the cancel() path below lock on the enclosing service instance.
      synchronized (KafkaNotificationService.this) {
        if ( == 0) {
          KafkaConsumer.Preparer preparer = .getConsumer().prepare();
          for (int i = 0; i < i++) {
            // TODO there is a bug in twill, that when the topic doesn't exist, add latest will not make subscription
            // start from offset 0 - but that will be fixed soon
            // For now, subscribe to all the partitions, because we don't know exactly to what
            // partition the feed will be mapped to. Twill does not expose the possibility to choose
            // a partitioner.
            preparer.addLatest(.getTopic(), i);
          }
           = preparer.consume(this);
        }
        ++;
      }
      return new Cancellable() {
        @Override
        public void cancel() {
          cancellable.cancel();
          synchronized (KafkaNotificationService.this) {
            --;
            if ( == 0) {
              .cancel();
              .remove();
            }
          }
        }
      };
    }
    @Override
    public void onReceived(Iterator<FetchedMessagemessages) {
      int count = 0;
      while (messages.hasNext()) {
        count++;
        FetchedMessage message = messages.next();
        ByteBuffer payload = message.getPayload();
        try {
          KafkaMessage decodedMessage = KafkaMessageCodec.decode(payload);
          try {
            .trace("Decoded notification from Kafka: {}"decodedMessage);
            notificationReceived(KafkaNotificationUtils.getMessageFeed(decodedMessage.getMessageKey()),
                                 decodedMessage.getNotificationJson());
          } catch (Throwable t) {
            .warn("Error while processing notification {} with handler {}",
                     decodedMessage.getNotificationJson(), t);
          }
        } catch (IOException e) {
          .error("Could not decode Kafka message {} using Gson."messagee);
        }
      }
      .trace("Handled {} messages from kafka"count);
    }
    @Override
    public void finished() {
      .info("Subscription to topic partition {} finished.");
    }
  }
New to GrepCode? Check out our FAQ X