Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements.  See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License.  You may obtain a copy of the License at
    *
    *      http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  package org.apache.catalina.tribes.tipis;
  
  import java.util.HashMap;
  import java.util.Map;
  import java.util.Set;
  

Version:
1.0
  
  public abstract class AbstractReplicatedMap<K,V>
          implements Map<K,V>, SerializableRpcCallbackChannelListener,
          MembershipListenerHeartbeat {
  
      private static final long serialVersionUID = 1L;
  
      private final Log log = LogFactory.getLog(AbstractReplicatedMap.class);

    
The default initial capacity - MUST be a power of two.
  
      public static final int DEFAULT_INITIAL_CAPACITY = 16;

    
The load factor used when none specified in constructor.
  
      public static final float DEFAULT_LOAD_FACTOR = 0.75f;
  
  
  //------------------------------------------------------------------------------
  //              INSTANCE VARIABLES
  //------------------------------------------------------------------------------
      private final ConcurrentHashMap<K, MapEntry<K,V>> innerMap;
  
      protected abstract int getStateMessageType();


    
Timeout for RPC messages, how long we will wait for a reply
  
      protected transient long rpcTimeout = 5000;
    
Reference to the channel for sending messages
  
      protected transient Channel channel;
    
The RpcChannel to send RPC messages through
  
      protected transient RpcChannel rpcChannel;
    
The Map context name makes this map unique, this allows us to have more than one map shared through one channel
  
      protected transient byte[] mapContextName;
    
Has the state been transferred
  
     protected transient boolean stateTransferred = false;
    
Simple lock object for transfers
 
     protected final transient Object stateMutex = new Object();
    
A list of members in our map
 
     protected final transient HashMap<MemberLongmapMembers = new HashMap<>();
    
Our default send options
 
     protected transient int channelSendOptions = .;
    
The owner of this map, ala a SessionManager for example
 
     protected transient MapOwner mapOwner;
    
External class loaders if serialization and deserialization is to be performed successfully.
 
     protected transient ClassLoader[] externalLoaders;

    
The node we are currently backing up data to, this index will rotate on a round robin basis
 
     protected transient int currentNode = 0;

    
Since the map keeps internal membership this is the timeout for a ping message to be responded to If a remote map doesn't respond within this timeframe, its considered dead.
 
     protected transient long accessTimeout = 5000;

    
Readable string of the mapContextName value
 
     protected transient String mapname = "";
 
 //------------------------------------------------------------------------------
 //              map owner interface
 //------------------------------------------------------------------------------
 
     public static interface MapOwner {
         public void objectMadePrimary(Object keyObject value);
     }
 
 //------------------------------------------------------------------------------
 //              CONSTRUCTORS
 //------------------------------------------------------------------------------
 
    
Creates a new map

Parameters:
channel The channel to use for communication
timeout long - timeout for RPC messags
mapContextName String - unique name for this map, to allow multiple maps per channel
initialCapacity int - the size of this map, see HashMap
loadFactor float - load factor, see HashMap
cls - a list of classloaders to be used for deserialization of objects.
terminate - Flag for whether to terminate this map that failed to start.
 
     public AbstractReplicatedMap(MapOwner owner,
                                  Channel channel,
                                  long timeout,
                                  String mapContextName,
                                  int initialCapacity,
                                  float loadFactor,
                                  int channelSendOptions,
                                  ClassLoader[] cls,
                                  boolean terminate) {
          = new ConcurrentHashMap<>(initialCapacityloadFactor, 15);
         init(ownerchannelmapContextNametimeoutchannelSendOptionsclsterminate);
 
     }

    
Helper methods, wraps a single member in an array

Parameters:
m Member
Returns:
Member[]
 
     protected Member[] wrap(Member m) {
         if ( m == null ) return new Member[0];
         else return new Member[] {m};
     }

    
Initializes the map by creating the RPC channel, registering itself as a channel listener This method is also responsible for initiating the state transfer

Parameters:
owner Object
channel Channel
mapContextName String
timeout long
channelSendOptions int
cls ClassLoader[]
terminate - Flag for whether to terminate this map that failed to start.
 
     protected void init(MapOwner ownerChannel channelString mapContextName,
             long timeoutint channelSendOptions,ClassLoader[] clsboolean terminate) {
         long start = System.currentTimeMillis();
         if (.isInfoEnabled()) .info("Initializing AbstractReplicatedMap with context name:"+mapContextName);
         this. = owner;
         this. = cls;
         this. = channelSendOptions;
         this. = channel;
         this. = timeout;
 
         this. = mapContextName;
         //unique context is more efficient if it is stored as bytes
         this. = mapContextName.getBytes(.);
         if ( .isTraceEnabled() ) .trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.));
 
         //create an rpc channel and add the map as a listener
         this. = new RpcChannel(this.channelthis);
         //add this map as a message listener
         this..addChannelListener(this);
         //listen for membership notifications
         this..addMembershipListener(this);
 
 
         try {
             //broadcast our map, this just notifies other members of our existence
             broadcast(.true);
             //transfer state from another map
             transferState();
             //state is transferred, we are ready for messaging
             broadcast(.true);
         } catch (ChannelException x) {
             .warn("Unable to send map start message.");
             if (terminate) {
                 breakdown();
                 throw new RuntimeException("Unable to start replicated map.",x);
             }
         }
         long complete = System.currentTimeMillis() - start;
         if (.isInfoEnabled())
             .info("AbstractReplicatedMap[" +mapContextName + "] initialization was completed in " + complete + " ms.");
     }


    
Sends a ping out to all the members in the cluster, not just map members that this map is alive.

Parameters:
timeout long
Throws:
org.apache.catalina.tribes.ChannelException
 
     protected void ping(long timeoutthrows ChannelException {
         //send out a map membership message, only wait for the first reply
         MapMessage msg = new MapMessage(this.,
                                         .,
                                         false,
                                         null,
                                         null,
                                         null,
                                         .getLocalMember(false),
                                         null);
         if ( .getMembers().length > 0 ) {
             try {
                 //send a ping, wait for all nodes to reply
                 Response[] resp = .send(.getMembers(),
                                                   msg.,
                                                   (),
                                                   (int);
                 for (int i = 0; i < resp.lengthi++) {
                     memberAlive(resp[i].getSource());
                 }
             } catch (ChannelException ce) {
                 // Handle known failed members
                 FaultyMember[] faultyMembers = ce.getFaultyMembers();
                 for (FaultyMember faultyMember : faultyMembers) {
                     memberDisappeared(faultyMember.getMember());
                 }
                 throw ce;
             }
         }
         //update our map of members, expire some if we didn't receive a ping back
         synchronized () {
             Member[] members = .keySet().toArray(new Member[.size()]);
             long now = System.currentTimeMillis();
             for (Member member : members) {
                 long access = .get(member).longValue();
                 if ( (now - access) > timeout ) {
                     memberDisappeared(member);
                 }
             }
         }//synch
     }

    
We have received a member alive notification

Parameters:
member Member
 
     protected void memberAlive(Member member) {
         synchronized () {
             if (!.containsKey(member)) {
                 mapMemberAdded(member);
             } //end if
             .put(membernew Long(System.currentTimeMillis()));
         }
     }

    
Helper method to broadcast a message to all members in a channel

Parameters:
msgtype int
rpc boolean
Throws:
org.apache.catalina.tribes.ChannelException
 
     protected void broadcast(int msgtypeboolean rpcthrows ChannelException {
         Member[] members = .getMembers();
         // No destination.
         if (members.length == 0 ) return;
         //send out a map membership message, only wait for the first reply
         MapMessage msg = new MapMessage(this.msgtype,
                                         falsenullnullnull.getLocalMember(false), null);
         if ( rpc) {
             Response[] resp = .send(membersmsg,
                     ., (), );
             if (resp.length > 0) {
                 for (int i = 0; i < resp.lengthi++) {
                     mapMemberAdded(resp[i].getSource());
                     messageReceived(resp[i].getMessage(), resp[i].getSource());
                 }
             } else {
                 .warn("broadcast received 0 replies, probably a timeout.");
             }
         } else {
             .send(.getMembers(),msg,);
         }
     }
 
     public void breakdown() {
         finalize();
     }
 
     @Override
     public void finalize() {
         if (this. != null) {
             this..breakdown();
         }
         try {broadcast(.,false); }catch ( Exception ignore){}
         //cleanup
         if (this. != null) {
             this..removeChannelListener(this);
             this..removeMembershipListener(this);
         }
         this. = null;
         this. = null;
         this..clear();
         .clear();
         this. = false;
         this. = null;
     }
 
     @Override
     public int hashCode() {
         return Arrays.hashCode(this.);
     }
 
     @Override
     public boolean equals(Object o) {
         if ( !(o instanceof AbstractReplicatedMap)) return false;
         if ( !(o.getClass().equals(this.getClass())) ) return false;
         @SuppressWarnings("unchecked")
         AbstractReplicatedMap<K,V> other = (AbstractReplicatedMap<K,V>)o;
         return Arrays.equals(,other.mapContextName);
     }
 
 //------------------------------------------------------------------------------
 //              GROUP COM INTERFACES
 //------------------------------------------------------------------------------
     public Member[] getMapMembers(HashMap<MemberLongmembers) {
         synchronized (members) {
             Member[] result = new Member[members.size()];
             members.keySet().toArray(result);
             return result;
         }
     }
     public Member[] getMapMembers() {
         return getMapMembers(this.);
     }
 
     public Member[] getMapMembersExcl(Member[] exclude) {
         synchronized () {
             @SuppressWarnings("unchecked"// mapMembers has the correct type
             HashMap<MemberLonglist = (HashMap<MemberLong>).clone();
             for (int i=0; i<exclude.length;i++) list.remove(exclude[i]);
             return getMapMembers(list);
         }
     }


    
Replicates any changes to the object since the last time The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated

Parameters:
complete - if set to true, the object is replicated to its backup if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will be replicated
 
     public void replicate(Object keyboolean complete) {
         if ( .isTraceEnabled() )
             .trace("Replicate invoked on key:"+key);
         MapEntry<K,V> entry = .get(key);
         if ( entry == null ) return;
         if ( !entry.isSerializable() ) return;
         if (entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) {
             //check to see if we need to replicate this object isDirty()||complete || isAccessReplicate()
             ReplicatedMapEntry rentry = null;
             if (entry.getValue() instanceof ReplicatedMapEntryrentry = (ReplicatedMapEntry)entry.getValue();
             boolean isDirty = rentry != null && rentry.isDirty();
             boolean isAccess = rentry != null && rentry.isAccessReplicate();
             boolean repl = complete || isDirty || isAccess;
 
             if (!repl) {
                 if ( .isTraceEnabled() )
                     .trace("Not replicating:"+key+", no change made");
 
                 return;
             }
             //check to see if the message is diffable
             MapMessage msg = null;
             if (rentry != null && rentry.isDiffable() && (isDirty || complete)) {
                 rentry.lock();
                 try {
                     //construct a diff message
                     msg = new MapMessage(.,
                                          true, (Serializableentry.getKey(), null,
                                          rentry.getDiff(),
                                          entry.getPrimary(),
                                          entry.getBackupNodes());
                     rentry.resetDiff();
                 } catch (IOException x) {
                     .error("Unable to diff object. Will replicate the entire object instead."x);
                 } finally {
                     rentry.unlock();
                 }
             }
             if (msg == null && complete) {
                 //construct a complete
                 msg = new MapMessage(.,
                                      false, (Serializableentry.getKey(),
                                      (Serializableentry.getValue(),
                                      nullentry.getPrimary(),entry.getBackupNodes());
             }
             if (msg == null) {
                 //construct a access message
                 msg = new MapMessage(.,
                         false, (Serializableentry.getKey(), nullnullentry.getPrimary(),
                         entry.getBackupNodes());
             }
             try {
                 if ( !=null && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) {
                     if (rentry != nullrentry.setLastTimeReplicated(System.currentTimeMillis());
                     .send(entry.getBackupNodes(), msg);
                 }
             } catch (ChannelException x) {
                 .error("Unable to replicate data."x);
             }
         } //end if
 
     }

    
This can be invoked by a periodic thread to replicate out any changes. For maps that don't store objects that implement ReplicatedMapEntry, this method should be used infrequently to avoid large amounts of data transfer

Parameters:
complete boolean
 
     public void replicate(boolean complete) {
         Iterator<Map.Entry<K,MapEntry<K,V>>> i = .entrySet().iterator();
         while (i.hasNext()) {
             Map.Entry<?,?> e = i.next();
             replicate(e.getKey(), complete);
         } //while
 
     }
 
     public void transferState() {
         try {
             Member[] members = getMapMembers();
             Member backup = members.length > 0 ? (Membermembers[0] : null;
             if (backup != null) {
                 MapMessage msg = new MapMessage(getStateMessageType(), false,
                                                 nullnullnullnullnull);
                 Response[] resp = .send(new Member[] {backup}, msg.);
                 if (resp.length > 0) {
                     synchronized () {
                         msg = (MapMessageresp[0].getMessage();
                         msg.deserialize(getExternalLoaders());
                         ArrayList<?> list = (ArrayList<?>) msg.getValue();
                         for (int i = 0; i < list.size(); i++) {
                             messageReceived( (Serializablelist.get(i), resp[0].getSource());
                         } //for
                     }
                 } else {
                     .warn("Transfer state, 0 replies, probably a timeout.");
                 }
             }
         } catch (ChannelException x) {
             .error("Unable to transfer LazyReplicatedMap state."x);
         } catch (IOException x) {
             .error("Unable to transfer LazyReplicatedMap state."x);
         } catch (ClassNotFoundException x) {
             .error("Unable to transfer LazyReplicatedMap state."x);
         }
          = true;
     }

    
TODO implement state transfer

Parameters:
msg Serializable
Returns:
Serializable - null if no reply should be sent
 
     @Override
     public Serializable replyRequest(Serializable msgfinal Member sender) {
         if (! (msg instanceof MapMessage))return null;
         MapMessage mapmsg = (MapMessagemsg;
 
         //map init request
         if (mapmsg.getMsgType() == .) {
             mapmsg.setPrimary(.getLocalMember(false));
             return mapmsg;
         }
 
         //map start request
         if (mapmsg.getMsgType() == .) {
             mapmsg.setPrimary(.getLocalMember(false));
             mapMemberAdded(sender);
             return mapmsg;
         }
 
         //backup request
         if (mapmsg.getMsgType() == .) {
             MapEntry<K,V> entry = .get(mapmsg.getKey());
             if (entry == null || (!entry.isSerializable()) )return null;
             mapmsg.setValue( (Serializableentry.getValue());
             return mapmsg;
         }
 
         //state transfer request
         if (mapmsg.getMsgType() == . || mapmsg.getMsgType() == .) {
             synchronized () { //make sure we dont do two things at the same time
                 ArrayList<MapMessagelist = new ArrayList<>();
                 Iterator<Map.Entry<K,MapEntry<K,V>>> i = .entrySet().iterator();
                 while (i.hasNext()) {
                     Map.Entry<?,?> e = i.next();
                     MapEntry<K,V> entry = .get(e.getKey());
                     if ( entry != null && entry.isSerializable() ) {
                         boolean copy = (mapmsg.getMsgType() == .);
                         MapMessage me = new MapMessage(,
                                                        copy?.:.,
                             false, (Serializableentry.getKey(), copy?(Serializableentry.getValue():nullnullentry.getPrimary(),entry.getBackupNodes());
                         list.add(me);
                     }
                 }
                 mapmsg.setValue(list);
                 return mapmsg;
 
             } //synchronized
         }
 
         return null;
 
     }

    
If the reply has already been sent to the requesting thread, the rpc callback can handle any data that comes in after the fact.

Parameters:
msg Serializable
sender Member
 
     @Override
     public void leftOver(Serializable msgMember sender) {
         //left over membership messages
         if (! (msg instanceof MapMessage))return;
 
         MapMessage mapmsg = (MapMessagemsg;
         try {
             mapmsg.deserialize(getExternalLoaders());
             if (mapmsg.getMsgType() == .) {
                 mapMemberAdded(mapmsg.getPrimary());
             } else if (mapmsg.getMsgType() == .) {
                 memberAlive(mapmsg.getPrimary());
             }
         } catch (IOException x ) {
             .error("Unable to deserialize MapMessage.",x);
         } catch (ClassNotFoundException x ) {
             .error("Unable to deserialize MapMessage.",x);
         }
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public void messageReceived(Serializable msgMember sender) {
         if (! (msg instanceof MapMessage)) return;
 
         MapMessage mapmsg = (MapMessagemsg;
         if ( .isTraceEnabled() ) {
             .trace("Map["++"] received message:"+mapmsg);
         }
 
         try {
             mapmsg.deserialize(getExternalLoaders());
         } catch (IOException x) {
             .error("Unable to deserialize MapMessage."x);
             return;
         } catch (ClassNotFoundException x) {
             .error("Unable to deserialize MapMessage."x);
             return;
         }
         if ( .isTraceEnabled() )
             .trace("Map message received from:"+sender.getName()+" msg:"+mapmsg);
         if (mapmsg.getMsgType() == .) {
             mapMemberAdded(mapmsg.getPrimary());
         }
 
         if (mapmsg.getMsgType() == .) {
             memberDisappeared(mapmsg.getPrimary());
         }
 
         if (mapmsg.getMsgType() == .) {
             MapEntry<K,V> entry = .get(mapmsg.getKey());
             if ( entry==null ) {
                 entry = new MapEntry<>((K) mapmsg.getKey(), (V) mapmsg.getValue());
                 MapEntry<K,V> old = .putIfAbsent(entry.getKey(), entry);
                 if (old != null) {
                     entry = old;
                 }
             }
             entry.setProxy(true);
             entry.setBackup(false);
             entry.setBackupNodes(mapmsg.getBackupNodes());
             entry.setPrimary(mapmsg.getPrimary());
         }
 
         if (mapmsg.getMsgType() == .) {
             .remove(mapmsg.getKey());
         }
 
         if (mapmsg.getMsgType() == . || mapmsg.getMsgType() == .) {
             MapEntry<K,V> entry = .get(mapmsg.getKey());
             if (entry == null) {
                 entry = new MapEntry<>((K) mapmsg.getKey(), (V) mapmsg.getValue());
                 entry.setBackup(mapmsg.getMsgType() == .);
                 entry.setProxy(false);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
                 entry.setPrimary(mapmsg.getPrimary());
                 if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
                     ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
                 }
             } else {
                 entry.setBackup(mapmsg.getMsgType() == .);
                 entry.setProxy(false);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
                 entry.setPrimary(mapmsg.getPrimary());
                 if (entry.getValue() instanceof ReplicatedMapEntry) {
                     ReplicatedMapEntry diff = (ReplicatedMapEntryentry.getValue();
                     if (mapmsg.isDiff()) {
                         diff.lock();
                         try {
                             diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
                         } catch (Exception x) {
                             .error("Unable to apply diff to key:" + entry.getKey(), x);
                         } finally {
                             diff.unlock();
                         }
                     } else {
                         if ( mapmsg.getValue()!=null ) entry.setValue((V) mapmsg.getValue());
                         ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner());
                     } //end if
                 } else if  (mapmsg.getValue() instanceof ReplicatedMapEntry) {
                     ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue();
                     re.setOwner(getMapOwner());
                     entry.setValue((V) re);
                 } else {
                     if ( mapmsg.getValue()!=null ) entry.setValue((V) mapmsg.getValue());
                 } //end if
             } //end if
             .put(entry.getKey(), entry);
         } //end if
 
         if (mapmsg.getMsgType() == .) {
             MapEntry<K, V> entry = .get(mapmsg.getKey());
             if (entry != null) {
                 entry.setBackupNodes(mapmsg.getBackupNodes());
                 entry.setPrimary(mapmsg.getPrimary());
                 if (entry.getValue() instanceof ReplicatedMapEntry) {
                     ((ReplicatedMapEntryentry.getValue()).accessEntry();
                 }
             }
         }
     }
 
     @Override
     public boolean accept(Serializable msgMember sender) {
         boolean result = false;
         if (msg instanceof MapMessage) {
             if ( .isTraceEnabled() ) .trace("Map["++"] accepting...."+msg);
             result = Arrays.equals(, ( (MapMessagemsg).getMapId());
             if ( .isTraceEnabled() ) .trace("Msg["++"] accepted["+result+"]...."+msg);
         }
         return result;
     }
 
     public void mapMemberAdded(Member member) {
         if ( member.equals(getChannel().getLocalMember(false)) ) return;
         boolean memberAdded = false;
         //select a backup node if we don't have one
         synchronized () {
             if (!.containsKey(member) ) {
                 .put(membernew Long(System.currentTimeMillis()));
                 memberAdded = true;
             }
         }
         if ( memberAdded ) {
             synchronized () {
                 Iterator<Map.Entry<K,MapEntry<K,V>>> i = .entrySet().iterator();
                 while (i.hasNext()) {
                     Map.Entry<K,MapEntry<K,V>> e = i.next();
                     MapEntry<K,V> entry = .get(e.getKey());
                     if ( entry == null ) continue;
                     if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
                         try {
                             Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
                             entry.setBackupNodes(backup);
                             entry.setPrimary(.getLocalMember(false));
                         } catch (ChannelException x) {
                             .error("Unable to select backup node."x);
                         } //catch
                     } //end if
                 } //while
             } //synchronized
         }//end if
     }
 
     public boolean inSet(Member mMember[] set) {
         if ( set == null ) return false;
         boolean result = false;
         for (int i=0; i<set.length && (!result); i++ )
             if ( m.equals(set[i]) ) result = true;
         return result;
     }
 
     public Member[] excludeFromSet(Member[] mbrsMember[] set) {
         ArrayList<Memberresult = new ArrayList<>();
         for (int i=0; i<set.lengthi++ ) {
             boolean include = true;
             for (int j=0; j<mbrs.length && includej++ )
                 if ( mbrs[j].equals(set[i]) ) include = false;
             if ( include ) result.add(set[i]);
         }
         return result.toArray(new Member[result.size()]);
     }
 
     @Override
     public void memberAdded(Member member) {
         //do nothing
     }
 
     @Override
     public void memberDisappeared(Member member) {
         boolean removed = false;
         synchronized () {
             removed = (.remove(member) != null );
             if (!removed) {
                 if (.isDebugEnabled()) .debug("Member["+member+"] disappeared, but was not present in the map.");
                 return//the member was not part of our map.
             }
         }
         if (.isInfoEnabled())
             .info("Member["+member+"] disappeared. Related map entries will be relocated to the new node.");
         long start = System.currentTimeMillis();
         Iterator<Map.Entry<K,MapEntry<K,V>>> i = .entrySet().iterator();
         while (i.hasNext()) {
             Map.Entry<K,MapEntry<K,V>> e = i.next();
             MapEntry<K,V> entry = .get(e.getKey());
             if (entry==nullcontinue;
             if (entry.isPrimary() && inSet(member,entry.getBackupNodes())) {
                 if (.isDebugEnabled()) .debug("[1] Primary choosing a new backup");
                 try {
                     Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
                     entry.setBackupNodes(backup);
                     entry.setPrimary(.getLocalMember(false));
                 } catch (ChannelException x) {
                     .error("Unable to relocate[" + entry.getKey() + "] to a new backup node"x);
                 }
             } else if (member.equals(entry.getPrimary())) {
                 if (.isDebugEnabled()) .debug("[2] Primary disappeared");
                 entry.setPrimary(null);
             } //end if
 
             if ( entry.isProxy() &&
                  entry.getPrimary() == null &&
                  entry.getBackupNodes()!=null &&
                  entry.getBackupNodes().length == 1 &&
                  entry.getBackupNodes()[0].equals(member) ) {
                 //remove proxies that have no backup nor primaries
                 if (.isDebugEnabled()) .debug("[3] Removing orphaned proxy");
                 i.remove();
             } else if ( entry.getPrimary() == null &&
                         entry.isBackup() &&
                         entry.getBackupNodes()!=null &&
                         entry.getBackupNodes().length == 1 &&
                         entry.getBackupNodes()[0].equals(.getLocalMember(false)) ) {
                 try {
                     if (.isDebugEnabled()) .debug("[4] Backup becoming primary");
                     entry.setPrimary(.getLocalMember(false));
                     entry.setBackup(false);
                     entry.setProxy(false);
                     Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
                     entry.setBackupNodes(backup);
                     if ( !=null ) .objectMadePrimary(entry.getKey(),entry.getValue());
 
                 } catch (ChannelException x) {
                     .error("Unable to relocate[" + entry.getKey() + "] to a new backup node"x);
                 }
             }
 
         } //while
         long complete = System.currentTimeMillis() - start;
         if (.isInfoEnabled()) .info("Relocation of map entries was complete in " + complete + " ms.");
     }
 
     public int getNextBackupIndex() {
         int size = .size();
         if (.size() == 0)return -1;
         int node = ++;
         if (node >= size) {
             node = 0;
              = 0;
         }
         return node;
     }
     public Member getNextBackupNode() {
         Member[] members = getMapMembers();
         int node = getNextBackupIndex();
         if ( members.length == 0 || node==-1) return null;
         if ( node >= members.length ) node = 0;
         return members[node];
     }
 
     protected abstract Member[] publishEntryInfo(Object keyObject valuethrows ChannelException;
 
     @Override
     public void heartbeat() {
         try {
             ping();
         }catch ( Exception x ) {
             .error("Unable to send AbstractReplicatedMap.ping message",x);
         }
     }
 
 //------------------------------------------------------------------------------
 //              METHODS TO OVERRIDE
 //------------------------------------------------------------------------------
 
    
Removes an object from this map, it will also remove it from

Parameters:
key Object
Returns:
Object
 
     @Override
     public V remove(Object key) {
         return remove(key,true);
     }
     public V remove(Object keyboolean notify) {
         MapEntry<K,V> entry = .remove(key);
 
         try {
             if (getMapMembers().length > 0 && notify) {
                 MapMessage msg = new MapMessage(getMapContextName(), .false, (Serializablekeynullnullnull,null);
                 getChannel().send(getMapMembers(), msggetChannelSendOptions());
             }
         } catch ( ChannelException x ) {
             .error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x);
         }
         return entry!=null?entry.getValue():null;
     }
 
     public MapEntry<K,V> getInternal(Object key) {
         return .get(key);
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public V get(Object key) {
         MapEntry<K,V> entry = .get(key);
         if (.isTraceEnabled()) .trace("Requesting id:"+key+" entry:"+entry);
         if ( entry == null ) return null;
         if ( !entry.isPrimary() ) {
             //if the message is not primary, we need to retrieve the latest value
             try {
                 Member[] backup = null;
                 MapMessage msg = null;
                 if ( !entry.isBackup() ) {
                     //make sure we don't retrieve from ourselves
                     msg = new MapMessage(getMapContextName(), .false,
                                          (Serializablekeynullnullnull,null);
                     Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg..getRpcTimeout());
                     if (resp == null || resp.length == 0) {
                         //no responses
                         .warn("Unable to retrieve remote object for key:" + key);
                         return null;
                     }
                     msg = (MapMessageresp[0].getMessage();
                     msg.deserialize(getExternalLoaders());
                     backup = entry.getBackupNodes();
                     if ( entry.getValue() instanceof ReplicatedMapEntry ) {
                         ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
                         val.setOwner(getMapOwner());
                     }
                     if ( msg.getValue()!=null ) entry.setValue((V) msg.getValue());
                 }
                 if (entry.isBackup()) {
                     //select a new backup node
                     backup = publishEntryInfo(keyentry.getValue());
                 } else if ( entry.isProxy() ) {
                     //invalidate the previous primary
                     msg = new MapMessage(getMapContextName(),.,false,(Serializable)key,null,null,.getLocalMember(false),backup);
                     Member[] dest = getMapMembersExcl(backup);
                     if ( dest!=null && dest.length >0) {
                         getChannel().send(destmsggetChannelSendOptions());
                     }
                     if ( entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry ) {
                         ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
                         val.setOwner(getMapOwner());
                     }
                 }
                 entry.setPrimary(.getLocalMember(false));
                 entry.setBackupNodes(backup);
                 entry.setBackup(false);
                 entry.setProxy(false);
                 if ( getMapOwner()!=null ) getMapOwner().objectMadePrimary(keyentry.getValue());
 
             } catch (Exception x) {
                 .error("Unable to replicate out data for a LazyReplicatedMap.get operation"x);
                 return null;
             }
         }
         if (.isTraceEnabled()) .trace("Requesting id:"+key+" result:"+entry.getValue());
         return entry.getValue();
     }
 
 
     protected void printMap(String header) {
         try {
             ..println("\nDEBUG MAP:"+header);
             ..println("Map[" +
                     new String(.) +
                     ", Map Size:" + .size());
             Member[] mbrs = getMapMembers();
             for ( int i=0; i<mbrs.length;i++ ) {
                 ..println("Mbr["+(i+1)+"="+mbrs[i].getName());
             }
             Iterator<Map.Entry<K,MapEntry<K,V>>> i = .entrySet().iterator();
             int cnt = 0;
 
             while (i.hasNext()) {
                 Map.Entry<?,?> e = i.next();
                 ..println( (++cnt) + ". " + .get(e.getKey()));
             }
             ..println("EndMap]\n\n");
         }catch ( Exception ignore) {
             ignore.printStackTrace();
         }
     }

    
Returns true if the key has an entry in the map. The entry can be a proxy or a backup entry, invoking get(key) will make this entry primary for the group

Parameters:
key Object
Returns:
boolean
 
     @Override
     public boolean containsKey(Object key) {
         return .containsKey(key);
     }
 
     @Override
     public V put(K key, V value) {
         return put(keyvaluetrue);
     }
 
     public V put(K key, V valueboolean notify) {
         MapEntry<K,V> entry = new MapEntry<>(keyvalue);
         entry.setBackup(false);
         entry.setProxy(false);
         entry.setPrimary(.getLocalMember(false));
 
         V old = null;
 
         //make sure that any old values get removed
         if ( containsKey(key) ) old = remove(key);
         try {
             if ( notify ) {
                 Member[] backup = publishEntryInfo(keyvalue);
                 entry.setBackupNodes(backup);
             }
        } catch (ChannelException x) {
            .error("Unable to replicate out data for a LazyReplicatedMap.put operation"x);
        }
        .put(key,entry);
        return old;
    }


    
Copies all values from one map to this instance

Parameters:
m Map
    @Override
    public void putAll(Map<? extends K, ? extends V> m) {
        Iterator<?> i = m.entrySet().iterator();
        while ( i.hasNext() ) {
            @SuppressWarnings("unchecked")
            Map.Entry<K,V> entry = (Map.Entry<K,V>) i.next();
            put(entry.getKey(),entry.getValue());
        }
    }
    @Override
    public void clear() {