Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.catalina.tribes.group;
 
 
 import java.util.List;
 
The default implementation of a Channel.
The GroupChannel manages the replication channel. It coordinates message being sent and received with membership announcements. The channel has an chain of interceptors that can modify the message or perform other logic.
It manages a complete group, both membership and replication.
 
 public class GroupChannel extends ChannelInterceptorBase implements ManagedChannel {
     private static final Log log = LogFactory.getLog(GroupChannel.class);

    
Flag to determine if the channel manages its own heartbeat If set to true, the channel will start a local thread for the heart beat.
 
     protected boolean heartbeat = true;
    
If heartbeat == true then how often do we want this heartbeat to run. default is one minute
 
     protected long heartbeatSleeptime = 5*1000;//every 5 seconds
 
    
Internal heartbeat thread
 
     protected HeartbeatThread hbthread = null;

    
The ChannelCoordinator coordinates the bottom layer components:
- MembershipService
- ChannelSender
- ChannelReceiver
 
     protected final ChannelCoordinator coordinator = new ChannelCoordinator();

    
The first interceptor in the interceptor stack. The interceptors are chained in a linked list, so we only need a reference to the first one
 
     protected ChannelInterceptor interceptors = null;

    
A list of membership listeners that subscribe to membership announcements
 
     protected final List<ObjectmembershipListeners = new CopyOnWriteArrayList<>();

    
A list of channel listeners that subscribe to incoming messages
 
     protected final List<ObjectchannelListeners = new CopyOnWriteArrayList<>();

    
If set to true, the GroupChannel will check to make sure that
    protected boolean optionCheck = false;

    
Creates a GroupChannel. This constructor will also add the first interceptor in the GroupChannel.
The first interceptor is always the channel itself.
    public GroupChannel() {
        addInterceptor(this);
    }


    
Adds an interceptor to the stack for message processing
Interceptors are ordered in the way they are added.
channel.addInterceptor(A);
channel.addInterceptor(C);
channel.addInterceptor(B);
Will result in a interceptor stack like this:
A -> C -> B
The complete stack will look like this:
Channel -> A -> C -> B -> ChannelCoordinator

Parameters:
interceptor ChannelInterceptorBase
    @Override
    public void addInterceptor(ChannelInterceptor interceptor) {
        if (  == null ) {
             = interceptor;
            .setNext();
            .setPrevious(null);
            .setPrevious();
        } else {
            ChannelInterceptor last = ;
            while ( last.getNext() !=  ) {
                last = last.getNext();
            }
            last.setNext(interceptor);
            interceptor.setNext();
            interceptor.setPrevious(last);
            .setPrevious(interceptor);
        }
    }

    
Sends a heartbeat through the interceptor stack.
Invoke this method from the application on a periodic basis if you have turned off internal heartbeats channel.setHeartbeat(false)
    @Override
    public void heartbeat() {
        super.heartbeat();
        Iterator<Objecti = .iterator();
        while ( i.hasNext() ) {
            Object o = i.next();
            if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat();
        }
        i = .iterator();
        while ( i.hasNext() ) {
            Object o = i.next();
            if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat();
        }
    }


    
Send a message to the destinations specified

Parameters:
destination Member[] - destination.length > 0
msg Serializable - the message to send
options sender options, options can trigger guarantee levels and different interceptors to react to the message see class documentation for the Channel object.
Returns:
UniqueId - the unique Id that was assigned to this message
Throws:
org.apache.catalina.tribes.ChannelException - if an error occurs processing the message
See also:
org.apache.catalina.tribes.Channel
    @Override
    public UniqueId send(Member[] destinationSerializable msgint options)
            throws ChannelException {
        return send(destination,msg,options,null);
    }

    

Parameters:
destination Member[] - destination.length > 0
msg Serializable - the message to send
options sender options, options can trigger guarantee levels and different interceptors to react to the message see class documentation for the Channel object.
handler - callback object for error handling and completion notification, used when a message is sent asynchronously using the Channel.SEND_OPTIONS_ASYNCHRONOUS flag enabled.
Returns:
UniqueId - the unique Id that was assigned to this message
Throws:
org.apache.catalina.tribes.ChannelException - if an error occurs processing the message
See also:
org.apache.catalina.tribes.Channel
    @Override
    public UniqueId send(Member[] destinationSerializable msgint optionsErrorHandler handler)
            throws ChannelException {
        if ( msg == null ) throw new ChannelException("Cant send a NULL message");
        XByteBuffer buffer = null;
        try {
            if (destination == null || destination.length == 0) {
                throw new ChannelException("No destination given");
            }
            ChannelData data = new ChannelData(true);//generates a unique Id
            data.setAddress(getLocalMember(false));
            data.setTimestamp(System.currentTimeMillis());
            byte[] b = null;
            if ( msg instanceof ByteMessage ){
                b = ((ByteMessage)msg).getMessage();
                options = options | ;
            } else {
                b = XByteBuffer.serialize(msg);
                options = options & (~);
            }
            data.setOptions(options);
            //XByteBuffer buffer = new XByteBuffer(b.length+128,false);
            buffer = BufferPool.getBufferPool().getBuffer(b.length+128, false);
            buffer.append(b,0,b.length);
            data.setMessage(buffer);
            InterceptorPayload payload = null;
            if ( handler != null ) {
                payload = new InterceptorPayload();
                payload.setErrorHandler(handler);
            }
            getFirstInterceptor().sendMessage(destinationdatapayload);
            if ( ..isTraceEnabled() ) {
                ..trace("GroupChannel - Sent msg:" + new UniqueId(data.getUniqueId()) +
                        " at " + new java.sql.Timestamp(System.currentTimeMillis()) + " to " +
                        Arrays.toNameString(destination));
                ..trace("GroupChannel - Send Message:" +
                        new UniqueId(data.getUniqueId()) + " is " + msg);
            }
            return new UniqueId(data.getUniqueId());
        }catch ( Exception x ) {
            if ( x instanceof ChannelException ) throw (ChannelException)x;
            throw new ChannelException(x);
        } finally {
            if ( buffer != null ) BufferPool.getBufferPool().returnBuffer(buffer);
        }
    }


    
Callback from the interceptor stack.
When a message is received from a remote node, this method will be invoked by the previous interceptor.
This method can also be used to send a message to other components within the same application, but its an extreme case, and you're probably better off doing that logic between the applications itself.

Parameters:
msg ChannelMessage
    @Override
    public void messageReceived(ChannelMessage msg) {
        if ( msg == null ) return;
        try {
            if ( ..isTraceEnabled() ) {
                ..trace("GroupChannel - Received msg:" +
                        new UniqueId(msg.getUniqueId()) + " at " +
                        new java.sql.Timestamp(System.currentTimeMillis()) + " from " +
                        msg.getAddress().getName());
            }
            Serializable fwd = null;
            if ( (msg.getOptions() & ) ==  ) {
                fwd = new ByteMessage(msg.getMessage().getBytes());
            } else {
                try {
                    fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0,
                            msg.getMessage().getLength());
                }catch (Exception sx) {
                    .error("Unable to deserialize message:"+msg,sx);
                    return;
                }
            }
            if ( ..isTraceEnabled() ) {
                ..trace("GroupChannel - Receive Message:" +
                        new UniqueId(msg.getUniqueId()) + " is " + fwd);
            }
            //get the actual member with the correct alive time
            Member source = msg.getAddress();
            boolean rx = false;
            boolean delivered = false;
            for ( int i=0; i<.size(); i++ ) {
                ChannelListener channelListener = (ChannelListener).get(i);
                if (channelListener != null && channelListener.accept(fwdsource)) {
                    channelListener.messageReceived(fwdsource);
                    delivered = true;
                    //if the message was accepted by an RPC channel, that channel
                    //is responsible for returning the reply, otherwise we send an absence reply
                    if ( channelListener instanceof RpcChannel ) rx = true;
                }
            }//for
            if ((!rx) && (fwd instanceof RpcMessage)) {
                //if we have a message that requires a response,
                //but none was given, send back an immediate one
                sendNoRpcChannelReply((RpcMessage)fwd,source);
            }
            if ( ..isTraceEnabled() ) {
                ..trace("GroupChannel delivered[" + delivered + "] id:" +
                        new UniqueId(msg.getUniqueId()));
            }
        } catch ( Exception x ) {
            //this could be the channel listener throwing an exception, we should log it
            //as a warning.
            if ( .isWarnEnabled() ) .warn("Error receiving message:",x);
            throw new RemoteProcessException("Exception:"+x.getMessage(),x);
        }
    }

    
Sends a NoRpcChannelReply message to a member
This method gets invoked by the channel if a RPC message comes in and no channel listener accepts the message. This avoids timeout

Parameters:
msg RpcMessage
destination Member - the destination for the reply
    protected void sendNoRpcChannelReply(RpcMessage msgMember destination) {
        try {
            //avoid circular loop
            if ( msg instanceof RpcMessage.NoRpcChannelReplyreturn;
            RpcMessage.NoRpcChannelReply reply =
                    new RpcMessage.NoRpcChannelReply(msg.rpcIdmsg.uuid);
            send(new Member[]{destination},reply,.);
        } catch ( Exception x ) {
            .error("Unable to find rpc channel, failed to send NoRpcChannelReply.",x);
        }
    }

    
memberAdded gets invoked by the interceptor below the channel and the channel will broadcast it to the membership listeners

Parameters:
member Member - the new member
    @Override
    public void memberAdded(Member member) {
        //notify upwards
        for (int i=0; i<.size(); i++ ) {
            MembershipListener membershipListener = (MembershipListener).get(i);
            if (membershipListener != nullmembershipListener.memberAdded(member);
        }
    }

    
memberDisappeared gets invoked by the interceptor below the channel and the channel will broadcast it to the membership listeners

Parameters:
member Member - the member that left or crashed
    @Override
    public void memberDisappeared(Member member) {
        //notify upwards
        for (int i=0; i<.size(); i++ ) {
            MembershipListener membershipListener = (MembershipListener).get(i);
            if (membershipListener != nullmembershipListener.memberDisappeared(member);
        }
    }

    
Sets up the default implementation interceptor stack if no interceptors have been added

    protected synchronized void setupDefaultStack() throws ChannelException {
        if (getFirstInterceptor() != null &&
                ((getFirstInterceptor().getNext() instanceof ChannelCoordinator))) {
            addInterceptor(new MessageDispatch15Interceptor());
        }
    }

    
Validates the option flags that each interceptor is using and reports an error if two interceptor share the same flag.

    protected void checkOptionFlags() throws ChannelException {
        StringBuilder conflicts = new StringBuilder();
        ChannelInterceptor first = ;
        while ( first != null ) {
            int flag = first.getOptionFlag();
            if ( flag != 0 ) {
                ChannelInterceptor next = first.getNext();
                while ( next != null ) {
                    int nflag = next.getOptionFlag();
                    if (nflag!=0 && (((flag & nflag) == flag ) || ((flag & nflag) == nflag)) ) {
                        conflicts.append("[");
                        conflicts.append(first.getClass().getName());
                        conflicts.append(":");
                        conflicts.append(flag);
                        conflicts.append(" == ");
                        conflicts.append(next.getClass().getName());
                        conflicts.append(":");
                        conflicts.append(nflag);
                        conflicts.append("] ");
                    }//end if
                    next = next.getNext();
                }//while
            }//end if
            first = first.getNext();
        }//while
        if ( conflicts.length() > 0 ) throw new ChannelException("Interceptor option flag conflict: "+conflicts.toString());
    }

    
Starts the channel

Parameters:
svc int - what service to start
Throws:
org.apache.catalina.tribes.ChannelException
See also:
org.apache.catalina.tribes.Channel.start(int)
    @Override
    public synchronized void start(int svcthrows ChannelException {
        setupDefaultStack();
        if (checkOptionFlags();
        super.start(svc);
        if (  == null &&  ) {
             = new HeartbeatThread(this,);
            .start();
        }
    }

    
    @Override
    public synchronized void stop(int svcthrows ChannelException {
        if ( != null) {
            .stopHeartbeat();
             = null;
        }
        super.stop(svc);
    }

    
Returns the first interceptor of the stack. Useful for traversal.

Returns:
ChannelInterceptor
        if ( != nullreturn ;
        else return ;
    }

    
Returns the channel receiver component

Returns:
ChannelReceiver
    @Override
        return .getClusterReceiver();
    }

    
Returns the channel sender component

Returns:
ChannelSender
    @Override
    public ChannelSender getChannelSender() {
        return .getClusterSender();
    }

    
Returns the membership service component

Returns:
MembershipService
    @Override
        return .getMembershipService();
    }

    
Sets the channel receiver component

Parameters:
clusterReceiver ChannelReceiver
    @Override
    public void setChannelReceiver(ChannelReceiver clusterReceiver) {
        .setClusterReceiver(clusterReceiver);
    }

    
Sets the channel sender component

Parameters:
clusterSender ChannelSender
    @Override
    public void setChannelSender(ChannelSender clusterSender) {
        .setClusterSender(clusterSender);
    }

    
Sets the membership component

Parameters:
membershipService MembershipService
    @Override
    public void setMembershipService(MembershipService membershipService) {
        .setMembershipService(membershipService);
    }

    
Adds a membership listener to the channel.
Membership listeners are uniquely identified using the equals(Object) method

Parameters:
membershipListener MembershipListener
    @Override
    public void addMembershipListener(MembershipListener membershipListener) {
        if (!this..contains(membershipListener) )
            this..add(membershipListener);
    }

    
Removes a membership listener from the channel.
Membership listeners are uniquely identified using the equals(Object) method

Parameters:
membershipListener MembershipListener
    @Override
    public void removeMembershipListener(MembershipListener membershipListener) {
        .remove(membershipListener);
    }

    
Adds a channel listener to the channel.
Channel listeners are uniquely identified using the equals(Object) method

Parameters:
channelListener ChannelListener
    @Override
    public void addChannelListener(ChannelListener channelListener) {
        if (!this..contains(channelListener) ) {
            this..add(channelListener);
        } else {
            throw new IllegalArgumentException("Listener already exists:"+channelListener+"["+channelListener.getClass().getName()+"]");
        }
    }

    
Removes a channel listener from the channel.
Channel listeners are uniquely identified using the equals(Object) method

Parameters:
channelListener ChannelListener
    @Override
    public void removeChannelListener(ChannelListener channelListener) {
        .remove(channelListener);
    }

    
Returns an iterator of all the interceptors in this stack

Returns:
Iterator
    @Override
        return new InterceptorIterator(this.getNext(),this.);
    }

    
Enables/disables the option check
Setting this to true, will make the GroupChannel perform a conflict check on the interceptors. If two interceptors are using the same option flag and throw an error upon start.

Parameters:
optionCheck boolean
    public void setOptionCheck(boolean optionCheck) {
        this. = optionCheck;
    }

    
Configure local heartbeat sleep time
Only used when getHeartbeat()==true

Parameters:
heartbeatSleeptime long - time in milliseconds to sleep between heartbeats
    public void setHeartbeatSleeptime(long heartbeatSleeptime) {
        this. = heartbeatSleeptime;
    }

    
Enables or disables local heartbeat. if setHeartbeat(true) is invoked then the channel will start an internal thread to invoke Channel.heartbeat() every getHeartbeatSleeptime milliseconds

Parameters:
heartbeat boolean
    @Override
    public void setHeartbeat(boolean heartbeat) {
        this. = heartbeat;
    }

    

Returns:
boolean
See also:
setOptionCheck(boolean)
    public boolean getOptionCheck() {
        return ;
    }

    

Returns:
boolean
See also:
setHeartbeat(boolean)
    public boolean getHeartbeat() {
        return ;
    }

    
Returns the sleep time in milliseconds that the internal heartbeat will sleep in between invocations of Channel.heartbeat()

Returns:
long
    public long getHeartbeatSleeptime() {
        return ;
    }

    

Title: Interceptor Iterator

Description: An iterator to loop through the interceptors in a channel

Version:
1.0
    public static class InterceptorIterator implements Iterator<ChannelInterceptor> {
        private final ChannelInterceptor end;
        private ChannelInterceptor start;
        public InterceptorIterator(ChannelInterceptor startChannelInterceptor end) {
            this. = end;
            this. = start;
        }
        @Override
        public boolean hasNext() {
            return !=null &&  != ;
        }
        @Override
        public ChannelInterceptor next() {
            ChannelInterceptor result = null;
            if ( hasNext() ) {
                result = ;
                 = .getNext();
            }
            return result;
        }
        @Override
        public void remove() {
            //empty operation
        }
    }

    

Title: Internal heartbeat thread

Description: if Channel.getHeartbeat()==true then a thread of this class is created

Version:
1.0
    public static class HeartbeatThread extends Thread {
        private static final Log log = LogFactory.getLog(HeartbeatThread.class);
        protected static int counter = 1;
        protected static synchronized int inc() {
            return ++;
        }
        protected volatile boolean doRun = true;
        protected final GroupChannel channel;
        protected final long sleepTime;
        public HeartbeatThread(GroupChannel channellong sleepTime) {
            super();
            this.setPriority();
            setName("GroupChannel-Heartbeat-"+inc());
            setDaemon(true);
            this. = channel;
            this. = sleepTime;
        }
        public void stopHeartbeat() {
             = false;
            interrupt();
        }
        @Override
        public void run() {
            while () {
                try {
                    Thread.sleep();
                    .heartbeat();
                } catch ( InterruptedException x ) {
                    // Ignore. Probably triggered by a call to stopHeartbeat().
                    // In the highly unlikely event it was a different trigger,
                    // simply ignore it and continue.
                } catch ( Exception x ) {
                    .error("Unable to send heartbeat through Tribes interceptor stack. Will try to sleep again.",x);
                }//catch
            }//while
        }//run
    }//HeartbeatThread
New to GrepCode? Check out our FAQ X