// Web page text captured with this listing ("Start line", "End line", "Snippet Preview",
// "Snippet HTML Code", "Stack Overflow Questions"); it is not part of the source file.
  // Copyright (c) P. Taylor Goetz (ptgoetz@gmail.com)
  
  package backtype.storm.contrib.signals.spout;
  
  import java.util.Iterator;
  import java.util.List;
  import java.util.Map;
  
  import  org.apache.zookeeper.WatchedEvent;
 import  org.apache.zookeeper.Watcher;
 import  org.apache.zookeeper.data.Stat;
 import  org.slf4j.Logger;
 import  org.slf4j.LoggerFactory;
 
 import  backtype.storm.spout.SpoutOutputCollector;
 import  backtype.storm.task.TopologyContext;
 import  backtype.storm.topology.base.BaseRichSpout;
 
 import  com.netflix.curator.framework.CuratorFramework;
 import  com.netflix.curator.framework.CuratorFrameworkFactory;
 import  com.netflix.curator.retry.RetryNTimes;
 
 @SuppressWarnings("serial")
 public abstract class BaseSignalSpout extends BaseRichSpout implements Watcher {
 
     private static final Logger LOG = LoggerFactory.getLogger(BaseSignalSpout.class);
     private static final String namespace = "storm-signals";
     private String name;
     private CuratorFramework client;
 
     public BaseSignalSpout(String name) {
         this. = name;
     }
 
     @SuppressWarnings("rawtypes")
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         try {
             initZookeeper(conf);
         } catch (Exception e) {
             .error("Error creating zookeeper client."e);
         }
     }
 
     @SuppressWarnings("rawtypes")
     private void initZookeeper(Map confthrows Exception {
         String connectString = zkHosts(conf);
         int retryCount = (Integerconf.get("storm.zookeeper.retry.times");
         int retryInterval = (Integerconf.get("storm.zookeeper.retry.interval");
 
         this. = CuratorFrameworkFactory.builder().namespace().connectString(connectString)
                 .retryPolicy(new RetryNTimes(retryCountretryInterval)).build();
         this..start();
 
         // create base path if necessary
         Stat stat = this..checkExists().usingWatcher(this).forPath(this.);
         if (stat == null) {
             String path = this..create().creatingParentsIfNeeded().forPath(this.);
             .info("Created: " + path);
         }
     }
 
     @SuppressWarnings({ "rawtypes""unchecked" })
     private String zkHosts(Map conf) {
         int zkPort = (Integerconf.get("storm.zookeeper.port");
         List<StringzkServers = (List<String>) conf.get("storm.zookeeper.servers");
 
         Iterator<Stringit = zkServers.iterator();
         StringBuffer sb = new StringBuffer();
         while (it.hasNext()) {
             sb.append(it.next());
             sb.append(":");
             sb.append(zkPort);
             if (it.hasNext()) {
                 sb.append(",");
             }
         }
         return sb.toString();
     }

    
Releases the zookeeper connection.
 
     @Override
     public void close() {
         super.close();
         this..close();
     }
 
     @Override
     public void process(WatchedEvent we) {
         try {
             this..checkExists().usingWatcher(this).forPath(this.);
             .debug("Renewed watch for path %s"this.);
         } catch (Exception ex) {
             .error("Error renewing watch."ex);
         }
 
         switch (we.getType()) {
        case NodeCreated:
            .debug("Node created.");
            break;
        case NodeDataChanged:
            .debug("Received signal.");
            try {
                this.onSignal(this..getData().forPath(we.getPath()));
            } catch (Exception e) {
                .warn("Unable to process signal."e);
            }
            break;
        case NodeDeleted:
            .debug("NodeDeleted");
            break;
        }
    }
    protected abstract void onSignal(byte[] data);
// "New to GrepCode? Check out our FAQ" - web page footer text, not part of the source file.