Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.fasterxml.clustermate.dw;
  
  import java.io.*;
  import java.util.*;
  
  import org.slf4j.Logger;
  
 
 
 
 public abstract class DWBasedService<
     K extends EntryKey,
     E extends StoredEntry<K>,
     L extends ListItem,
     SCONFIG extends ServiceConfig,
     CONF extends DWConfigBase<SCONFIG, CONF>
 >
     extends Service<CONF>
 {
     private final Logger LOG = LoggerFactory.getLogger(getClass());

    
List of com.fasterxml.clustermate.service.StartAndStoppable objects we will dispatch start/stop calls to.
 
     protected List<StartAndStoppable_managed = null;
    
    
Running mode of this service; used to indicate whether we are running in test mode, and whether background tasks ought to be run or not.
 
     protected final RunMode _runMode;

    
We need to keep track of things created, to let tests access the information
 
     protected SharedServiceStuff _serviceStuff;
    
    
Container for various stores we use for data, metadata.
 
     protected StoresImpl<K,E> _stores;
    
    
This object is needed to allow test code to work around usual waiting time restrictions.
 
     protected final TimeMaster _timeMaster;

    
And we better hang on to cluster view as well
 
     protected ClusterViewByServerUpdatable _cluster;
    
    
Manager object that deals with data expiration and related clean up tasks.
 
     protected CleanerUpper<K,E> _cleanerUpper;
 
     /*
     /**********************************************************************
     /* Handlers
     /**********************************************************************
      */
     
    protected StoreHandler<K,E,L> _storeHandler;
    
    /*
    /**********************************************************************
    /* Construction
    /**********************************************************************
     */
    protected DWBasedService(TimeMaster timingsRunMode mode)
    {
        super();
         = timings;
         = mode;
    }
    /*
    /**********************************************************************
    /* Life-cycle
    /**********************************************************************
     */
    @Override
    public void initialize(Bootstrap<CONF> bootstrap) {
        // Static stuff from under /html (except for root  level things
        // like /index.html that need special handling)
        bootstrap.addBundle(new AssetsBundle("/html"));
    }
    public void _start() throws Exception
    {
        .info("Starting up {} VManaged objects".size());
        for (StartAndStoppable managed : ) {
            .info("Starting up: {}"managed.getClass().getName());
            managed.start();
        }
        .info("VManaged object startup complete");
        /* 27-Mar-2013, tatu: Also: need to register shutdown hook to be
         *    able to do 'prepareForStop()'
         */
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                _prepareForStop();
            }
        });
    }
    public void _prepareForStop()
    {
        .info("Calling prepareForStop on {} VManaged objects".size());
        for (StartAndStoppable managed : ) {
            try {
                managed.prepareForStop();
            } catch (Exception e) {
                .warn("Problems with prepareForStop on {}: {}"managed.getClass().getName(),
                        e.getMessage());
            }
        }
        .info("prepareForStop() for managed objects complete");
    }
    
    public void _stop() throws Exception
    {
        int count = .size();
        .info("Stopping {} VManaged objects"count);
        while (--count >= 0) {
            StartAndStoppable managed = .remove(count);
            String desc = managed.getClass().getName();
            try {
                .info("Stopping: {}"desc);
                managed.stop();
            } catch (Exception e) {
                .warn(String.format("Problems trying to stop VManaged of type %s: (%s) %s",
                        desce.getClass().getName(), e.getMessage()),
                        e);
            }
        }
        .info("VManaged object shutdown complete");
    }
    /* Ideally, shouldn't need to track this; but after having a few issues,
     * decided better safe than sorry.
     */
    protected boolean _hasBeenRun = false;
    
    @Override
    public void run(CONF dwConfigEnvironment environmentthrows IOException
    {
        synchronized (this) {
            if () {
                throw new IllegalStateException("Trying to run(config, env) DWBasedService more than once");
            }
             = true;
        }
        
        // first things first: we need to get start()/stop() calls, so:
        environment.manage(new Managed() {
            @Override
            public void start() throws Exception {
                _start();
            }
            @Override
            public void stop() throws Exception {
                _stop();
            }
        });
        /* 04-Jun-2013, tatu: Goddammit, disabling gzip filter is tricky due to
         *   data-binding... Object-values get re-created. So, need to patch after
         *   the fact. And hope it works...
         *   
         * NOTE: looks like this is too late, and won't have effect. If so, modifying
         * YAML/JSON config is the only way.
         */
        dwConfig.overrideGZIPEnabled(false);
        final SCONFIG config = dwConfig.getServiceConfig();
        
         = new ArrayList<StartAndStoppable>();
        StoredEntryConverter<K,E,L> entryConverter = constructEntryConverter(configenvironment);
        FileManager files = constructFileManager(config);
         = constructServiceStuff(config,
               entryConverterfiles);
        if (.isTesting()) {
            .markAsTest();
        }
        
        /* Let's try opening up StorableStore: must have been created,
         * and have tables we expect; otherwise we'll fail right away.
         */
        .info("Trying to open Stores (StorableStore, node store, last-access store)");
          = _constructStores();
         .add();
        .info("Opened StorableStore successfully");
        .initAndOpen(false);
        // Then: read in cluster information (config file, backend store settings):
        final int port = dwConfig.getHttpConfiguration().getPort();
        .info("Initializing cluster configuration (port {})..."port);
        final long startTime = .currentTimeMillis();
        ClusterViewByServerImpl<K,E> cl = new ClusterBootstrapper<K,E>(startTime)
                .bootstrap(port);
         = cl;
        .add();
     
        .info("Cluster configuration setup complete, with {} nodes".size());
        
        // Index page must be done via resource, otherwise will conflict with DW/JAX-RS Servlet:
        environment.addResource(new IndexResource(loadResource("/index.html"), loadResource("/favicon.jpg")));
        // Let's first construct handlers we use:
        .info("Creating handlers for service endpoints");
        SyncHandler<K,E> syncH = constructSyncHandler();
        .add();
        .info("Adding service end points");
        addServiceEndpoints(environment,
                nodeHsyncH);
        .info("Adding health checks");
        addHealthChecks(environment);
        if (.shouldRunTasks()) {
            .info("Initializing background cleaner tasks");
            if ( != null) {
                .add();
            }
        } else {
            .info("Skipping cleaner tasks for light-weight testing");
        }
        .info("Initialization complete: HTTP service now running on port {}",
                dwConfig.getHttpConfiguration().getPort());
    }
    /*
    /**********************************************************************
    /* Factory methods: basic config objects
    /**********************************************************************
     */
    public boolean isTesting() { return .isTesting(); }
    
    /*
    /**********************************************************************
    /* Factory methods: basic config objects
    /**********************************************************************
     */

    
Overridable method that is used for getting helper object used for constructing com.fasterxml.clustermate.service.store.StoredEntry instances to store in the entry metadata store.
    protected abstract StoredEntryConverter<K,E,L> constructEntryConverter(SCONFIG config,
            Environment environment);
    protected abstract FileManager constructFileManager(SCONFIG serviceConfig);
    protected abstract StoresImpl<K,E> constructStores(SharedServiceStuff stuff,
            SCONFIG serviceConfigStorableStore store);    
    protected abstract SharedServiceStuff constructServiceStuff(SCONFIG serviceConfig,
            TimeMaster timeMasterStoredEntryConverter<K,E,L> entryConverter,
            FileManager files);
    
    /*
    /**********************************************************************
    /* Factory methods for constructing handlers
    /**********************************************************************
     */
    protected abstract StoreHandler<K,E,L> constructStoreHandler(SharedServiceStuff serviceStuff,
            Stores<K,E> storesClusterViewByServer cluster);
    protected SyncHandler<K,E> constructSyncHandler(SharedServiceStuff stuff,
            StoresImpl<K,E> storesClusterViewByServerUpdatable cluster)
    {
        return new SyncHandler<K,E>(stuffstorescluster);
    }
            ClusterViewByServerUpdatable cluster)
    {
        return new ClusterInfoHandler(stuffcluster);
    }
    // since 0.9.6
            Stores<K,E> storesClusterViewByServer cluster)
    {
        return new CleanerUpper<K,E>(,
                constructCleanupTasks());
    }
    // since 0.9.6
    protected abstract List<CleanupTask<?>> constructCleanupTasks();
            ClusterViewByServer clusterStores<K,E> stores,
            AllOperationMetrics.Provider[] metrics) {
        return new BackgroundMetricsAccessor(stuffstoresmetrics);
    }
    /*
    /**********************************************************************
    /* Factory methods: servlets
    /**********************************************************************
     */
    protected abstract StoreEntryServlet<K,E> constructStoreEntryServlet(SharedServiceStuff stuff,
            ClusterViewByServer clusterStoreHandler<K,E,L> storeHandler);
            ClusterInfoHandler nodeHandler) {
        return new NodeStatusServlet(stuffnodeHandler);
    }
            BackgroundMetricsAccessor accessor) {
        return new NodeMetricsServlet(stuffaccessor);
    }
        
            ClusterViewByServer clusterSyncHandler<K,E> syncHandler) {
        return new SyncListServlet<K,E>(stuffclustersyncHandler);
    }
            ClusterViewByServer clusterSyncHandler<K,E> syncHandler) {
        return new SyncPullServlet<K,E>(stuffclustersyncHandler);
    }
            ClusterViewByServer clusterStoreHandler<K,E,L> storeHandler) {
        return new StoreListServlet<K,E>(stuffclusterstoreHandler);
    }
    
    /*
    /**********************************************************************
    /* Methods for service end point additions
    /**********************************************************************
     */

    
Method called to create service endpoints, given set of handlers.
    protected void addServiceEndpoints(SharedServiceStuff stuff,
            Environment environment,
            ClusterInfoHandler nodeHandlerSyncHandler<K,E> syncHandler,
            StoreHandler<K,E,L> storeHandler)
    {
        final ClusterViewByServer cluster = syncHandler.getCluster();
        EnumMap<PathTypeServletBaseservlets = new EnumMap<PathTypeServletBase>(PathType.class);
        servlets.put(.constructNodeStatusServlet(stuffnodeHandler));
        ServletWithMetricsBase syncListServlet = constructSyncListServlet(stuffclustersyncHandler);
        servlets.put(.syncListServlet);
        ServletBase syncPullServlet = constructSyncPullServlet(stuffclustersyncHandler);
        servlets.put(.syncPullServlet);
        StoreEntryServlet<K,E> storeEntryServlet = constructStoreEntryServlet(stuff,
                clusterstoreHandler);
        servlets.put(.storeEntryServlet);
        ServletWithMetricsBase storeListServlet = constructStoreListServlet(stuff,
                clusterstoreHandler);
        servlets.put(.storeListServlet);
        final BackgroundMetricsAccessor metrics = constructMetricsAccessor(stuffcluster,
                storeHandler.getStores(),
                new AllOperationMetrics.Provider[] {
                    storeEntryServletstoreListServletsyncListServlet
                });
        servlets.put(.constructNodeMetricsServlet(stuffmetrics));
//        servlets.put(PathType.STORE_FIND_ENTRY, );
//        servlets.put(PathType.STORE_FIND_LIST, );
        
        ServiceDispatchServlet<K,E> dispatcher = new ServiceDispatchServlet<K,E>(cluster,
                nullstuffservlets);
        RequestPathBuilder rootBuilder = rootPath(stuff.getServiceConfig());
        String rootPath = servletPath(rootBuilder);
        .info("Registering main Dispatcher servlet at: {}"rootPath);
        environment.addServlet(dispatcherrootPath);
        // // And finally servlet for for entry access
        
        addStoreEntryServlet(stuffenvironmentcluster);
    }
    /*
     * Old single-Servlet implementation:
     */
    /*
    {
        final ServiceConfig config = stuff.getServiceConfig();
        final ClusterViewByServer cluster = syncHandler.getCluster();
        // All paths are dynamic, so we need a mapper:
        RequestPathStrategy pathStrategy = stuff.getPathStrategy();
        RequestPathBuilder pathBuilder;
        pathBuilder = pathStrategy.appendNodeStatusPath(rootPath(config));
        // // And then start by adding cluster-info (aka node status)
        // // and sync end points as plain servlets
        
        NodeStatusServlet nsServlet = constructNodeStatusServlet(nodeHandler);
        if (nsServlet != null) {
            environment.addServlet(nsServlet, servletPath(pathBuilder));
        }
    
        pathBuilder = pathStrategy.appendSyncListPath(rootPath(config));
        environment.addServlet(new SyncListServlet<K,E>(stuff, cluster, syncHandler),
                servletPath(pathBuilder));
        pathBuilder = pathStrategy.appendSyncPullPath(rootPath(config));
        environment.addServlet(new SyncPullServlet<K,E>(stuff, cluster, syncHandler),
                servletPath(pathBuilder));
        // // And finally servlet for for entry access
        
        addStoreEntryServlet(stuff, environment, pathStrategy, cluster, _storeHandler);
    }
    */
    
    
Overridable method used for hooking standard entry access endpoint into alternate location. Usually used for backwards compatibility purposes.
    protected void addStoreEntryServlet(SharedServiceStuff stuff,
            Environment environment,
            ClusterViewByServer cluster,
            StoreHandler<K,E,?> storeHandler)
    {
    }
    
    protected void addHealthChecks(SharedServiceStuff stuff,
            Environment environment) { }
    
    /*
    /**********************************************************************
    /* Internal methods
    /**********************************************************************
     */
    
    protected StoresImpl<K,E> _constructStores(SharedServiceStuff stuff)
        throws IOException
    {
        final SCONFIG v = stuff.getServiceConfig();
        StoreBackendBuilder<?> b = v.instantiateBackendBuilder();
        StoreBackendConfig backendConfig = v._storeBackendConfigOverride;
        if (backendConfig == null) { // no overrides, use databinding
            Class<? extends StoreBackendConfigcfgType = b.getConfigClass();
            if (v.storeBackendConfig == null) {
                throw new IllegalStateException("Missing 'v.storeBackendConfig");
            }
            backendConfig = stuff.convertValue(v.storeBackendConfigcfgType);
        }
        StoreBackend backend = b.with(v.storeConfig)
                .with(backendConfig)
                .build();
        StorableStore store = new StorableStoreImpl(v.storeConfigbackend,
               stuff.getFileManager(),
               _constructThrottler(stuff), _constructWriteMutex(stuff));
        return constructStores(stuffvstore);
    }

    
Factory method called to instantiate com.fasterxml.storemate.store.StoreOperationThrottler to use for throttling underlying local database operations. If null is returned, store is free to use whatever default throttling mechanism it needs to for ensuring consistency, but nothing more.

Default implementation simply returns null to let the default throttler be used.

Since:
0.9.9
    {
        // null -> use the default implementation
        return null;
    }

    
Factory method called to instantiate com.fasterxml.storemate.store.util.PartitionedWriteMutex to use for ensuring atomicity of read-modify-write operations. If null is returned, store is free to use its default implementation.

Default implementation simply returns null to let the default implementation be used.

Since:
0.9.10
    {
        // null -> use the default implementation
        return null;
    }
    
    /*
    /**********************************************************************
    /* Accessors for tests
    /**********************************************************************
     */
    public TimeMaster getTimeMaster() {
        return ;
    }
    
    /*
    /**********************************************************************
    /* Helper methods
    /**********************************************************************
     */
    protected RequestPathBuilder rootPath(ServiceConfig config)
    {
        return new JdkHttpClientPathBuilder("localhost")
            .addPathSegments(config.servicePathRoot);
    }

    
Helper method for constructing Servlet registration path, given a basic end point path definition. Currently just verifies prefix and suffix slashes and adds '*' as necessary.
    protected String servletPath(RequestPathBuilder pathBuilder)
    {
        String base = pathBuilder.getPath();
        if (!base.endsWith("*")) {
            if (base.endsWith("/")) {
                base += "*";
            } else {
                base += "/*";
            }
        }
        if (!base.startsWith("/")) {
            base = "/"+base;
        }
        return base;
    }
    protected byte[] loadResource(String refthrows IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(4000);
        InputStream in = getClass().getResourceAsStream(ref);
        byte[] buffer = new byte[4000];
        int count;
     
        while ((count = in.read(buffer)) > 0) {
            bytes.write(buffer, 0, count);
        }
        in.close();
        byte[] data = bytes.toByteArray();
        if (data.length == 0) {
            String msg = "Could not find resource '"+ref+"'";
            .error(msg);
            throw new IllegalArgumentException(msg);
        }
        return bytes.toByteArray();
    }
New to GrepCode? Check out our FAQ X