Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright (C) 2009 eXo Platform SAS.
   *
   * This is free software; you can redistribute it and/or modify it
   * under the terms of the GNU Lesser General Public License as
   * published by the Free Software Foundation; either version 2.1 of
   * the License, or (at your option) any later version.
   *
   * This software is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General Public
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 package org.exoplatform.services.jcr.ext.replication.transport;
 
 
 import java.util.List;
Created by The eXo Platform SAS.
Date: 12.12.2008

Author(s):
Alex Reshetnyak
Version:
$Id: ChannelManager.java 31925 2009-05-19 07:40:04Z rainf0x $
 
 public class ChannelManager implements RequestHandlerMembershipListener
 {

   
The initialized state.
 
    public static final int INITIALIZED = 1;

   
The connected state.
 
    public static final int CONNECTED = 2;

   
The disconnected state.
 
    public static final int DISCONNECTED = 3;

   
log. the apache logger.
 
    private static final Log LOG = ExoLogger.getLogger("exo.jcr.component.ext.AsyncChannelManager");

   
State of async channel manager {INITIALIZED, CONNECTED, DISCONNECTED}.
 
    protected int state;

   
The JChanel.
 
    protected JChannel channel;

   
dispatcher. The MessageDispatcher will be transmitted the Massage.
 
    protected MessageDispatcher dispatcher;

   
channelConfig. The configuration to JChannel.
 
    protected final String channelConfig;

   
channelName. The name to channel.
 
    protected final String channelName;

   
Members count according the configuration (other-participants-priority).
   private final int confMembersCount;

   
Packet listeners.
   private List<PacketListenerpacketListeners;

   
Channel state listeners.
   private List<StateListenerstateListeners;

   
Channel connection sate listeners.
   private final List<ConnectionListenerconnectionListeners;

   
Packets handler.
   protected final PacketHandler packetsHandler;

   
This latch will be used for sending pocket after successful connection.
   private CountDownLatch latch;

   
MemberPacket.
   class MemberPacket
   {
      
packet.
      final AbstractPacket packet;

      
member.
      final MemberAddress member;

      
MemberPacket constructor.

Parameters:
packet AbstractPacket, the packet
member MemebrAddress, the member address
      MemberPacket(AbstractPacket packetMemberAddress member)
      {
         this. = packet;
         this. = member;
      }
   }

   
PacketHandler.
   protected class PacketHandler extends Thread
   {

      
Wait lock.
      private final Object lock = new Object();

      
Packets queue.
      private final ConcurrentLinkedQueue<MemberPacketqueue = new ConcurrentLinkedQueue<MemberPacket>();

      
User flag.
      private MemberPacket current;

      
      @Override
      public void run()
      {
         while (true)
         {
            try
            {
               synchronized ()
               {
                   = .poll();
                  while ( != null)
                  {
                     PacketListener[] pl = .toArray(new PacketListener[.size()]);
                     for (PacketListener handler : pl)
                        handler.receive(..);
                      = .poll();
                  }
                  .wait();
               }
            }
            catch (InterruptedException e)
            {
               .error("Cannot handle the queue. Wait lock failed " + ee);
            }
            catch (Throwable e)
            {
               .error("Cannot handle the queue now. Error " + ee);
               try
               {
                  sleep(5000);
               }
               catch (Throwable e1)
               {
                  .error("Sleep error " + e1);
               }
            }
         }
      }

      
Add packet to the queue.

Parameters:
packet AbstractPacket
member Member
      public void add(AbstractPacket packetMemberAddress member)
      {
         .add(new MemberPacket(packetmember));
      }

      
Run handler if channel is ready.
      public void handle()
      {
         if ( == null)
         {
            synchronized ()
            {
               .notify();
            }
            // JCR-886: let other threads work
            Thread.yield();
         }
         else if (.isDebugEnabled())
            .debug("Handler already active, queue size : " + .size());
      }
   }

   
/**
 * ChannelManager constructor. Puts the manager into the INITIALIZED state, creates the
 * listener lists and starts the packet handler thread.
 *
 * @param channelConfig channel configuration
 * @param channelName name of the channel
 * @param confMembersCount how many members were configured
 */
   public ChannelManager(String channelConfig, String channelName, int confMembersCount)
   {
      this.state = INITIALIZED;
      this.channelConfig = channelConfig;
      this.channelName = channelName;
      this.confMembersCount = confMembersCount;
      this.packetListeners = new ArrayList<PacketListener>();
      this.stateListeners = new ArrayList<StateListener>();
      // The field is final and is iterated in disconnect(), so it has to be assigned here.
      this.connectionListeners = new ArrayList<ConnectionListener>();
      this.packetsHandler = new PacketHandler();
      this.packetsHandler.start();
   }

   
Tell if manager is connected to the channel and ready to work.

Returns:
boolean, true if connected
   public boolean isConnected()
   {
      return  != null;
   }

   
Connect to channel.

Throws:
org.exoplatform.services.jcr.ext.replication.ReplicationException Will be generated the ReplicationException.
   public void connect() throws ReplicationException
   {
      try
      {
         if ( == null)
         {
             = new CountDownLatch(1);
             = new JChannel();
            .setOpt(..);
            .setOpt(..);
             = new MessageDispatcher(nullnullnull);
            .setRequestHandler(this);
            .setMembershipListener(this);
         }
      }
      catch (ChannelException e)
      {
         throw new ReplicationException("Can't create JGroups channel"e);
      }
      .info("Channel name : " + );
      try
      {
         .connect();
         this. = ;
      }
      catch (ChannelException e)
      {
         throw new ReplicationException("Can't connect to JGroups channel"e);
      }
      finally
      {
         .countDown();
      }
   }

   
closeChannel. Close the channel.
   public synchronized void disconnect()
   {
      this. = ;
      if ( != null)
      {
         .setRequestHandler(null);
         .setMembershipListener(null);
         .stop();
          = null;
         if (.isDebugEnabled())
            .debug("dispatcher stopped");
         try
         {
            Thread.sleep(3000);
         }
         catch (InterruptedException e)
         {
            .error("The interapted on disconnect : " + ee);
         }
      }
      if ( != null)
      {
         .disconnect();
         if (.isDebugEnabled())
            .debug("channel disconnected");
         try
         {
            Thread.sleep(5000);
         }
         catch (InterruptedException e)
         {
            .error("The interapted on disconnect : " + ee);
         }
         .close();
          = null;
         if (.isDebugEnabled())
            .debug("Disconnect done, fire connection listeners");
         for (ConnectionListener cl : )
         {
            cl.onDisconnect();
         }
      }
   }

   
addPacketListener.

Parameters:
packetListener add the PacketListener
   public void addPacketListener(PacketListener packetListener)
   {
      this..add(packetListener);
   }

   
Remove PacketListener.

Parameters:
packetListener add the PacketListener
   public void removePacketListener(PacketListener packetListener)
   {
      this..remove(packetListener);
   }

   
Add channel state listener (AsynInitializer).

Parameters:
listener StateListener
   public void addStateListener(StateListener listener)
   {
      this..add(listener);
   }

   
Remove SatateListener.

Parameters:
listener StateListener
   public void removeStateListener(StateListener listener)
   {
      this..remove(listener);
   }

   
Add connection sate listener.

Parameters:
listener ConnectionListener
   public void addConnectionListener(ConnectionListener listener)
   {
      this..add(listener);
   }

   
Remove connection listener.

Parameters:
listener ConnectionListener
   public void removeConnectionListener(ConnectionListener listener)
   {
      this..remove(listener);
   }

   
getDispatcher.

Returns:
MessageDispatcher return the MessageDispatcher object
   {
      return ;
   }

   
getOtherMembers.

Returns:
List list of other members.
   {
      List<Addresslist = new ArrayList<Address>(.getView().getMembers());
      list.remove(.getLocalAddress());
      List<MemberAddressmembers = new ArrayList<MemberAddress>();
      for (Address address : list)
         members.add(new MemberAddress(address));
      return members;
   }

   
sendPacket.

Parameters:
packet the Packet with content
destinations the destination addresses
Throws:
java.io.IOException will be generated Exception
   public void sendPacket(AbstractPacket packetMemberAddress... destinationsthrows IOException
   {
      if ( != null && .getCount() != 0)
      {
         try
         {
            .await();
         }
         catch (InterruptedException e)
         {
            throw new RuntimeException(e);
         }
      }
      if ( == )
      {
         Vector<Addressdest = new Vector<Address>();
         for (MemberAddress address : destinations)
            dest.add(address.getAddress());
         sendPacket(packetdest);
      }
      else if ( == )
         throw new ChannelNotConnectedException("The channel is not connected.");
      else
         throw new ChannelWasDisconnectedException("The channel was disconnected.");
   }

   
Send packet using Vector of dests.

Parameters:
packet AbstractPacket
dest Vector of Address
Throws:
java.io.IOException if error
   private void sendPacket(AbstractPacket packetVector<Addressdestthrows IOException
   {
      if ( == )
      {
         byte[] buffer = PacketTransformer.getAsByteArray(packet);
         Message msg = new Message(nullnullbuffer);
         if ( ==  ||  == null)
            throw new ChannelWasDisconnectedException("The channel was disconnected.");
         .castMessage(destmsg., 0);
      }
   }

   
Send packet to all members.

Parameters:
packet the Packet with contents
Throws:
java.io.IOException will be generated Exception
   public void sendPacket(AbstractPacket packetthrows IOException
   {
      if ( != null && .getCount() != 0)
      {
         try
         {
            .await();
         }
         catch (InterruptedException e)
         {
            throw new RuntimeException(e);
         }
      }
      if ( == )
      {
         Vector<Addressdest = new Vector<Address>(.getView().getMembers());
         dest.remove(.getLocalAddress());
         sendPacket(packetdest);
      }
      else if ( == )
         throw new ChannelNotConnectedException("The channel is not connected.");
      else
         throw new ChannelWasDisconnectedException("The channel was disconnected.");
   }

   
getChannel.

Returns:
JChannel return the JChannel object
   public JChannel getChannel()
   {
      return ;
   }
   // ************ RequestHandler **********

   
   public Object handle(final Message message)
   {
      if (isConnected())
      {
         try
         {
            .add(PacketTransformer.getAsPacket(message.getBuffer()), new MemberAddress(message.getSrc()));
            if (.getView() != null)
            {
               if (.getView().getMembers().size() == )
               {
                  .handle();
               }
               else
               {
                  .warn("Not all members connected to the channel " + +.getView().getMembers().size()
                     + " != " +  + ", queue message " + message);
               }
            }
            else
            {
               .warn("No members found or channel closed, queue message " + message);
            }
            return new String("Success");
         }
         catch (IOException e)
         {
            .error("Message handler error " + ee);
            return e.getMessage();
         }
         catch (ClassNotFoundException e)
         {
            .error("Message handler error " + ee);
            return e.getMessage();
         }
      }
      else
      {
         .warn("Channel is closed but message received " + message);
         return new String("Disconnected");
      }
   }
   // ******** MembershipListener ***********

   
   public void viewAccepted(View view)
   {
      if (isConnected())
      {
         .info("View accepted " + view.printDetails());
         ArrayList<MemberAddressmembers = new ArrayList<MemberAddress>();
         for (Address address : view.getMembers())
            members.add(new MemberAddress(address));
         StateEvent event = new StateEvent(new MemberAddress(.getLocalAddress()), members);
         for (StateListener listener : )
            listener.onStateChanged(event);
         // check if we have data to be propagated to the synchronization
         .handle();
      }
      else
         .warn("Channel is closed but View accepted " + view.printDetails());
   }

   
   /**
    * Membership callback; this implementation takes no action.
    */
   public void block()
   {
   }

   
   /**
    * Membership callback; this implementation takes no action.
    *
    * @param arg0 the address passed to the callback (unused)
    */
   public void suspect(Address arg0)
   {
   }
   // *****************************************
New to GrepCode? Check out our FAQ X