Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 package org.apache.catalina.tribes.group.interceptors;
 
 
The order interceptor guarantees that messages are received in the same order they were sent. This interceptor works best with the ack=true setting.
There is no point in using this with the replicationMode="fastasynchqueue" as this mode guarantees ordering.
If you are using the mode ack=false replicationMode=pooled, and have a lot of concurrent threads, this interceptor can really slow you down, as many messages will be completely out of order and the queue might become rather large. If this is the case, then you might want to set the value OrderInterceptor.maxQueue = 25 (meaning that we will never keep more than 25 messages in our queue)
Configuration Options
OrderInteceptor.expire=<milliseconds> - if a message arrives out of order, how long before we act on it default=3000ms
OrderInteceptor.maxQueue=<max queue size> - how much can the queue grow to ensure ordering. This setting is useful to avoid OutOfMemoryErrorsdefault=Integer.MAX_VALUE
OrderInterceptor.forwardExpired=<boolean> - this flag tells the interceptor what to do when a message has expired or the queue has grown larger than the maxQueue value. true means that the message is sent up the stack to the receiver that will receive and out of order message false means, forget the message and reset the message counter. default=true

Version:
1.1
 
 public class OrderInterceptor extends ChannelInterceptorBase {
     private final HashMap<MemberCounteroutcounter = new HashMap<>();
     private final HashMap<MemberCounterincounter = new HashMap<>();
     private final HashMap<MemberMessageOrderincoming = new HashMap<>();
     private long expire = 3000;
     private boolean forwardExpired = true;
     private int maxQueue = .;
 
     final ReentrantReadWriteLock inLock = new ReentrantReadWriteLock(true);
 
     @Override
     public void sendMessage(Member[] destinationChannelMessage msgInterceptorPayload payloadthrows ChannelException {
         if ( !okToProcess(msg.getOptions()) ) {
             super.sendMessage(destinationmsgpayload);
             return;
         }
         ChannelException cx = null;
         for (int i=0; i<destination.lengthi++ ) {
             try {
                 int nr = 0;
                 .writeLock().lock();
                 try {
                     nr = incCounter(destination[i]);
                 } finally {
                     .writeLock().unlock();
                 }
                 //reduce byte copy
                 msg.getMessage().append(nr);
                 try {
                     getNext().sendMessage(new Member[] {destination[i]}, msgpayload);
                 } finally {
                     msg.getMessage().trim(4);
                 }
             }catch ( ChannelException x ) {
                 if ( cx == null ) cx = x;
                 cx.addFaultyMember(x.getFaultyMembers());
             }
         }//for
         if ( cx != null ) throw cx;
     }
 
     @Override
     public void messageReceived(ChannelMessage msg) {
         if ( !okToProcess(msg.getOptions()) ) {
            super.messageReceived(msg);
            return;
        }
        int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
        msg.getMessage().trim(4);
        MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone());
        .writeLock().lock();
        try {
            if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
        } finally {
            .writeLock().unlock();
        }
    }
    protected void processLeftOvers(Member memberboolean force) {
        MessageOrder tmp = .get(member);
        if ( force ) {
            Counter cnt = getInCounter(member);
            cnt.setCounter(.);
        }
        if ( tmp!= null ) processIncoming(tmp);
    }
    

Parameters:
order MessageOrder
Returns:
boolean - true if a message expired and was processed
    protected boolean processIncoming(MessageOrder order) {
        boolean result = false;
        Member member = order.getMessage().getAddress();
        Counter cnt = getInCounter(member);
        MessageOrder tmp = .get(member);
        if ( tmp != null ) {
            order = MessageOrder.add(tmp,order);
        }
        while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter())  ) {
            //we are right on target. process orders
            if ( order.getMsgNr() == cnt.getCounter() ) cnt.inc();
            else if ( order.getMsgNr() > cnt.getCounter() ) cnt.setCounter(order.getMsgNr());
            super.messageReceived(order.getMessage());
            order.setMessage(null);
            order = order.next;
        }
        MessageOrder head = order;
        MessageOrder prev = null;
        tmp = order;
        //flag to empty out the queue when it larger than maxQueue
        boolean empty = order!=null?order.getCount()>=:false;
        while ( tmp != null ) {
            //process expired messages or empty out the queue
            if ( tmp.isExpired() || empty ) {
                //reset the head
                if ( tmp == head ) head = tmp.next;
                cnt.setCounter(tmp.getMsgNr()+1);
                if ( getForwardExpired() )
                    super.messageReceived(tmp.getMessage());
                tmp.setMessage(null);
                tmp = tmp.next;
                if ( prev != null ) prev.next = tmp;
                result = true;
            } else {
                prev = tmp;
                tmp = tmp.next;
            }
        }
        if ( head == null ) .remove(member);
        else .put(memberhead);
        return result;
    }
    @Override
    public void memberAdded(Member member) {
        //notify upwards
        super.memberAdded(member);
    }
    @Override
    public void memberDisappeared(Member member) {
        //reset counters - lock free
        .remove(member);
        .remove(member);
        //clear the remaining queue
        processLeftOvers(member,true);
        //notify upwards
        super.memberDisappeared(member);
    }
    protected int incCounter(Member mbr) {
        Counter cnt = getOutCounter(mbr);
        return cnt.inc();
    }
    protected Counter getInCounter(Member mbr) {
        Counter cnt = .get(mbr);
        if ( cnt == null ) {
            cnt = new Counter();
            cnt.inc(); //always start at 1 for incoming
            .put(mbr,cnt);
        }
        return cnt;
    }
    protected Counter getOutCounter(Member mbr) {
        Counter cnt = .get(mbr);
        if ( cnt == null ) {
            cnt = new Counter();
            .put(mbr,cnt);
        }
        return cnt;
    }
    protected static class Counter {
        private final AtomicInteger value = new AtomicInteger(0);
        public int getCounter() {
            return .get();
        }
        public void setCounter(int counter) {
            this..set(counter);
        }
        public int inc() {
            return .addAndGet(1);
        }
    }
    protected static class MessageOrder {
        private final long received = System.currentTimeMillis();
        private MessageOrder next;
        private final int msgNr;
        private ChannelMessage msg = null;
        public MessageOrder(int msgNr,ChannelMessage msg) {
            this. = msgNr;
            this. = msg;
        }
        public boolean isExpired(long expireTime) {
            return (System.currentTimeMillis()-) > expireTime;
        }
        public ChannelMessage getMessage() {
            return ;
        }
        public void setMessage(ChannelMessage msg) {
            this. = msg;
        }
        public void setNext(MessageOrder order) {
            this. = order;
        }
        public MessageOrder getNext() {
            return ;
        }
        public int getCount() {
            int counter = 1;
            MessageOrder tmp = ;
            while ( tmp != null ) {
                counter++;
                tmp = tmp.next;
            }
            return counter;
        }
        @SuppressWarnings("null"// prev cannot be null
        public static MessageOrder add(MessageOrder headMessageOrder add) {
            if ( head == null ) return add;
            if ( add == null ) return head;
            if ( head == add ) return add;
            if ( head.getMsgNr() > add.getMsgNr() ) {
                add.next = head;
                return add;
            }
            MessageOrder iter = head;
            MessageOrder prev = null;
            while ( iter.getMsgNr() < add.getMsgNr() && (iter.next !=null ) ) {
                prev = iter;
                iter = iter.next;
            }
            if ( iter.getMsgNr() < add.getMsgNr() ) {
                //add after
                add.next = iter.next;
                iter.next = add;
            } else if (iter.getMsgNr() > add.getMsgNr()) {
                //add before
                prev.next = add// prev cannot be null here, warning suppressed
                add.next = iter;
            } else {
                throw new ArithmeticException("Message added has the same counter, synchronization bug. Disable the order interceptor");
            }
            return head;
        }
        public int getMsgNr() {
            return ;
        }
    }
    public void setExpire(long expire) {
        this. = expire;
    }
    public void setForwardExpired(boolean forwardExpired) {
        this. = forwardExpired;
    }
    public void setMaxQueue(int maxQueue) {
        this. = maxQueue;
    }
    public long getExpire() {
        return ;
    }
    public boolean getForwardExpired() {
        return ;
    }
    public int getMaxQueue() {
        return ;
    }
New to GrepCode? Check out our FAQ X