Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
 package backtype.storm.contrib.signals;
 
 import  org.apache.zookeeper.WatchedEvent;
 import  org.apache.zookeeper.Watcher;
 import  org.apache.zookeeper.data.Stat;
 import  org.slf4j.Logger;
 import  org.slf4j.LoggerFactory;
 
 import  com.netflix.curator.framework.CuratorFramework;
public abstract class AbstractSignalConnection implements Watcher {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSignalConnection.class);
    static final String namespace = "storm-signals";
    protected String name;
    protected CuratorFramework client;
    protected SignalListener listener;
    
    
    protected void initWatcher() throws Exception {
        // create base path if necessary
        Stat stat = this..checkExists().usingWatcher(this).forPath(this.);
        if (stat == null) {
            String path = this..create().creatingParentsIfNeeded().forPath(this.);
            .info("Created: " + path);
        }
    }
    @Override
    public void process(WatchedEvent we) {
        try {
            this..checkExists().usingWatcher(this).forPath(this.);
            .debug("Renewed watch for path {}"this.);
        } catch (Exception ex) {
            .error("Error renewing watch."ex);
        }
        switch (we.getType()) {
        case NodeCreated:
            .debug("Node created.");
            break;
        case NodeDataChanged:
            .debug("Received signal.");
            try {
                this..onSignal(this..getData().forPath(we.getPath()));
            } catch (Exception e) {
                .warn("Unable to process signal."e);
            }
            break;
        case NodeDeleted:
            .debug("NodeDeleted");
            break;
        }
    }
    public void close() {
        this..close();
    }
    
    
    public void send(String toPathbyte[] signalthrows Exception {
        Stat stat = this..checkExists().forPath(toPath);
        if (stat == null) {
            String path = this..create().creatingParentsIfNeeded().forPath(toPath);
            .info("Created: " + path);
        }
        this..setData().forPath(toPathsignal);
    }
New to GrepCode? Check out our FAQ X