Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright (C) 2009 eXo Platform SAS.
   *
   * This is free software; you can redistribute it and/or modify it
   * under the terms of the GNU Lesser General Public License as
   * published by the Free Software Foundation; either version 2.1 of
   * the License, or (at your option) any later version.
   *
   * This software is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General Public
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 package org.exoplatform.services.jcr.ext.replication.recovery;
 
 
 import java.io.File;
 import java.util.List;

 /**
  * Created by The eXo Platform SAS.
  *
  * @author Alex Reshetnyak
  * @version $Id: RecoverySynchronizer.java 34445 2009-07-24 07:51:18Z dkatayev $
  */
 
 
 public class RecoverySynchronizer
 {
   
    /** The Apache logger. */
 
    private static Log log = ExoLogger.getLogger("exo.jcr.component.ext.RecoverySynchronizer");

   
    /** Defines the folder that holds the ChangesLog files. */
 
    // private File recoveryDir;
    
    /** The FileNameFactory generates the file names for binary ChangesLog files. */
 
    private FileNameFactory fileNameFactory;

   
    /** The FileCleaner deletes the temporary files. */
 
    private FileCleaner fileCleaner;

   
    /** The ReplicationChannelManager transmits and receives the Packets. */
 
    /** This member's own name in the cluster. */
 
    private String ownName;

   
    /** The system identification string. */
 
    private String systemId;

   
    /** The RecoveryReader reads the binary ChangesLog from the file system. */
 
    private RecoveryReader recoveryReader;

   
    /** The RecoveryWriter writes the ChangesLog to the file system. */
 
    private RecoveryWriter recoveryWriter;

   
   /** The HashMap of PendingBinaryFile entries. */
   /** The ItemDataKeeper through which the ChangesLogs are saved. */
   private ItemDataKeeper dataKeeper;

   
   /** The list of names of the other participants that have been initialized. */
The list of names other participants who was Synchronized successful.
   private List<StringsuccessfulSynchronizedList;

   
   /** The flag for local synchronization. */
   private volatile boolean localSynchronization = false;

   
RecoverySynchronizer constructor.

Parameters:
recoveryDir the recovery dir
fileNameFactory the FileNameFactory
fileCleaner the FileCleaner
channelManager the ReplicationChannelManager
ownName the own name
recoveryWriter the RecoveryWriter
recoveryReader the RecoveryReader
systemId the system identification string
   public RecoverySynchronizer(File recoveryDirFileNameFactory fileNameFactoryFileCleaner fileCleaner,
      ReplicationChannelManager channelManagerString ownNameRecoveryWriter recoveryWriter,
      RecoveryReader recoveryReaderString systemId)
   {
      // this.recoveryDir = recoveryDir;
      this. = fileNameFactory;
      this. = fileCleaner;
      this. = channelManager;
      this. = ownName;
      this. = systemId;
      this. = recoveryReader;
      this. = recoveryWriter;
   }

   
Will be initialized the synchronization.
   /**
    * Starts a synchronization round by sending a request packet.
    * <p>
    * When the guard condition holds, the method logs "Synchronization init...", builds a
    * {@code Packet} that carries a freshly generated identifier and the current
    * {@code Calendar} instance, and sends it. Any exception is caught and logged here;
    * nothing is rethrown.
    * <p>
    * NOTE(review): the guard condition, the logger/sender receivers and two of the
    * {@code Packet} constructor arguments are blank in this copy of the file - restore them
    * from the upstream source before relying on this method.
    */
   public void synchronizRepository()
   {
      try
      {
         if ()
         {
            .info("Synchronization init...");
            Packet packet =
               new Packet(.., IdGenerator.generate(), , Calendar
                  .getInstance());
            .sendPacket(packet);
         }
      }
      catch (Exception e)
      {
         // Failures are only logged, so callers never see an exception from this method.
         .error("Synchronization error"e);
      }
   }

   
send.

Parameters:
packet the Packet
Throws:
java.lang.Exception will be generated the Exception
   /**
    * Serializes a packet and sends it, choosing the transport call by serialized size.
    * <p>
    * The packet is converted with {@code Packet.getAsByteArray}. A buffer no longer than the
    * size limit goes out through {@code send(buffer)}; anything larger goes through
    * {@code sendBigPacket}.
    * <p>
    * NOTE(review): the size-limit constant, the sender receiver and the separators in the
    * signature and the {@code sendBigPacket} call are blank in this copy of the file.
    *
    * @param packet the Packet to send
    * @throws Exception declared by the method; presumably raised by serialization or sending
    */
   private void send(Packet packetthrows Exception
   {
      byte[] buffer = Packet.getAsByteArray(packet);
      if (buffer.length <= .)
      {
         .send(buffer);
      }
      else
         .sendBigPacket(bufferpacket);
   }

   
processingPacket.

Parameters:
packet the Packet
status before status
Returns:
int after status
Throws:
java.lang.Exception will be generated the Exception
   /**
    * Handles one incoming recovery packet, dispatching on {@code packet.getPacketType()}.
    * <p>
    * The visible branches do the following:
    * <ul>
    * <li>answer a request by calling {@code sendChangesLogUpDate} with the packet's time
    * stamp, owner name and identifier;</li>
    * <li>append a received chunk to the {@code ChangesFile} that belongs to the packet's
    * identifier, owner and file name, creating the {@code PendingBinaryFile} and the
    * {@code ChangesFile} on first use;</li>
    * <li>once a transfer is reported successful and the number of received files equals the
    * expected counter, read every file back as a {@code TransactionChangesLog}, save it, and
    * send back the list of file names minus the ones that failed to save;</li>
    * <li>remove the listed changes logs and report the removed count in a reply packet;</li>
    * <li>accumulate the removed count and, when all old logs are removed, hand the temporary
    * files over for deletion, drop the pending entry and call
    * {@link #synchronizRepository()} again;</li>
    * <li>accumulate the number of files that still need to be transferred;</li>
    * <li>record the packet owner in a list and change the returned status.</li>
    * </ul>
    * NOTE(review): several {@code case} labels, all packet-type constants and every field
    * receiver are blank in this copy of the file, so the mapping of branches to packet types
    * cannot be read from here - restore them from the upstream source.
    *
    * @param packet the Packet to process
    * @param status the status before processing
    * @return the status after processing; equal to {@code status} unless the last branch
    *         reassigns it
    * @throws Exception declared by the method; failures while saving a single changes log
    *         are caught and logged inside the loop instead
    */
   public int processingPacket(Packet packetint statusthrows Exception
   {
      int stat = status;
      switch (packet.getPacketType())
      {
            // NOTE(review): the case label for this branch is missing in this copy.
            sendChangesLogUpDate(packet.getTimeStamp(), packet.getOwnerName(), packet.getIdentifier());
            break;
         case .. :
            PendingBinaryFile container = .get(packet.getIdentifier());
            if (container == null)
            {
               container = new PendingBinaryFile();
               .put(packet.getIdentifier(), container);
            }
            ChangesFile chf;
            // Look-up and creation of the ChangesFile happen under the container's monitor.
            synchronized (container)
            {
               chf = container.getChangesFile(packet.getOwnerName(), packet.getFileName());
               if (chf == null)
               {
                  chf =
                     container.addChangesFile(packet.getOwnerName(), packet.getFileName(), packet.getSystemId(), packet
                        .getTotalPacketCount());
               }
            }
            chf.write(packet.getOffset(), packet.getByteArray());
            if (chf.isStored())
            {
               if (.isDebugEnabled())
                  .debug("Last packet of file has been received : " + packet.getFileName());
            }
            break;
            // NOTE(review): the case label for this branch is missing in this copy.
            if (.containsKey(packet.getIdentifier()))
            {
               PendingBinaryFile pbf = .get(packet.getIdentifier());
               pbf.addToSuccessfulTransferCounter(packet.getSize());
               if (pbf.isSuccessfulTransfer())
               {
                  if (.isDebugEnabled())
                     .debug("The signal ALL_BinaryFile_transferred_OK has been received  from "
                        + packet.getOwnerName());
                  List<ChangesFilefileDescriptorList = pbf.getSortedFilesDescriptorList();
                  if (.isDebugEnabled())
                     .info("fileDescriptorList.size() == pbf.getNeedTransferCounter() : "
                        + fileDescriptorList.size() + "== " + pbf.getNeedTransferCounter());
                  // Saving starts only when every expected file has arrived.
                  if (fileDescriptorList.size() == pbf.getNeedTransferCounter())
                  {
                     List<StringfailList = new ArrayList<String>();
                     for (ChangesFile fileDescriptor : fileDescriptorList)
                     {
                        try
                        {
                           TransactionChangesLog transactionChangesLog =
                              .getChangesLog(PrivilegedFileHelper.getAbsolutePath(fileDescriptor.getFile()));
                           transactionChangesLog.setSystemId(fileDescriptor.getSystemId());
                           Calendar cLogTime = .getDateFromFileName(fileDescriptor.getFile().getName());
                           if (.isDebugEnabled())
                           {
                              .debug("Save to JCR : "
                                 + PrivilegedFileHelper.getAbsolutePath(fileDescriptor.getFile()));
                              .debug("SystemID : " + transactionChangesLog.getSystemId());
                              .debug("list size : " + fileDescriptorList.size());
                           }
                           // dump log
                           if (.isDebugEnabled())
                           {
                              ChangesLogIterator logIterator = transactionChangesLog.getLogIterator();
                              while (logIterator.hasNextLog())
                              {
                                 PlainChangesLog pcl = logIterator.nextLog();
                                 .debug(pcl.dump());
                              }
                           }
                           saveChangesLog(transactionChangesLogcLogTime);
                           if (.isDebugEnabled())
                           {
                              .debug("After save message: the owner systemId --> "
                                 + transactionChangesLog.getSystemId());
                              .debug("After save message: --> " + );
                           }
                        }
                        catch (Exception e)
                        {
                           // A failed file is remembered and skipped; the loop continues with the next one.
                           failList.add(fileDescriptor.getFile().getName());
                           .error("Can't save to JCR "e);
                        }
                     }
                     // Send file name list
                     List<StringfileNameList =
                        new ArrayList<String>(.get(packet.getIdentifier()).getFileNameList());
                     if (failList.size() != 0)
                        fileNameList.removeAll(failList);
                     Packet packetFileNameList =
                        new Packet(..packet.getIdentifier(), ,
                           fileNameList);
                     send(packetFileNameList);
                     .info("The " + fileDescriptorList.size() + " changeslogs were received and "
                        + fileNameList.size() + " saved");
                  }
                  else if (.isDebugEnabled())
                  {
                     .debug("Do not start save : " + fileDescriptorList.size() + " of "
                        + pbf.getNeedTransferCounter());
                  }
               }
            }
            break;
         case .. :
            long removeCounter = .removeChangesLog(packet.getFileNameList(), packet.getOwnerName());
            if (.isDebugEnabled())
               .debug("Remove from file system : " + removeCounter);
            Packet removedOldChangesLogPacket =
               new Packet(..packet.getIdentifier(), );
            removedOldChangesLogPacket.setSize(removeCounter);
            .sendPacket(removedOldChangesLogPacket);
            break;
            // NOTE(review): the case label for this branch is missing in this copy.
            if (.containsKey(packet.getIdentifier()) == true)
            {
               PendingBinaryFile pbf = .get(packet.getIdentifier());
               pbf.setRemovedOldChangesLogCounter(pbf.getRemovedOldChangesLogCounter() + packet.getSize());
               if (pbf.isAllOldChangesLogsRemoved())
               {
                  // remove temporary files
                  for (ChangesFile fd : pbf.getSortedFilesDescriptorList())
                     .addFile(fd.getFile());
                  // remove PendingBinaryFile
                  .remove(packet.getIdentifier());
                  // next iteration
                  if (.isDebugEnabled())
                     .debug("Next iteration of recovery ...");
                  synchronizRepository();
               }
            }
            else
               .warn("Can not find the PendingBinaryFile whith id: " + packet.getIdentifier());
            break;
         case .. :
            if (.containsKey(packet.getIdentifier()) == false)
               .put(packet.getIdentifier(), new PendingBinaryFile());
            PendingBinaryFile pbf = .get(packet.getIdentifier());
            pbf.setNeedTransferCounter(pbf.getNeedTransferCounter() + packet.getSize());
            if (.isDebugEnabled())
               .debug("NeedTransferCounter : " + pbf.getNeedTransferCounter());
            break;
         case .. :
            if (.contains(packet.getOwnerName()) == false)
               .add(packet.getOwnerName());
            // NOTE(review): a condition line appears to be missing before this brace in this copy.
            {
               stat = .;
                = false;
            }
            break;
         default :
            break;
      }
      return stat;
   }

   
sendChangesLogUpDate.

Parameters:
timeStamp the update to this date
ownerName the member name who initialize synchronization
identifier the operation identifier
   /**
    * Sends the stored changes logs that a requesting member still needs.
    * <p>
    * The method obtains a list of file paths for the given time stamp and owner, first sends
    * a packet whose size is the number of those files, and then either sends every file
    * followed by a closing packet (own name set to {@code ownerName}, size set to the file
    * count) or, when the list is empty, sends a single packet built with a newly generated
    * identifier and {@code ownerName}. Any exception is caught and logged; nothing is
    * rethrown.
    * <p>
    * NOTE(review): packet-type constants, field receivers and argument separators are blank
    * in this copy of the file - restore them from the upstream source.
    *
    * @param timeStamp the date passed to the file path look-up
    * @param ownerName the name of the member that initialized the synchronization
    * @param identifier the operation identifier placed into the outgoing packets
    */
   private void sendChangesLogUpDate(Calendar timeStampString ownerNameString identifier)
   {
      try
      {
         if (.isDebugEnabled())
            .debug("+++ sendChangesLogUpDate() +++ : " + Calendar.getInstance().getTime().toGMTString());
         List<StringfilePathList = .getFilePathList(timeStampownerName);
         // Announce how many files will follow before any file is sent.
         Packet needTransferCounter = new Packet(..identifier);
         needTransferCounter.setSize(filePathList.size());
         .sendPacket(needTransferCounter);
         if (filePathList.size() > 0)
         {
            for (String filePath : filePathList)
            {
               .sendBinaryFile(filePathownerNameidentifier,
                  ..);
            }
            Packet endPocket = new Packet(..identifier);
            endPocket.setOwnName(ownerName);
            endPocket.setSize(filePathList.size());
            .sendPacket(endPocket);
         }
         else
         {
            // Nothing to transfer: a single packet is sent instead of the file sequence.
            Packet synchronizedOKPacket =
               new Packet(.., IdGenerator.generate(), ownerName);
            .sendPacket(synchronizedOKPacket);
         }
      }
      catch (Exception e)
      {
         .error("ChangesLogs was send with error"e);
      }
   }

   
setDataKeeper.

Parameters:
dataKeeper the ItemDataKeeper
   public void setDataKeeper(ItemDataKeeper dataKeeper)
   {
      this. = dataKeeper;
   }

   
updateInitedParticipantsClusterList.

Parameters:
list the list of initialized members
   public void updateInitedParticipantsClusterList(Collection<? extends Stringlist)
   {
   }

   
localSynchronization.
   public void localSynchronization()
   {
       = true;
   }

   
saveChangesLog.

Parameters:
dataManager the ItemDataKeeper
changesLog the ChangesLog with data
cLogTime the date of ChangesLog
Throws:
org.exoplatform.services.jcr.ext.replication.ReplicationException will be generated the ReplicationException
   private void saveChangesLog(ItemDataKeeper dataManagerTransactionChangesLog changesLogCalendar cLogTime)
      throws ReplicationException
   {
      try
      {
         try
         {
            dataManager.save(changesLog);
         }
         catch (JCRInvalidItemStateException e)
         {
            TransactionChangesLog normalizeChangesLog =
               getNormalizedChangesLog(e.getIdentifier(), e.getState(), changesLog);
            if (normalizeChangesLog != null)
               saveChangesLog(dataManagernormalizeChangesLogcLogTime);
         }
      }
      catch (Throwable t)
      {
         throw new ReplicationException("Save error. Log time " + cLogTime.getTime(), t);
      }
   }

   
getNormalizedChangesLog.

Parameters:
collisionID String, id of collision
state int, the state
changesLog TransactionChangesLog, the changes log
Returns:
TransactionChangesLog return the normalized changes log
   private TransactionChangesLog getNormalizedChangesLog(String collisionIDint stateTransactionChangesLog changesLog)
   {
      ItemState citem = changesLog.getItemState(collisionID);
      if (citem != null)
      {
         TransactionChangesLog result = new TransactionChangesLog();
         result.setSystemId(changesLog.getSystemId());
         ChangesLogIterator cli = changesLog.getLogIterator();
         while (cli.hasNextLog())
         {
            ArrayList<ItemStatenormalized = new ArrayList<ItemState>();
            PlainChangesLog next = cli.nextLog();
            for (ItemState change : next.getAllStates())
            {
               if (state == change.getState())
               {
                  ItemData item = change.getData();
                  // targeted state
                  if (citem.isNode())
                  {
                     // Node... by ID and desc path
                     if (!item.getIdentifier().equals(collisionID)
                        && !item.getQPath().isDescendantOf(citem.getData().getQPath()))
                        normalized.add(change);
                  }
                  else if (!item.getIdentifier().equals(collisionID))
                  {
                     // Property... by ID
                     normalized.add(change);
                  }
               }
               else
                  // another state
                  normalized.add(change);
            }
            PlainChangesLog plog = new PlainChangesLogImpl(normalizednext.getSessionId(), next.getEventType());
            result.addLog(plog);
         }
         return result;
      }
      return null;
   }
/**
 * A simple mutable integer counter that starts at zero.
 * <p>
 * The counter is not synchronized; callers that share an instance between threads must
 * provide their own synchronization.
 */
class Counter
{

   /**
    * The count value.
    */
   int count = 0;

   /**
    * Increments the counter by one.
    *
    * @return the value of count after the increment
    */
   public int inc()
   {
      return ++count;
   }

   /**
    * Resets the counter to zero.
    */
   public void clear()
   {
      count = 0;
   }

   /**
    * Returns the current value without changing it.
    *
    * @return the value of count
    */
   public int getValue()
   {
      return count;
   }
}
New to GrepCode? Check out our FAQ X