Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
   *
   * Copyright (c) 2013-2014 sagyf Yang. The Four Group.
   */
  package com.jfinal.ext.plugin.redis;
  
  import java.util.Set;
 
 
 
 public class TopicConsumer {
     protected final Logger logger = Logger.getLogger(getClass());
 
     private TopicNest topic;
     private TopicNest subscriber;
     private String id;
     private int interval = 1000;
 
     public TopicConsumer(final String idfinal String topic) {
         this. = new TopicNest("topic:" + topic);
         this. = new TopicNest(this..cat("subscribers").key());
         this. = id;
     }
 
     public TopicConsumer interval(int interval) {
         this. = interval;
         return this;
     }
 
     private void waitForMessages() {
         try {
             ..sleep();
         } catch (InterruptedException e) {
             // TODO
             e.printStackTrace();
         }
     }
 
     @SuppressWarnings({ "unchecked""rawtypes" })
     public void consume(JedisMessage callback) {
         while (true) {
             Serializable  message = readUntilEnd();
             if (message != null) {
                 callback.onMessage(message);
                 goNext();
             } else {
                 waitForMessages();
             }
         }
     }
 
     public  <T extends Serializable> T consume() {
         T message = readUntilEnd();
         goNext();
         return message;
     }
 
     private <T extends Serializable> T readUntilEnd() {
         if (unreadMessages() > 0) {
             Serializable message = read();
             return (T)message;
         }
         return null;
     }
 
     private void goNext() {
         JedisKit.zincrby(.key(), 1, );
     }
 
     private int getLastReadMessage() {
         Double lastMessageRead = JedisKit.zscore(.key(), );
         if (lastMessageRead == null) {
             Set<TuplezrangeWithScores = JedisKit.zrangeWithScores(.key(), 0, 1);
             if (zrangeWithScores.iterator().hasNext()) {
                 Tuple next = zrangeWithScores.iterator().next();
                 Integer lowest = (intnext.getScore() - 1;
                 JedisKit.zadd(.key(), lowest);
                 return lowest;
             } else {
                 return 0;
             }
         }
         return lastMessageRead.intValue();
     }
 
     private int getTopicSize() {
         String stopicSize = JedisKit.get(.key());
         int topicSize = 0;
         if (stopicSize != null) {
             topicSize = Integer.valueOf(stopicSize);
         }
         return topicSize;
     }
 
    public <T extends Serializable> T  read() {
        int lastReadMessage = getLastReadMessage();
        .debug("lastReadMessage "+lastReadMessage);
        String key = .cat("message").cat(lastReadMessage + 1).key();
        T  message = JedisKit.get(key);
        .info("consume the message," + "key[" + key + "],value[" + message + "]");
        return message;
    }
    public int unreadMessages() {
        return getTopicSize() - getLastReadMessage();
    }
New to GrepCode? Check out our FAQ X