Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
 /*
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
  *
  * Copyright (c) 2013-2014 sagyf Yang. The Four Group.
  */
 package com.github.sog.plugin.redis;
 
public class QueueConsumer {
    static final String PROCESSINGLIST = . + "processing";
    static final String PREFIX         = "queue" + .;
    static Map<StringQueueConsumerexistingConsumers = Maps.newHashMap();
    final Logger LOG = Logger.getLogger(getClass());
    int interval = 1000;
    boolean start = false;
    private QueueConsumer(String queueName) {
        this. = queueName;
    }
    public static QueueConsumer create(String queueName) {
        QueueConsumer consumer = .get(queueName);
        if (consumer == null) {
            consumer = new QueueConsumer(queueName);
        } else {
            throw new IllegalArgumentException("The consumer named " + queueName + " already exists");
        }
        return consumer;
    }
    public QueueConsumer interval(int interval) {
        this. = interval;
        return this;
    }
    private void waitForMessages() {
        try {
            ..sleep();
        } catch (InterruptedException e) {
            // TODO
            e.printStackTrace();
        }
    }
    public <T extends Serializable> T consume() {
        return JedisKit.rpoplpush(queueName(), processingListName());
    }
    @SuppressWarnings({"rawtypes""unchecked"})
    public void consume(final JedisMessage callback) {
        if () {
            throw new RuntimeException("The Consumer named " +  + " is working");
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    JedisKit.rpoplpush(queueName(), processingListName());
                    List<Serializablemessage = JedisKit.lrange(processingListName(), -1, -1);
                    if (message.isEmpty()) {
                        waitForMessages();
                    } else {
                        callback.onMessage(message.get(0));
                        JedisKit.rpop(processingListName());
                    }
                }
            }
        }).start();
    }
    private String processingListName() {
        return  +  + ;
    }
    private String queueName() {
        return  + ;
    }
New to GrepCode? Check out our FAQ X