Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 package org.apache.catalina.tribes.group.interceptors;
 
 
Sends a ping to all members. Configure this interceptor with the TcpFailureDetector below it, and the TcpFailureDetector will act as the membership guide.

Version:
1.0
 
 
 public class TcpPingInterceptor extends ChannelInterceptorBase {
 
     private static final Log log = LogFactory.getLog(TcpPingInterceptor.class);
 
     protected static final byte[] TCP_PING_DATA = new byte[] {
         79, -89, 115, 72, 121, -33, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20,
         125, -39, 82, 91, -21, -33, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74,
         55, 21, -66, -121, 69, 33, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,
         85, -10, -108, -73, 58, -33, 33, 120, -111, 4, 125, -41, 114, -124, -64, -43};
 
     protected long interval = 1000; //1 second
 
     protected boolean useThread = false;
     protected boolean staticOnly = false;
     protected volatile boolean running = true;
     protected PingThread thread = null;
     protected static final AtomicInteger cnt = new AtomicInteger(0);
 
 
     @Override
     public synchronized void start(int svcthrows ChannelException {
         super.start(svc);
          = true;
         if (  == null && ) {
              = new PingThread();
             .setDaemon(true);
             .setName("TcpPingInterceptor.PingThread-"+.addAndGet(1));
             .start();
         }
 
         //acquire the interceptors to invoke on send ping events
         ChannelInterceptor next = getNext();
         while ( next != null ) {
             if ( next instanceof TcpFailureDetector )
                  = new WeakReference<>((TcpFailureDetector)next);
             if ( next instanceof StaticMembershipInterceptor )
                  = new WeakReference<>((StaticMembershipInterceptor)next);
             next = next.getNext();
         }
 
     }
 
     @Override
     public void stop(int svcthrows ChannelException {
          = false;
         if (  != null ) {
             .interrupt();
              = null;
         }
         super.stop(svc);
     }
 
     @Override
     public void heartbeat() {
         super.heartbeat();
         if (!getUseThread()) sendPing();
    }
    public long getInterval() {
        return ;
    }
    public void setInterval(long interval) {
        this. = interval;
    }
    public void setUseThread(boolean useThread) {
        this. = useThread;
    }
    public void setStaticOnly(boolean staticOnly) {
        this. = staticOnly;
    }
    public boolean getUseThread() {
        return ;
    }
    public boolean getStaticOnly() {
        return ;
    }
    protected void sendPing() {
        TcpFailureDetector tcpFailureDetector =
                 != null ? .get() : null;
        if (tcpFailureDetector != null) {
            // We have a reference to the failure detector
            // Piggy back on it
            tcpFailureDetector.checkMembers(true);
        } else {
            StaticMembershipInterceptor smi =
                     &&  != null ? .get() : null;
            if (smi != null) {
                sendPingMessage(smi.getMembers());
            } else {
                sendPingMessage(getMembers());
            }
        }
    }
    protected void sendPingMessage(Member[] members) {
        if ( members == null || members.length == 0 ) return;
        ChannelData data = new ChannelData(true);//generates a unique Id
        data.setAddress(getLocalMember(false));
        data.setTimestamp(System.currentTimeMillis());
        data.setOptions(getOptionFlag());
        data.setMessage(new XByteBuffer(false));
        try {
            super.sendMessage(membersdatanull);
        }catch (ChannelException x) {
            .warn("Unable to send TCP ping.",x);
        }
    }
    @Override
    public void messageReceived(ChannelMessage msg) {
        //catch incoming
        boolean process = true;
        if ( okToProcess(msg.getOptions()) ) {
            //check to see if it is a ping message, if so, process = false
            process = ( (msg.getMessage().getLength() != .) ||
                        (!Arrays.equals(,msg.getMessage().getBytes()) ) );
        }//end if
        //ignore the message, it doesnt have the flag set
        if ( process ) super.messageReceived(msg);
        else if ( .isDebugEnabled() ) .debug("Received a TCP ping packet:"+msg);
    }//messageReceived
    protected class PingThread extends Thread {
        @Override
        public void run() {
            while () {
                try {
                    sleep();
                    sendPing();
                }catch ( InterruptedException ix ) {
                    // Ignore. Probably triggered by a call to stop().
                    // In the highly unlikely event it was a different trigger,
                    // simply ignore it and continue.
                }catch ( Exception x )  {
                    .warn("Unable to send ping from TCP ping thread.",x);
                }
            }
        }
    }
New to GrepCode? Check out our FAQ X