Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright (C) 2009 eXo Platform SAS.
   *
   * This is free software; you can redistribute it and/or modify it
   * under the terms of the GNU Lesser General Public License as
   * published by the Free Software Foundation; either version 2.1 of
   * the License, or (at your option) any later version.
   *
   * This software is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General Public
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 package org.exoplatform.services.jcr.ext.replication;
 
 import  org.picocontainer.Startable;
 
 import java.io.File;
 import java.util.List;
 
Created by The eXo Platform SAS.

Author(s):
Alex Reshetnyak
Version:
$Id: ReplicationService.java 34445 2009-07-24 07:51:18Z dkatayev $
 
 @ManagedDescription("JCR replication service")
 @NameTemplate(@Property(key = "service", value = "replication"))
 public class ReplicationService implements Startable, ManagementAware
 {

   
The apache logger.
 
    private static Log log = ExoLogger.getLogger("exo.jcr.component.ext.ReplicationService");

   
The service name.
 
    private static final String SERVICE_NAME = "Replication";

   
The template for ip-address in configuration.
 
    private static final String IP_ADRESS_TEMPLATE = "[$]bind-ip-address";

   
The persistent mode to replication.
 
    private static final String PERSISTENT_MODE = "persistent";

   
The proxy mode to replication.
   private static final String PROXY_MODE = "proxy";

   
Definition the static type for priority mechanism.
   public static final String PRIORITY_STATIC_TYPE = "static";

   
Definition the dynamic type for priority mechanism.
   public static final String PRIORITY_DYNAMIC_TYPE = "dynamic";

   
Definition the generic type for priority mechanism.
   public static final String PRIORITY_GENERIC_TYPE = "generic";

   
Definition the timeout to FileCLeaner.
   public static final int FILE_CLEANRE_TIMEOUT = 30030;

   
The RepositorySerice.
   private RepositoryService repoService;

   
The RegistryService.
   private RegistryService registryService;

   
Parameters to initialize.
   private InitParams initParams;

   
The testMode using only for testing.
   private String testMode;

   
If 'enabled' is false then ReplicationServis not started.
   private String enabled;

   
The replication mode (persistent or proxy).
   private String mode;

   
Bind to IP address.
   private String bindIPAddress;

   
The channel configuration.
   private String channelConfig;

   
The channel name.
   private String channelName;

   
The list of repositories. Fore this repositories will be worked replication.
   private List<StringrepoNamesList;

   
If ChangesLog was not delivered, then ChangesLog will be saved in this folder.
   private File recoveryDir;

   
If ChangesLog was not delivered, then ChangesLog will be saved in this folder.
   private String recDir;

   
The name of cluster node.
   private String ownName;

   
The list of names other participants.
   private List<StringparticipantsClusterList;

   
The names other participants.
   private String participantsCluster;

   
The definition timeout, how many time will be waited for successful save the Changeslog.
   private long waitConfirmation;

   
The definition timeout, how many time will be waited for successful save the Changeslog.
   private String sWaitConfirmation;

   
Will be started full backup if 'backupEnabled' is 'true'.
   private boolean backupEnabled;

   
Will be started full backup if 'backupEnabled' is 'true'.
   private String sBackupEnabled;

   
The definition of backup folder.
   private File backupDir;

   
The definition of backup folder.
   private String sBackupDir;

   
The definition of backup delay. Will be waited 'backupDelayTime' milliseconds before start full backup.
   private long backupDelayTime = 0;

   
The definition of backup delay. Will be waited 'backupDelayTime' milliseconds before start full backup.
   private String sDelayTime;

   
The list of BackupCreators. The BackupCreator will be started full backup.
   private List<BackupCreatorbackupCreatorList;

   
If 'started' is true then ReplicationServis was successful started.
   private boolean started;

   
The definition of priority type. (PRIORITY_STATIC_TYPE or PRIORITY_DYNAMIC_TYPE)
   private String priprityType;

   
The definition of priority value.
   private int ownPriority;

   
The definition of priority value.
   private String ownValue;

   
The management context.
ReplicationService constructor.

Parameters:
repoService the RepositoryService
params the configuration parameters
Throws:
RepositoryConfigurationException will be generated RepositoryConfigurationException
   {
      this(repoServiceparamsnull);
   }

   
ReplicationService constructor.

Parameters:
repoService the RepositoryService
params the configuration parameters
registryService the RegistryService
Throws:
RepositoryConfigurationException will be generated RepositoryConfigurationException
   public ReplicationService(RepositoryService repoServiceInitParams paramsRegistryService registryService)
   {
       = false;
      this. = repoService;
      this. = registryService;
      this. = params;
   }

   
   public void start()
   {
      {
         SessionProvider sessionProvider = SessionProvider.createSystemProvider();
         try
         {
            readParamsFromRegistryService(sessionProvider);
         }
         catch (Exception e)
         {
            readParamsFromFile();
            try
            {
               writeParamsToRegistryService(sessionProvider);
            }
            catch (Exception exc)
            {
               .error("Cannot write init configuration to RegistryService."exc);
            }
         }
         finally
         {
            sessionProvider.close();
         }
      }
      else
      {
         readParamsFromFile();
      }
      try
      {
         for (int rIndex = 0; rIndex < .size(); rIndex++)
         {
            RepositoryImpl jcrRepository = (RepositoryImpl).getRepository(.get(rIndex));
            String[] workspaces = jcrRepository.getWorkspaceNames();
            if (.equals("true"))
            {
               // set ownName & participantsClusterList for test mode
               if ( != null && "true".equals())
               {
                   = (rIndex == 0 ? "cluster_node_1" : "cluster_node_2");
                   = new ArrayList<String>();
                  if (rIndex == 0)
                  {
                      = 100;
                     .add("cluster_node_2");
                  }
                  else
                  {
                      = 50;
                     .add("cluster_node_1");
                  }
               }
               for (int wIndex = 0; wIndex < workspaces.lengthwIndex++)
                  try
                  {
                     // create the recovery for workspace
                     File dir =
                        new File(PrivilegedFileHelper.getAbsolutePath() + .
                           + .get(rIndex) + "_" + workspaces[wIndex]);
                     PrivilegedFileHelper.mkdirs(dir);
                     String systemId = IdGenerator.generate();
                     String props = .replaceAll();
                     // get workspace container
                     WorkspaceContainer wContainer =
                        (WorkspaceContainer)jcrRepository.getSystemSession(workspaces[wIndex]).getContainer();
                     String uniqueNoame = jcrRepository.getName() + "_" + workspaces[wIndex];
                     if ( != null && "true".equals())
                        uniqueNoame = "Test_Channel234";
                     ReplicationChannelManager channelManager =
                        new ReplicationChannelManager(props + (.equals("") ? "" : "_")
                           + uniqueNoame);
                     WorkspaceContainerFacade wsFacade = jcrRepository.getWorkspaceContainer(workspaces[wIndex]);
                     WorkspaceEntry wconf = (WorkspaceEntry)wsFacade.getComponent(WorkspaceEntry.class);
                     int maxBufferSize =
                        wconf.getContainer().getParameterInteger(.,
                           .);
                     FileCleanerHolder wfcleaner =
                        (FileCleanerHolder)wsFacade.getComponent(FileCleanerHolder.class);
                     FileCleaner fileCleaner = wfcleaner.getFileCleaner();
                     // create the RecoveryManager
                     RecoveryManager recoveryManager =
                        new RecoveryManager(dirsystemId,
                           jcrRepository.getName(), workspaces[wIndex], channelManagerfileCleanermaxBufferSize,
                           new ReaderSpoolFileHolder());
                     PersistentDataManager dataManager =
                        (PersistentDataManager)wsFacade.getComponent(PersistentDataManager.class);
                     ConnectionFailDetector failDetector =
                        new ConnectionFailDetector(channelManagerdataManagerrecoveryManager,
                           workspaces[wIndex]);
                     channelManager.addStateListener(failDetector);
                     // add data transmitter
                     wContainer.registerComponentImplementation(WorkspaceDataTransmitter.class);
                     WorkspaceDataTransmitter dataTransmitter =
                        (WorkspaceDataTransmitter)wContainer.getComponentInstanceOfType(WorkspaceDataTransmitter.class);
                     dataTransmitter.init(/* disp */channelManagersystemIdrecoveryManager);
                     // add data receiver
                     AbstractWorkspaceDataReceiver dataReceiver = null;
                     if (.equals())
                     {
                        wContainer.registerComponentImplementation(WorkspaceDataManagerProxy.class);
                        wContainer.registerComponentImplementation(ProxyWorkspaceDataReceiver.class);
                        dataReceiver =
                           (ProxyWorkspaceDataReceiver)wContainer
                              .getComponentInstanceOfType(ProxyWorkspaceDataReceiver.class);
                     }
                     else if (.equals())
                     {
                        wContainer.registerComponentImplementation(PersistentWorkspaceDataReceiver.class);
                        dataReceiver =
                           (PersistentWorkspaceDataReceiver)wContainer
                              .getComponentInstanceOfType(PersistentWorkspaceDataReceiver.class);
                     }
                     recoveryManager.setDataKeeper(dataReceiver.getDataKeeper());
                     dataReceiver.init(channelManagersystemIdrecoveryManager);
                     channelManager.connect();
                     // Register for management
                     if ( != null)
                        .register(recoveryManager);
                     dataReceiver.start();
                  }
                  catch (Exception e)
                  {
                     .error("Can not start replication on " + .get(rIndex) + "_" + workspaces[wIndex]
                        + " \n" + ee);
                  }
            }
            if ()
               for (int wIndex = 0; wIndex < workspaces.lengthwIndex++)
                  .add(initWorkspaceBackup(.get(rIndex), workspaces[wIndex]));
         }
      }
      catch (RepositoryException re)
      {
         .error("Can not start ReplicationService \n" + rere);
      }
      catch (RepositoryConfigurationException e)
      {
         .error("Can not start ReplicationService \n" + ee);
      }
       = true;
   }

   
initWorkspaceBackup. Will be initialized BackupCreator.

Parameters:
repositoryName the name of repository
workspaceName the name of workspace
Returns:
BackupCreator return the BackupCreator
Throws:
RepositoryException will be generated RepositoryException
RepositoryConfigurationException will be generated RepositoryConfigurationException
   private BackupCreator initWorkspaceBackup(String repositoryNameString workspaceNamethrows RepositoryException,
   {
      ManageableRepository manageableRepository = .getRepository(repositoryName);
      BackupCreator backupCreator = new BackupCreator(workspaceNamemanageableRepository);
      return backupCreator;
   }

   
   public void stop()
   {
   }

   
isStarted.

Returns:
boolean return the isStarted
   public boolean isStarted()
   {
      return ;
   }

   
Write parameters to RegistryService..

Parameters:
sessionProvider SessionProvider, the session provider
Throws:
ParserConfigurationException will be generate the exception ParserConfigurationException
RepositoryException will be generate the exception RepositoryException
   private void writeParamsToRegistryService(SessionProvider sessionProviderthrows ParserConfigurationException,
   {
      Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
      Element root = doc.createElement();
      doc.appendChild(root);
      StringBuilder reps = new StringBuilder();
      for (String rep : )
      {
         reps.append(rep).append(";");
      }
      Element element = doc.createElement("repositories");
      setAttributeSmart(element"repositories"reps.toString());
      root.appendChild(element);
      element = doc.createElement("replication-properties");
      setAttributeSmart(element"test-mode");
      setAttributeSmart(element"enabled");
      setAttributeSmart(element"mode");
      setAttributeSmart(element"bind-ip-address");
      setAttributeSmart(element"channel-config");
      setAttributeSmart(element"channel-name");
      setAttributeSmart(element"recovery-dir");
      setAttributeSmart(element"node-name");
      setAttributeSmart(element"other-participants");
      setAttributeSmart(element"wait-confirmation");
      root.appendChild(element);
      element = doc.createElement("replication-snapshot-properties");
      setAttributeSmart(element"snapshot-enabled");
      setAttributeSmart(element"snapshot-enabled");
      setAttributeSmart(element"snapshot-enabled");
      root.appendChild(element);
      element = doc.createElement("replication-priority-properties");
      setAttributeSmart(element"priority-type");
      setAttributeSmart(element"node-priority");
      root.appendChild(element);
      RegistryEntry serviceEntry = new RegistryEntry(doc);
      .createEntry(sessionProvider.serviceEntry);
   }

   
Read parameters to RegistryService.

Parameters:
sessionProvider SessionProvider, the session provider
Throws:
RepositoryException will be generate the exception RepositoryException
   private void readParamsFromRegistryService(SessionProvider sessionProviderthrows RepositoryException
   {
      // initialize repositories
      String entryPath = . + "/" +  + "/" + "repositories";
      RegistryEntry entry = .getEntry(sessionProviderentryPath);
      Document doc = entry.getDocument();
      Element element = doc.getDocumentElement();
      String repositories = getAttributeSmart(element"repositories");
       = new ArrayList<String>();
      String reps[] = repositories.split(";");
      for (String rep : reps)
      {
         if (!rep.equals(""))
         {
            .add(rep);
         }
      }
      // initialize replication params;
      entryPath = . + "/" +  + "/" + "replication-properties";
      entry = .getEntry(sessionProviderentryPath);
      doc = entry.getDocument();
      element = doc.getDocumentElement();
       = getAttributeSmart(element"test-mode");
       = getAttributeSmart(element"enabled");
       = getAttributeSmart(element"mode");
       = getAttributeSmart(element"bind-ip-address");
       = getAttributeSmart(element"channel-config");
       = getAttributeSmart(element"channel-name");
       = getAttributeSmart(element"recovery-dir");
       = getAttributeSmart(element"node-name");
       = getAttributeSmart(element"other-participants");
       = getAttributeSmart(element"wait-confirmation");
      // initialize snapshot params;
      entryPath = . + "/" +  + "/" + "replication-snapshot-properties";
      entry = .getEntry(sessionProviderentryPath);
      doc = entry.getDocument();
      element = doc.getDocumentElement();
       = getAttributeSmart(element"snapshot-enabled");
       = getAttributeSmart(element"snapshot-dir");
       = getAttributeSmart(element"delay-time");
      // initialize priority params;
      entryPath = . + "/" +  + "/" + "replication-priority-properties";
      entry = .getEntry(sessionProviderentryPath);
      doc = entry.getDocument();
      element = doc.getDocumentElement();
       = getAttributeSmart(element"priority-type");
       = getAttributeSmart(element"node-priority");
      .info("Params is read from RegistryService");
      checkParams();
   }

   
Get attribute value.

Parameters:
element The element to get attribute value
attr The attribute name
Returns:
Value of attribute if present and null in other case
   private String getAttributeSmart(Element elementString attr)
   {
      return element.hasAttribute(attr) ? element.getAttribute(attr) : null;
   }

   
Set attribute value. If value is null the attribute will be removed.

Parameters:
element The element to set attribute value
attr The attribute name
value The value of attribute
   private void setAttributeSmart(Element elementString attrString value)
   {
      if (value == null)
      {
         element.removeAttribute(attr);
      }
      else
      {
         element.setAttribute(attrvalue);
      }
   }

   
Read parameters from file.

Throws:
RepositoryConfigurationException
   private void readParamsFromFile()
   {
      PropertiesParam pps = .getPropertiesParam("replication-properties");
      // initialize replication params;
       = pps.getProperty("test-mode");
       = pps.getProperty("enabled");
       = pps.getProperty("mode");
       = pps.getProperty("bind-ip-address");
       = pps.getProperty("channel-config");
       = pps.getProperty("channel-name");
       = pps.getProperty("recovery-dir");
       = pps.getProperty("node-name");
       = pps.getProperty("other-participants");
       = pps.getProperty("wait-confirmation");
      // initialize repositories
      ValuesParam vp = .getValuesParam("repositories");
       = vp.getValues();
      if (vp == null || vp.getValues().size() == 0)
         throw new RuntimeException("repositories not specified");
      // initialize snapshot params;
      PropertiesParam backuParams = .getPropertiesParam("replication-snapshot-properties");
      if (backuParams != null)
      {
          = backuParams.getProperty("snapshot-enabled");
          = backuParams.getProperty("snapshot-dir");
          = backuParams.getProperty("delay-time");
      }
      else
      {
          = false;
      }
      // initialize priority params;
      PropertiesParam priorityParams = .getPropertiesParam("replication-priority-properties");
      if (priorityParams != null)
      {
          = priorityParams.getProperty("priority-type");
          = priorityParams.getProperty("node-priority");
      }
      .info("Params is read from configuration file");
      checkParams();
   }

   
Check read params and initialize.

Throws:
RepositoryConfigurationException
   private void checkParams()
   {
      // replication params;
      if ( == null)
         throw new RuntimeException("enabled not specified");
      if ( == null)
         throw new RuntimeException("mode not specified");
      else if (!.equals() && !.equals())
         throw new RuntimeException("Parameter 'mode' (persistent|proxy) required for replication configuration");
      if ( == null)
         throw new RuntimeException("bind-ip-address not specified");
      if ( == null)
         throw new RuntimeException("channel-config not specified");
      if ( == null)
          = "";
      if ( != null && "true".equals())
          = IdGenerator.generate();
      if ( == null)
         throw new RuntimeException("Recovery dir not specified");
       = new File();
      if (!PrivilegedFileHelper.exists())
      {
         PrivilegedFileHelper.mkdirs();
      }
      if (.equals())
      {
         if ( == null)
            throw new RuntimeException("Node name not specified");
         if ( == null)
            throw new RuntimeException("Other participants not specified");
          = new ArrayList<String>();
         String[] pc = .split(";");
         for (int i = 0; i < pc.lengthi++)
            if (!pc[i].equals(""))
               .add(pc[i]);
      }
      else
      {
         // for PROXY mode :
         boolean isMPing = isMPingConfigured();
         boolean isTCPPing = isTCPPingConfigured();
         if (!(isMPing | isTCPPing))
            throw new RuntimeException("The discovery protocol should be configured MPING or TCPPING protocol.");
         if ( == null && isMPing)
            throw new RuntimeException("Node name not specified");
         if ( == null && isMPing)
            throw new RuntimeException("Other participants not specified");
          = new ArrayList<String>();
         if (isMPing)
         {
            String[] pc = .split(";");
            for (int i = 0; i < pc.lengthi++)
               if (!pc[i].equals(""))
                  .add(pc[i]);
         }
         else
         {
            // node-name == binf-ip-address
            // other-participants == initial_hosts
            List<StringinitialHosts = getInitialHosts();
            if ( != null)
               .warn("The perameter 'other-participants' not use for TCPPING.");
            if ( != null)
               .warn("The perameter 'node-name' not use for TCPPING.");
            for (String host : initialHosts)
               if (!host.equals())
                  .add(host);
             = ;
         }
      }
      if ( == null)
         throw new RuntimeException("Wait confirmation not specified");
      // snapshot params;
       = ( == null ? false : Boolean.valueOf());
      if ()
      {
         if ( == null && )
            throw new RuntimeException("Backup dir not specified");
         else if ()
         {
             = new File();
            if (!PrivilegedFileHelper.exists())
            {
               PrivilegedFileHelper.mkdirs();
            }
         }
         if ( == null && )
            throw new RuntimeException("Backup dir not specified");
         else if ()
             = Long.parseLong();
          = new ArrayList<BackupCreator>();
      }
      // priority params;
      if (.equals())
      {
         if ( == null)
            throw new RuntimeException("Priority type not specified");
            throw new RuntimeException(
               "Parameter 'priority-type' (static|dynamic) required for replication configuration");
         if ( == null)
            throw new RuntimeException("Own Priority not specified");
          = Integer.valueOf();
      }
      else
      {
         if ( != null && !.equals())
            .warn("The parameter 'replication-priority-properties' not use for proxy replication.");
          = ;
      }
   }

   
getInitialHosts.

Returns:
List return list of initial hosts.
   private List<StringgetInitialHosts()
   {
      JChannel jChannel = null;
      try
      {
         jChannel = new JChannel(.replaceAll());
      }
      catch (ChannelException e)
      {
         throw new RuntimeException("Can not initialize the JChannel form 'channel-config'."e);
      }
      String initial_hosts = null;
      for (Protocol p : jChannel.getProtocolStack().getProtocols())
      {
         if (p.getName().equals("TCPPING"))
         {
            Properties props = p.getProperties();
            initial_hosts = props.getProperty("initial_hosts");
         }
      }
      if (initial_hosts == null)
         throw new RuntimeException("The propery 'initial_hosts' not specified in TCPPING ");
      List<StringinitialHosts = new ArrayList<String>();
      for (String host : initial_hosts.split(","))
         initialHosts.add(host.substring(0, host.indexOf("[")));
      return initialHosts;
   }

   
isTCPPingConfigured.

Returns:
boolean return 'true' if configured TCPPING.
   private boolean isTCPPingConfigured()
   {
      return .contains("TCPPING");
   }

   
isMPingConfigured.

Returns:
boolean return 'true' if configured MPING.
   private boolean isMPingConfigured()
   {
      return .contains("MPING");
   }

   
   public void setContext(ManagementContext context)
   {
      this. = context;
   }
New to GrepCode? Check out our FAQ X