Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
 package backtype.storm.contrib.signals;
 
 import java.util.List;
 import java.util.Map;
 
 import  org.apache.zookeeper.WatchedEvent;
 import  org.apache.zookeeper.Watcher;
 import  org.apache.zookeeper.data.Stat;
import  org.slf4j.Logger;
import  org.slf4j.LoggerFactory;
import  backtype.storm.utils.Utils;
import  com.netflix.curator.framework.CuratorFramework;
import  com.netflix.curator.framework.CuratorFrameworkFactory;
import  com.netflix.curator.retry.RetryNTimes;
/**
 * ZooKeeper {@link Watcher} that holds a Curator client and a
 * {@link SignalListener}; see {@code process} for how watch events are
 * dispatched to the listener.
 */
public class SignalConnection implements Watcher {
	private static final Logger LOG = LoggerFactory.getLogger(SignalConnection.class);
	// NOTE(review): presumably the Curator namespace under which signal nodes live - confirm in init().
	private static final String namespace = "storm-signals";
    // Name supplied to the constructor.
    private String name;
    // Curator client; not assigned by the constructor.
    private CuratorFramework client;
    // Listener supplied to the constructor.
    private SignalListener listener;
    
    public SignalConnection(String nameSignalListener listener){
    	this. = name;
    	this. = listener;
    }
    
    
    @SuppressWarnings("rawtypes")
    public void init(Map confthrows Exception {
        String connectString = zkHosts(conf);
        int retryCount = Utils.getInt(conf.get("storm.zookeeper.retry.times"));
        int retryInterval = Utils.getInt(conf.get("storm.zookeeper.retry.interval"));
        this. = CuratorFrameworkFactory.builder().namespace().connectString(connectString)
                .retryPolicy(new RetryNTimes(retryCountretryInterval)).build();
        this..start();
        // create base path if necessary
        Stat stat = this..checkExists().usingWatcher(this).forPath(this.);
        if (stat == null) {
            String path = this..create().creatingParentsIfNeeded().forPath(this.);
            .info("Created: " + path);
        }
    }
    
    @SuppressWarnings({ "rawtypes""unchecked" })
    private static String zkHosts(Map conf) {
        int zkPort = Utils.getInt(conf.get("storm.zookeeper.port"));
        List<StringzkServers = (List<String>) conf.get("storm.zookeeper.servers");
        Iterator<Stringit = zkServers.iterator();
        StringBuffer sb = new StringBuffer();
        while (it.hasNext()) {
            sb.append(it.next());
            sb.append(":");
            sb.append(zkPort);
            if (it.hasNext()) {
                sb.append(",");
            }
        }
        return sb.toString();
    }
    @Override
    public void process(WatchedEvent we) {
        try {
            this..checkExists().usingWatcher(this).forPath(this.);
            .debug("Renewed watch for path {}"this.);
        } catch (Exception ex) {
            .error("Error renewing watch."ex);
        }
        switch (we.getType()) {
        case NodeCreated:
            .debug("Node created.");
            break;
        case NodeDataChanged:
            .debug("Received signal.");
            try {
                this..onSignal(this..getData().forPath(we.getPath()));
            } catch (Exception e) {
                .warn("Unable to process signal."e);
            }
            break;
        case NodeDeleted:
            .debug("NodeDeleted");
            break;
        }
    }
    
    public void close(){
    	this..close();
    }
New to GrepCode? Check out our FAQ X