Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.apache.helix.controller.restlet;
  
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 import java.util.List;
 
 import  org.restlet.Component;
 import  org.restlet.Context;
 import  org.restlet.data.Protocol;

Controller side restlet server that receives ZNRecordUpdate requests from clients, and batch the ZNRecordUpdate and apply them to zookeeper. This is to optimize the concurrency level of zookeeper access for ZNRecord updates that does not require real-time, like message handling status updates and healthcheck reports. As one server will be used by multiple helix controllers that runs on the same machine, This class is designed as a singleton. Application is responsible to call init() and shutdown() on the getInstance().
 
 public class ZKPropertyTransferServer {
   public static final String PORT = "port";
   public static String RESTRESOURCENAME = "ZNRecordUpdates";
   public static final String SERVER = "ZKPropertyTransferServer";
 
   // Frequency period for the ZNRecords are batch written to zookeeper
   public static int PERIOD = 10 * 1000;
   // If the buffered ZNRecord updates exceed the limit, do a zookeeper batch update.
   public static int MAX_UPDATE_LIMIT = 10000;
   private static Logger LOG = Logger.getLogger(ZKPropertyTransferServer.class);
 
 
 
   boolean _initialized = false;
   boolean _shutdownFlag = false;
   Component _component = null;
   Timer _timer = null;
 
   static {
     org.restlet.engine.Engine.setLogLevel(.);
   }

  
Timertask for zookeeper batched writes
 
   class ZKPropertyTransferTask extends TimerTask {
     @Override
     public void run() {
       try {
         sendData();
       } catch (Throwable t) {
         .error(""t);
       }
 
     }
   }
 
   void sendData() {
     .info("ZKPropertyTransferServer transfering data to zookeeper");
     ConcurrentHashMap<StringZNRecordUpdateupdateCache = null;
 
     synchronized () {
      updateCache = .getAndSet(new ConcurrentHashMap<StringZNRecordUpdate>());
    }
    if (updateCache != null) {
      List<Stringpaths = new ArrayList<String>();
      List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
      List<ZNRecordvals = new ArrayList<ZNRecord>();
      // BUGBUG : what if the instance is dropped?
      for (ZNRecordUpdate holder : updateCache.values()) {
        paths.add(holder.getPath());
        updaters.add(holder.getZNRecordUpdater());
        vals.add(holder.getRecord());
      }
      // Batch write the accumulated updates into zookeeper
      long timeStart = System.currentTimeMillis();
      if (paths.size() > 0) {
        .updateChildren(pathsupdaters.);
      }
      .info("ZKPropertyTransferServer updated " + vals.size() + " records in "
          + (System.currentTimeMillis() - timeStart) + " ms");
    } else {
      .warn("null _dataQueueRef. Should be in the beginning only");
    }
  }
  private ZKPropertyTransferServer() {
  }
  public static ZKPropertyTransferServer getInstance() {
    return ;
  }
  public boolean isInitialized() {
    return ;
  }
  public void init(int localWebservicePortString zkAddress) {
    if (! && !) {
      .error("Initializing with port " + localWebservicePort + " zkAddress: " + zkAddress);
       = localWebservicePort;
      ZkClient zkClient = new ZkClient(zkAddress);
      zkClient.setZkSerializer(new ZNRecordSerializer());
       = new ZkBaseDataAccessor<ZNRecord>(zkClient);
       = zkAddress;
      startServer();
    } else {
      .error("Already initialized with port " +  + " shutdownFlag: "
          + );
    }
  }
  public String getWebserviceUrl() {
    if (! || ) {
      .debug("inited:" +  + " shutdownFlag:" +  + " , return");
      return null;
    }
    return ;
  }

  
Add an ZNRecordUpdate to the change queue. Called by the webservice front-end.
    if (! || ) {
      .error("zkDataTransferServer inited:" +  + " shutdownFlag:" + 
          + " , return");
      return;
    }
    // Do local merge if receive multiple update on the same path
    synchronized () {
      if (.get().containsKey(e.getPath())) {
        ZNRecord oldVal = .get().get(e.getPath()).getRecord();
        oldVal = e.getZNRecordUpdater().update(oldVal);
        .get().get(e.getPath()). = oldVal;
      } else {
        .get().put(e.getPath(), e);
      }
    }
    if (.get().size() > ) {
      sendData();
    }
  }
  void startServer() {
    .info("zkDataTransferServer starting on Port " +  + " zkAddress "
        + );
     = new Component();
    .getServers().add(Protocol.HTTP, );
    Context applicationContext = .getContext().createChildContext();
    applicationContext.getAttributes().put(this);
    applicationContext.getAttributes().put("" + );
    ZkPropertyTransferApplication application =
        new ZkPropertyTransferApplication(applicationContext);
    // Attach the application to the component and start it
    .getDefaultHost().attach(application);
     = new Timer(true);
    try {
       =
          "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":"
              +  + "/" + ;
      .start();
       = true;
    } catch (Exception e) {
      .error(""e);
    }
    .info("zkDataTransferServer started on Port " +  + " zkAddress "
        + );
  }
  public void shutdown() {
    if () {
      .error("ZKPropertyTransferServer already has been shutdown...");
      return;
    }
    .info("zkDataTransferServer shuting down on Port " +  + " zkAddress "
        + );
    if ( != null) {
      .cancel();
    }
    if ( != null) {
      try {
        .stop();
      } catch (Exception e) {
        .error(""e);
      }
    }
     = true;
  }
  public void reset() {
    if ( == true) {
       = false;
       = false;
       = null;
       = null;
    }
  }
New to GrepCode? Check out our FAQ X