Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.fasterxml.clustermate.dw;
  
  import java.util.List;
  
 
 
 
 
 public abstract class DWBasedService<
     K extends EntryKey,
     E extends StoredEntry<K>,
     SCONFIG extends ServiceConfig,
     CONF extends DWConfigBase<SCONFIG, CONF>
 >
     extends Service<CONF>
     implements Managed
 {
     private final Logger LOG = LoggerFactory.getLogger(getClass());

    
List of com.fasterxml.clustermate.service.VManaged objects we will dispatch start/stop calls to.
 
     protected List<VManaged_managed = Collections.emptyList();
    
    
Marker flag used to indicate cases when service is run in test mode; propagated through configuration.
 
     protected final boolean _testMode;

    
We need to keep track of things created, to let tests access the information
 
     protected SharedServiceStuff _serviceStuff;
    
    
Container for various stores we use for data, metadata.
 
     protected StoresImpl<K,E> _stores;
    
    
This object is needed to allow test code to work around usual waiting time restrictions.
 
     protected final TimeMaster _timeMaster;

    
And we better hang on to cluster view as well
 
     protected ClusterViewByServerUpdatable _cluster;
    
    
Manager object that deals with data expiration and related clean up tasks.
 
     protected CleanerUpper<K,E> _cleanerUpper;
 
    /*
    /**********************************************************************
    /* Handlers
    /**********************************************************************
     */
    
    protected StoreHandler<K,E> _storeHandler;
    
    /*
    /**********************************************************************
    /* Construction
    /**********************************************************************
     */
    protected DWBasedService(String nameTimeMaster timings)
    {
        this(nametimingsfalse);
    }
    protected DWBasedService(String nameTimeMaster timingsboolean testMode)
    {
        super(name);
         = timings;
         = testMode;
        // Static stuff from under /html (except for root  level things
        // like /index.html that need special handling)
        addBundle(new AssetsBundle("/html"));
    }
    /*
    /**********************************************************************
    /* Life-cycle
    /**********************************************************************
     */
    @Override
    protected void initialize(CONF dwConfig,
            Environment environmentthrows IOException
    {
        // first things first: we need to get start()/stop() calls, so:
        environment.manage(this);
        final SCONFIG config = dwConfig.getServiceConfig();
        
         = new ArrayList<VManaged>();
        StoredEntryConverter<K,E> entryConverter = constructEntryConverter(configenvironment);
        FileManager files = constructFileManager(config);
         = constructServiceStuff(config,
               entryConverterfiles);
        if () {
            .markAsTest();
        }
        
        /* Let's try opening up BDB-JE: must have been created,
         * and have tables we expect; otherwise we'll fail right away.
         */
        .info("Trying open Stores (StorableStore, node store, last-access store)");
          = _constructStores();
         .add();
        .info("Opened Stores successfully");
        .initAndOpen(false);
        // Then: read in cluster information (config file, BDB):
        final int port = dwConfig.getHttpConfiguration().getPort();
        .info("Initializing cluster configuration (port {})..."port);
        final long startTime = .currentTimeMillis();
        ClusterViewByServerImpl<K,E> cl = new ClusterBootstrapper<K,E>(startTime)
                .bootstrap(port);
         = cl;
        .add();
     
        .info("Cluster configuration setup complete, with {} nodes".size());
        
        // Index page must be done via resource, otherwise will conflict with DW/JAX-RS Servlet:
        environment.addResource(new IndexResource(loadResource("/index.html"), loadResource("/favicon.jpg")));
        // Let's first construct handlers we use:
        .info("Creating handlers for service endpoints");
        SyncHandler<K,E> syncH = constructSyncHandler();
        
        .info("Adding service end points");
        addServiceEndpoints(environment,
                nodeHsyncH);
        .info("Adding health checks");
        addHealthChecks(environment);
        .info("Initializing background cleaner tasks");
         = new CleanerUpper<K,E>();
        .add();
        
        .info("Initialization complete: HTTP service now running on port {}",
                dwConfig.getHttpConfiguration().getPort());
    }
    /*
    /**********************************************************************
    /* Factory methods: basic config objects
    /**********************************************************************
     */
    
    
Overridable method that is used for getting helper object used for constructing com.fasterxml.clustermate.service.store.StoredEntry instances to store in the entry metadata store (currently BDB). Default implementation delegates to VagabondDWConfig, to allow for configuration overrides
    protected abstract StoredEntryConverter<K,E> constructEntryConverter(SCONFIG config,
            Environment environment);
    protected abstract FileManager constructFileManager(SCONFIG serviceConfig);
    protected abstract StoresImpl<K,E> constructStores(SharedServiceStuff stuff,
            SCONFIG serviceConfigStorableStore store);    
    protected abstract SharedServiceStuff constructServiceStuff(SCONFIG serviceConfig,
            TimeMaster timeMasterStoredEntryConverter<K,E> entryConverter,
            FileManager files);
    
    /*
    /**********************************************************************
    /* Factory methods for constructing handlers
    /**********************************************************************
     */
    protected abstract StoreHandler<K,E> constructStoreHandler(SharedServiceStuff serviceStuff,
            Stores<K,E> stores);
    protected SyncHandler<K,E> constructSyncHandler(SharedServiceStuff stuff,
            StoresImpl<K,E> storesClusterViewByServerUpdatable cluster)
    {
        return new SyncHandler<K,E>(stuffstorescluster);
    }
            ClusterViewByServer cluster)
    {
        return new ClusterInfoHandler(stuffcluster);
    }
    /*
    /**********************************************************************
    /* Factory methods: servlets
    /**********************************************************************
     */
    protected abstract StoreEntryServlet<K,E> constructStoreEntryServlet(SharedServiceStuff serviceStuff,
            ClusterViewByServer clusterStoreHandler<K,E> storeHandler);
        return new NodeStatusServlet(nodeHandler);
    }
            ClusterViewByServer clusterSyncHandler<K,E> syncHandler) {
        return new SyncListServlet<K,E>(stuffclustersyncHandler);
    }
            ClusterViewByServer clusterSyncHandler<K,E> syncHandler) {
        return new SyncPullServlet<K,E>(stuffclustersyncHandler);
    }
    
    /*
    /**********************************************************************
    /* Methods for service end point additions
    /**********************************************************************
     */

    
Method called to create service endpoints, given set of handlers.
    protected void addServiceEndpoints(SharedServiceStuff stuff,
            Environment environment,
            ClusterInfoHandler nodeHandlerSyncHandler<K,E> syncHandler,
            StoreHandler<K,E> storeHandler)
    {
        final ClusterViewByServer cluster = syncHandler.getCluster();
        NodeStatusServlet nodeStatusServlet = constructNodeStatusServlet(nodeHandler);
        SyncListServlet<K,E> syncListServlet = constructSyncListServlet(
                stuffclustersyncHandler);
        SyncPullServlet<K,E> syncPullServlet = constructSyncPullServlet(
                stuffclustersyncHandler);
        StoreEntryServlet<K,E> storeEntryServlet = constructStoreEntryServlet(stuff,
                clusterstoreHandler);
        
        ServiceDispatchServlet<K,E> dispatcher = new ServiceDispatchServlet<K,E>(
                clusterstuff,
                nodeStatusServletstoreEntryServletnull,
                syncListServletsyncPullServlet);
        RequestPathBuilder rootBuilder = rootPath(stuff.getServiceConfig());
        String rootPath = servletPath(rootBuilder);
        .info("Registering main Dispatcher servlet at: {}"rootPath);
        environment.addServlet(dispatcherrootPath);
        // // And finally servlet for for entry access
        
        addStoreEntryServlet(stuffenvironmentcluster);
    }
    /*
     * Old single-Servlet implementation:
     */
    /*
    {
        final ServiceConfig config = stuff.getServiceConfig();
        final ClusterViewByServer cluster = syncHandler.getCluster();
        // All paths are dynamic, so we need a mapper:
        RequestPathStrategy pathStrategy = stuff.getPathStrategy();
        RequestPathBuilder pathBuilder;
        pathBuilder = pathStrategy.appendNodeStatusPath(rootPath(config));
        // // And then start by adding cluster-info (aka node status)
        // // and sync end points as plain servlets
        
        NodeStatusServlet nsServlet = constructNodeStatusServlet(nodeHandler);
        if (nsServlet != null) {
            environment.addServlet(nsServlet, servletPath(pathBuilder));
        }
    
        pathBuilder = pathStrategy.appendSyncListPath(rootPath(config));
        environment.addServlet(new SyncListServlet<K,E>(stuff, cluster, syncHandler),
                servletPath(pathBuilder));
        pathBuilder = pathStrategy.appendSyncPullPath(rootPath(config));
        environment.addServlet(new SyncPullServlet<K,E>(stuff, cluster, syncHandler),
                servletPath(pathBuilder));
        // // And finally servlet for for entry access
        
        addStoreEntryServlet(stuff, environment, pathStrategy, cluster, _storeHandler);
    }
    */
    
    protected void addStoreEntryServlet(SharedServiceStuff stuff,
            Environment environment,
            ClusterViewByServer cluster,
            StoreHandler<K,E> storeHandler)
    {
    }
    
    protected void addHealthChecks(SharedServiceStuff stuff,
            Environment environment) { }
    
    /*
    /**********************************************************************
    /* Internal methods
    /**********************************************************************
     */
    
    protected StoresImpl<K,E> _constructStores(SharedServiceStuff stuff)
        throws IOException
    {
        final SCONFIG v = stuff.getServiceConfig();
        StoreBackendBuilder<?> b = v.instantiateBackendBuilder();
        StoreBackendConfig backendConfig = v._storeBackendConfigOverride;
        if (backendConfig == null) { // no overrides, use databinding
            Class<? extends StoreBackendConfigcfgType = b.getConfigClass();
            if (v.storeBackendConfig == null) {
                throw new IllegalStateException("Missing 'VagabondServiceConfig.storeBackendConfig");
            }
            backendConfig = stuff.convertValue(v.storeBackendConfigcfgType);
        }
        StoreBackend backend = b.with(v.storeConfig)
                .with(backendConfig)
                .build();
        StorableStore store = new StorableStoreImpl(v.storeConfigbackend,
               stuff.getFileManager());
        return constructStores(stuffvstore);
    }
    @Override
    public void start() throws Exception
    {
        .info("Starting up {} VManaged objects".size());
        for (VManaged managed : ) {
            .info("Starting up: {}"managed.getClass().getName());
            managed.start();
        }
        // TODO Auto-generated method stub
        .info("VManaged object startup complete");
        
    }
    @Override
    public void stop() throws Exception
    {
        int count = .size();
        .info("Stopping {} VManaged objects".size());
        while (--count >= 0) {
            VManaged managed = .remove(count);
            String desc = managed.getClass().getName();
            try {
                .info("Stopping: {}"desc);
                managed.stop();
            } catch (Exception e) {
                .warn(String.format("Problems trying to stop VManaged of type %s: (%s) %s",
                        desce.getClass().getName(), e.getMessage()),
                        e);
            }
        }
        .info("VManaged object shutdown complete");
    }
    /*
    /**********************************************************************
    /* Accessors for tests
    /**********************************************************************
     */
    public TimeMaster getTimeMaster() {
        return ;
    }
    
    /*
    /**********************************************************************
    /* Helper methods
    /**********************************************************************
     */
    protected RequestPathBuilder rootPath(ServiceConfig config)
    {
        return new JdkHttpClientPathBuilder("localhost")
            .addPathSegments(config.servicePathRoot);
    }

    
Helper method for constructing Servlet registration path, given a basic end point path definition. Currently just verifies prefix and suffix slashes and adds '*' as necessary.
    protected String servletPath(RequestPathBuilder pathBuilder)
    {
        String base = pathBuilder.getPath();
        if (!base.endsWith("*")) {
            if (base.endsWith("/")) {
                base += "*";
            } else {
                base += "/*";
            }
        }
        if (!base.startsWith("/")) {
            base = "/"+base;
        }
        return base;
    }
    protected byte[] loadResource(String refthrows IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(4000);
        InputStream in = getClass().getResourceAsStream(ref);
        byte[] buffer = new byte[4000];
        int count;
     
        while ((count = in.read(buffer)) > 0) {
            bytes.write(buffer, 0, count);
        }
        in.close();
        byte[] data = bytes.toByteArray();
        if (data.length == 0) {
            String msg = "Could not find resource '"+ref+"'";
            .error(msg);
            throw new IllegalArgumentException(msg);
        }
        return bytes.toByteArray();
    }
New to GrepCode? Check out our FAQ X