Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.fasterxml.clustermate.dw;
  
  import java.io.*;
  import java.util.*;
  
  import org.slf4j.Logger;
  
 
 
 
 public abstract class DWBasedService<
     K extends EntryKey,
     E extends StoredEntry<K>,
     L extends ListItem,
     SCONFIG extends ServiceConfig,
     CONF extends DWConfigBase<SCONFIG, CONF>
 >
     extends Service<CONF>
 {
     private final Logger LOG = LoggerFactory.getLogger(getClass());

    
     /**
      * List of {@link com.fasterxml.clustermate.service.StartAndStoppable}
      * objects we will dispatch start/stop calls to.
      */
 
     protected List<StartAndStoppable_managed = null;
    
    
     /**
      * Marker flag used to indicate cases when service is run in test mode;
      * propagated through configuration.
      */
 
     protected final boolean _testMode;

    
     /**
      * We need to keep track of things created, to let tests access
      * the information.
      */
 
     protected SharedServiceStuff _serviceStuff;
    
    
     /**
      * Container for various stores we use for data, metadata.
      */
 
     protected StoresImpl<K,E> _stores;
    
    
     /**
      * This object is needed to allow test code to work around usual
      * waiting time restrictions.
      */
 
     protected final TimeMaster _timeMaster;

    
     /**
      * And we better hang on to cluster view as well.
      */
 
     protected ClusterViewByServerUpdatable _cluster;
    
    
     /**
      * Manager object that deals with data expiration and related
      * clean up tasks.
      */
 
     protected CleanerUpper<K,E> _cleanerUpper;
 
     /*
     /**********************************************************************
     /* Handlers
     /**********************************************************************
      */
     
     protected StoreHandler<K,E,L> _storeHandler;
     
     /*
    /**********************************************************************
    /* Construction
    /**********************************************************************
     */
    protected DWBasedService(TimeMaster timings)
    {
        this(timingsfalse);
    }
    protected DWBasedService(TimeMaster timingsboolean testMode)
    {
        super();
         = timings;
         = testMode;
    }
    /*
    /**********************************************************************
    /* Life-cycle
    /**********************************************************************
     */
    @Override
    public void initialize(Bootstrap<CONF> bootstrap) {
        // Static stuff from under /html (except for root  level things
        // like /index.html that need special handling)
        bootstrap.addBundle(new AssetsBundle("/html"));
    }
    public void _start() throws Exception
    {
        .info("Starting up {} VManaged objects".size());
        for (StartAndStoppable managed : ) {
            .info("Starting up: {}"managed.getClass().getName());
            managed.start();
        }
        .info("VManaged object startup complete");
        /* 27-Mar-2013, tatu: Also: need to register shutdown hook to be
         *    able to do 'prepareForStop()'
         */
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                _prepareForStop();
            }
        });
    }
    public void _prepareForStop()
    {
        .info("Calling prepareForStop on {} VManaged objects".size());
        for (StartAndStoppable managed : ) {
            try {
                managed.prepareForStop();
            } catch (Exception e) {
                .warn("Problems with prepareForStop on {}: {}"managed.getClass().getName(),
                        e.getMessage());
            }
        }
        .info("prepareForStop() for managed objects complete");
    }
    
    public void _stop() throws Exception
    {
        int count = .size();
        .info("Stopping {} VManaged objects"count);
        while (--count >= 0) {
            StartAndStoppable managed = .remove(count);
            String desc = managed.getClass().getName();
            try {
                .info("Stopping: {}"desc);
                managed.stop();
            } catch (Exception e) {
                .warn(String.format("Problems trying to stop VManaged of type %s: (%s) %s",
                        desce.getClass().getName(), e.getMessage()),
                        e);
            }
        }
        .info("VManaged object shutdown complete");
    }
    /**
     * Main service set-up. Registers a Managed whose start()/stop() delegate
     * to {@link #_start()} / {@link #_stop()}, then builds the entry converter,
     * file manager, shared service stuff, stores and cluster view, registers the
     * index resource, creates handlers and adds service end points and health
     * checks.
     *<p>
     * NOTE(review): this copy of the method is damaged. Receivers of most calls
     * and targets of most assignments (e.g. {@code .info(...)}, {@code = cl;}),
     * many ", " separators and several arguments are missing, and at least one
     * whole statement is gone ({@code nodeH} is used but never declared here).
     * Restore from the upstream source before relying on it.
     */
    @Override
    public void run(CONF dwConfigEnvironment environmentthrows IOException
    {
        // first things first: we need to get start()/stop() calls, so:
        environment.manage(new Managed() {
            @Override
            public void start() throws Exception {
                _start();
            }
            @Override
            public void stop() throws Exception {
                _stop();
            }
        });
        final SCONFIG config = dwConfig.getServiceConfig();
        
        // NOTE(review): assignment target lost; presumably the list of managed objects
         = new ArrayList<StartAndStoppable>();
        StoredEntryConverter<K,E,L> entryConverter = constructEntryConverter(configenvironment);
        FileManager files = constructFileManager(config);
        // NOTE(review): constructServiceStuff() is declared with four parameters
        // (config, time master, entry converter, files); one argument is missing here
         = constructServiceStuff(config,
               entryConverterfiles);
        // NOTE(review): condition lost; presumably the test-mode flag -- confirm
        if () {
            .markAsTest();
        }
        
        /* Let's try opening up StorableStore: must have been created,
         * and have tables we expect; otherwise we'll fail right away.
         */
        .info("Trying open Stores (StorableStore, node store, last-access store)");
        // NOTE(review): _constructStores() is declared with a SharedServiceStuff
        // parameter; the argument is missing here
          = _constructStores();
         .add();
        .info("Opened StorableStore successfully");
        .initAndOpen(false);
        // Then: read in cluster information (config file, backend store settings):
        final int port = dwConfig.getHttpConfiguration().getPort();
        .info("Initializing cluster configuration (port {})..."port);
        final long startTime = .currentTimeMillis();
        // NOTE(review): ClusterBootstrapper may have had more constructor arguments
        // than startTime -- cannot tell from this copy
        ClusterViewByServerImpl<K,E> cl = new ClusterBootstrapper<K,E>(startTime)
                .bootstrap(port);
         = cl;
        .add();
     
        .info("Cluster configuration setup complete, with {} nodes".size());
        
        // Index page must be done via resource, otherwise will conflict with DW/JAX-RS Servlet:
        environment.addResource(new IndexResource(loadResource("/index.html"), loadResource("/favicon.jpg")));
        // Let's first construct handlers we use:
        .info("Creating handlers for service endpoints");
        // NOTE(review): constructSyncHandler() is declared with three parameters
        // (stuff, stores, cluster); all arguments are missing here
        SyncHandler<K,E> syncH = constructSyncHandler();
        .add();
        .info("Adding service end points");
        // NOTE(review): addServiceEndpoints() is declared with five parameters;
        // 'nodeH' has no visible declaration in this method
        addServiceEndpoints(environment,
                nodeHsyncH);
        .info("Adding health checks");
        // NOTE(review): addHealthChecks() is declared as (stuff, environment)
        addHealthChecks(environment);
        .info("Initializing background cleaner tasks");
        // NOTE(review): tested reference lost; presumably the cleaner-upper -- confirm
        if ( != null) {
            .add();
        }
        .info("Initialization complete: HTTP service now running on port {}",
                dwConfig.getHttpConfiguration().getPort());
    }
    /*
    /**********************************************************************
    /* Factory methods: basic config objects
    /**********************************************************************
     */
    
    
    /**
     * Overridable method that is used for getting helper object used for
     * constructing {@link com.fasterxml.clustermate.service.store.StoredEntry}
     * instances to store in the entry metadata store.
     */
    // Called from run() with the service configuration and the Environment
    // that run() itself received.
    protected abstract StoredEntryConverter<K,E,L> constructEntryConverter(SCONFIG config,
            Environment environment);
    /**
     * Factory method for the {@link FileManager} this service uses; run()
     * calls it with the service configuration.
     *
     * @param serviceConfig Service configuration
     */
    protected abstract FileManager constructFileManager(SCONFIG serviceConfig);
    protected abstract StoresImpl<K,E> constructStores(SharedServiceStuff stuff,
            SCONFIG serviceConfigStorableStore store);    
    protected abstract SharedServiceStuff constructServiceStuff(SCONFIG serviceConfig,
            TimeMaster timeMasterStoredEntryConverter<K,E,L> entryConverter,
            FileManager files);
    
    /*
    /**********************************************************************
    /* Factory methods for constructing handlers
    /**********************************************************************
     */
    protected abstract StoreHandler<K,E,L> constructStoreHandler(SharedServiceStuff serviceStuff,
            Stores<K,E> storesClusterViewByServer cluster);
    protected SyncHandler<K,E> constructSyncHandler(SharedServiceStuff stuff,
            StoresImpl<K,E> storesClusterViewByServerUpdatable cluster)
    {
        return new SyncHandler<K,E>(stuffstorescluster);
    }
            // NOTE(review): the first line of this method's header (modifiers,
            // return type, name and leading parameter) is missing from this copy,
            // as is the ", " between the constructor arguments. The body builds a
            // ClusterInfoHandler from 'stuff' and 'cluster'; the method name cannot
            // be determined from here -- restore from upstream.
            ClusterViewByServerUpdatable cluster)
    {
        return new ClusterInfoHandler(stuffcluster);
    }
    // since 0.9.6
            // NOTE(review): the first line of this method's header is missing from
            // this copy, the ", " between the two visible parameters is lost, and the
            // leading constructor argument(s) before constructCleanupTasks() are gone.
            // The body builds a CleanerUpper whose task list comes from
            // constructCleanupTasks(); restore the rest from upstream.
            Stores<K,E> storesClusterViewByServer cluster)
    {
        return new CleanerUpper<K,E>(,
                constructCleanupTasks());
    }
    // since 0.9.6
    /**
     * Factory method for the list of clean-up tasks; the result is passed to
     * the {@code CleanerUpper} constructor.
     */
    protected abstract List<CleanupTask<?>> constructCleanupTasks();
    /*
    {
        ArrayList<CleanupTask<?>> tasks = new ArrayList<CleanupTask<?>>();
        tasks.add(new LocalEntryCleaner<K,E>());
        tasks.add(new FileCleaner());
        return tasks;
    }
    */
    
    /*
    /**********************************************************************
    /* Factory methods: servlets
    /**********************************************************************
     */
    protected abstract StoreEntryServlet<K,E> constructStoreEntryServlet(SharedServiceStuff stuff,
            ClusterViewByServer clusterStoreHandler<K,E,L> storeHandler);
            // NOTE(review): the first line of this method's header is missing from
            // this copy, as is the ", " between the constructor arguments.
            // addServiceEndpoints() calls constructNodeStatusServlet(...) with a
            // stuff object and a node handler, so this is presumably that method --
            // confirm against upstream.
            ClusterInfoHandler nodeHandler) {
        return new NodeStatusServlet(stuffnodeHandler);
    }
            // NOTE(review): the first line of this method's header is missing from
            // this copy, as are the ", " separators. addServiceEndpoints() calls
            // constructNodeMetricsServlet(...) with stuff, cluster, stores and a
            // metrics-provider array, so this is presumably that method -- confirm.
            // The 'cluster' parameter is not passed on to the servlet constructor.
            ClusterViewByServer clusterStores<K,E> stores,
            AllOperationMetrics.Provider[] metrics) {
        return new NodeMetricsServlet(stuffstoresmetrics);
    }
        
            // NOTE(review): the first line of this method's header is missing from
            // this copy, as are the ", " separators. addServiceEndpoints() assigns
            // the result of constructSyncListServlet(...) to a ServletWithMetricsBase,
            // so this is presumably that method -- confirm against upstream.
            ClusterViewByServer clusterSyncHandler<K,E> syncHandler) {
        return new SyncListServlet<K,E>(stuffclustersyncHandler);
    }
            // NOTE(review): the first line of this method's header is missing from
            // this copy, as are the ", " separators. addServiceEndpoints() assigns
            // the result of constructSyncPullServlet(...) to a ServletBase, so this
            // is presumably that method -- confirm against upstream.
            ClusterViewByServer clusterSyncHandler<K,E> syncHandler) {
        return new SyncPullServlet<K,E>(stuffclustersyncHandler);
    }
            // NOTE(review): the first line of this method's header is missing from
            // this copy, as are the ", " separators. addServiceEndpoints() assigns
            // the result of constructStoreListServlet(...) to a
            // ServletWithMetricsBase, so this is presumably that method -- confirm.
            ClusterViewByServer clusterStoreHandler<K,E,L> storeHandler) {
        return new StoreListServlet<K,E>(stuffclusterstoreHandler);
    }
    
    /*
    /**********************************************************************
    /* Methods for service end point additions
    /**********************************************************************
     */

    
    /**
     * Method called to create service endpoints, given set of handlers.
     */
    // Builds one servlet per path type, collects them in an EnumMap keyed by
    // PathType, wraps the map in a single ServiceDispatchServlet and registers
    // that dispatcher under the service root path; finally calls
    // addStoreEntryServlet() as an extension hook.
    //
    // NOTE(review): this copy is damaged. The PathType constant in every
    // servlets.put(...) call, the ", " separators, the logger receiver and the
    // last argument of the addStoreEntryServlet(...) call (declared with four
    // parameters) are missing. Restore from upstream.
    protected void addServiceEndpoints(SharedServiceStuff stuff,
            Environment environment,
            ClusterInfoHandler nodeHandlerSyncHandler<K,E> syncHandler,
            StoreHandler<K,E,L> storeHandler)
    {
        // cluster view is taken from the sync handler rather than passed in
        final ClusterViewByServer cluster = syncHandler.getCluster();
        EnumMap<PathTypeServletBaseservlets = new EnumMap<PathTypeServletBase>(PathType.class);
        servlets.put(.constructNodeStatusServlet(stuffnodeHandler));
        ServletWithMetricsBase syncListServlet = constructSyncListServlet(stuffclustersyncHandler);
        servlets.put(.syncListServlet);
        ServletBase syncPullServlet = constructSyncPullServlet(stuffclustersyncHandler);
        servlets.put(.syncPullServlet);
        StoreEntryServlet<K,E> storeEntryServlet = constructStoreEntryServlet(stuff,
                clusterstoreHandler);
        servlets.put(.storeEntryServlet);
        ServletWithMetricsBase storeListServlet = constructStoreListServlet(stuff,
                clusterstoreHandler);
        servlets.put(.storeListServlet);
        // metrics servlet is handed the servlets that act as metrics providers
        servlets.put(.constructNodeMetricsServlet(stuffcluster,
                storeHandler.getStores(),
                new AllOperationMetrics.Provider[] {
                    storeEntryServletstoreListServletsyncListServlet
                }
        ));
//        servlets.put(PathType.STORE_FIND_ENTRY, );
//        servlets.put(PathType.STORE_FIND_LIST, );
        
        ServiceDispatchServlet<K,E> dispatcher = new ServiceDispatchServlet<K,E>(cluster,
                nullstuffservlets);
        RequestPathBuilder rootBuilder = rootPath(stuff.getServiceConfig());
        // servletPath() turns the root path into a "/.../*" registration pattern
        String rootPath = servletPath(rootBuilder);
        .info("Registering main Dispatcher servlet at: {}"rootPath);
        environment.addServlet(dispatcherrootPath);
        // // And finally servlet for for entry access
        
        addStoreEntryServlet(stuffenvironmentcluster);
    }
    /*
     * Old single-Servlet implementation:
     */
    /*
    {
        final ServiceConfig config = stuff.getServiceConfig();
        final ClusterViewByServer cluster = syncHandler.getCluster();
        // All paths are dynamic, so we need a mapper:
        RequestPathStrategy pathStrategy = stuff.getPathStrategy();
        RequestPathBuilder pathBuilder;
        pathBuilder = pathStrategy.appendNodeStatusPath(rootPath(config));
        // // And then start by adding cluster-info (aka node status)
        // // and sync end points as plain servlets
        
        NodeStatusServlet nsServlet = constructNodeStatusServlet(nodeHandler);
        if (nsServlet != null) {
            environment.addServlet(nsServlet, servletPath(pathBuilder));
        }
    
        pathBuilder = pathStrategy.appendSyncListPath(rootPath(config));
        environment.addServlet(new SyncListServlet<K,E>(stuff, cluster, syncHandler),
                servletPath(pathBuilder));
        pathBuilder = pathStrategy.appendSyncPullPath(rootPath(config));
        environment.addServlet(new SyncPullServlet<K,E>(stuff, cluster, syncHandler),
                servletPath(pathBuilder));
        // // And finally servlet for for entry access
        
        addStoreEntryServlet(stuff, environment, pathStrategy, cluster, _storeHandler);
    }
    */
    
    
    /**
     * Overridable method used for hooking standard entry access endpoint
     * into alternate location. Usually used for backwards compatibility
     * purposes.
     */
    protected void addStoreEntryServlet(SharedServiceStuff stuff,
            Environment environment,
            ClusterViewByServer cluster,
            StoreHandler<K,E,?> storeHandler)
    {
        // Intentionally empty: default implementation registers nothing;
        // sub-classes override to add their own end point.
    }
    
    /**
     * Extension point for registering health checks; invoked from run()
     * after service end points have been added. Default implementation
     * does nothing.
     *
     * @param stuff Shared service helper object
     * @param environment Environment to register health checks with
     */
    protected void addHealthChecks(SharedServiceStuff stuff,
            Environment environment) { }
    
    /*
    /**********************************************************************
    /* Internal methods
    /**********************************************************************
     */
    
    protected StoresImpl<K,E> _constructStores(SharedServiceStuff stuff)
        throws IOException
    {
        final SCONFIG v = stuff.getServiceConfig();
        StoreBackendBuilder<?> b = v.instantiateBackendBuilder();
        StoreBackendConfig backendConfig = v._storeBackendConfigOverride;
        if (backendConfig == null) { // no overrides, use databinding
            Class<? extends StoreBackendConfigcfgType = b.getConfigClass();
            if (v.storeBackendConfig == null) {
                throw new IllegalStateException("Missing 'v.storeBackendConfig");
            }
            backendConfig = stuff.convertValue(v.storeBackendConfigcfgType);
        }
        StoreBackend backend = b.with(v.storeConfig)
                .with(backendConfig)
                .build();
        StorableStore store = new StorableStoreImpl(v.storeConfigbackend,
               stuff.getFileManager());
        return constructStores(stuffvstore);
    }
    /*
    /**********************************************************************
    /* Accessors for tests
    /**********************************************************************
     */
    public TimeMaster getTimeMaster() {
        return ;
    }
    
    /*
    /**********************************************************************
    /* Helper methods
    /**********************************************************************
     */
    protected RequestPathBuilder rootPath(ServiceConfig config)
    {
        return new JdkHttpClientPathBuilder("localhost")
            .addPathSegments(config.servicePathRoot);
    }

    
    /**
     * Helper method for constructing Servlet registration path, given
     * a basic end point path definition. Currently just verifies
     * prefix and suffix slashes and adds '*' as necessary.
     */
    protected String servletPath(RequestPathBuilder pathBuilder)
    {
        String base = pathBuilder.getPath();
        if (!base.endsWith("*")) {
            if (base.endsWith("/")) {
                base += "*";
            } else {
                base += "/*";
            }
        }
        if (!base.startsWith("/")) {
            base = "/"+base;
        }
        return base;
    }
    protected byte[] loadResource(String refthrows IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(4000);
        InputStream in = getClass().getResourceAsStream(ref);
        byte[] buffer = new byte[4000];
        int count;
     
        while ((count = in.read(buffer)) > 0) {
            bytes.write(buffer, 0, count);
        }
        in.close();
        byte[] data = bytes.toByteArray();
        if (data.length == 0) {
            String msg = "Could not find resource '"+ref+"'";
            .error(msg);
            throw new IllegalArgumentException(msg);
        }
        return bytes.toByteArray();
    }
New to GrepCode? Check out our FAQ X