Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.fasterxml.clustermate.dw;
  
  
  import java.io.*;
  import java.util.*;
 
 
 
 public abstract class DWBasedService<
     K extends EntryKey,
     E extends StoredEntry<K>,
     SCONFIG extends ServiceConfig,
     CONF extends DWConfigBase<SCONFIG, CONF>
 >
     extends Application<CONF>
 {
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
     /*
     /**********************************************************************
     /* Basic configuration
     /**********************************************************************
      */

    
Running mode of this service; used to indicate whether we are running in test mode, and whether background tasks ought to be run or not.
 
     protected final RunMode _runMode;

    
We need to keep track of things created, to let tests access the information
 
     protected SharedServiceStuff _serviceStuff;
    
    
This object is needed to allow test code to work around usual waiting time restrictions.
 
     protected final TimeMaster _timeMaster;
 
     protected SCONFIG _config;
     
     /*
     /**********************************************************************
     /* State management
     /**********************************************************************
      */
    
    
Container for various stores we use for data, metadata.
 
     protected StoresImpl<K,E> _stores;

    
And we better hang on to cluster view as well
 
     
     /*
     /**********************************************************************
     /* Service handlers
     /**********************************************************************
      */
    protected SyncHandler<K,E> _syncHandler;
    
    protected StoreHandler<K,E,?> _storeHandler;
    /*
    /**********************************************************************
    /* Background processes
    /**********************************************************************
     */
    
    
Manager object that deals with data expiration and related clean up tasks.
    protected CleanerUpper<K,E> _cleanerUpper;
    
    /*
    /**********************************************************************
    /* State
    /**********************************************************************
     */
    
    
List of com.fasterxml.storemate.shared.StartAndStoppable objects we will dispatch start/stop calls to.
    protected List<StartAndStoppable_managed = null;
    
    /*
    /**********************************************************************
    /* Construction
    /**********************************************************************
     */
    protected DWBasedService(TimeMaster timingsRunMode mode)
    {
        super();
         = timings;
         = mode;
    }
    /*
    /**********************************************************************
    /* Life-cycle
    /**********************************************************************
     */
    @Override
    public void initialize(Bootstrap<CONF> bootstrap) {
        // Static stuff from under /html (except for root  level things
        // like /index.html that need special handling)
        bootstrap.addBundle(new AssetsBundle("/html"));
    }
    public void _start() throws Exception
    {
        .info("Starting up {} Managed objects".size());
        for (StartAndStoppable managed : ) {
            .info("Starting up: {}"managed.getClass().getName());
            try {
                managed.start();
            } catch (Exception e) {
                .warn("Problems starting component {}: {}".getClass().getName(), e);
            }
        }
        .info("Managed object startup complete");
        /* 27-Mar-2013, tatu: Also: need to register shutdown hook to be
         *    able to do 'prepareForStop()'
         */
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                _prepareForStop();
            }
        });
    }
    protected final AtomicBoolean _prepareForStopCalled = new AtomicBoolean(false);
    
    public void _prepareForStop()
    {
        // only call once:
        if (!.compareAndSet(falsetrue)) {
            return;
        }
        .info("Calling prepareForStop on {} Managed objects".size());
        for (StartAndStoppable managed : ) {
            try {
                managed.prepareForStop();
            } catch (Exception e) {
                .warn("Problems with prepareForStop on {}: {}"managed.getClass().getName(),
                        e.getMessage());
            }
        }
        .info("prepareForStop() for Managed objects complete");
    }
    
    public void _stop() throws Exception
    {
        if ( == null) {
            .error("_managed is null on _stop(): should never happen; skipping");
            return;
        }
        
        int count = .size();
        .info("Stopping {} managed objects"count);
        while (--count >= 0) {
            StartAndStoppable managed = .remove(count);
            String desc = managed.getClass().getName();
            try {
                .info("Stopping: {}"desc);
                managed.stop();
            } catch (Exception e) {
                .warn(String.format("Problems trying to stop Managed of type %s: (%s) %s",
                        desce.getClass().getName(), e.getMessage()),
                        e);
            }
        }
        .info("Managed object shutdown complete");
    }
    /* Ideally, shouldn't need to track this; but after having a few issues,
     * decided better safe than sorry.
     */
    protected boolean _hasBeenRun = false;
    
    @Override
    public void run(CONF dwConfigEnvironment environmentthrows IOException
    {
        synchronized (this) {
            if () {
                throw new IllegalStateException("Trying to run(config, env) DWBasedService more than once");
            }
             = true;
        }
        
        // first things first: we need to get start()/stop() calls, so:
        Listener l = new Listener() {
            @Override
            public void lifeCycleStarting(LifeCycle event) {
                try {
                    _start();
                } catch (Exception e) {
                    .warn("Problems starting components: {}"e);
                }
            }
            @Override
            public void lifeCycleStarted(LifeCycle event) {
            }
            @Override
            public void lifeCycleFailure(LifeCycle eventThrowable cause) {
                // what to do here, if anything?
            }
            @Override
            public void lifeCycleStopping(LifeCycle event) {
                try {
                    _prepareForStop();
                } catch (Exception e) {
                    .warn("Problems preparing components for stopping: {}"e);
                }
            }
            @Override
            public void lifeCycleStopped(LifeCycle event) {
                try {
                    _stop();
                } catch (Exception e) {
                    .warn("Problems stopping components: {}"e);
                }
            }
        };
        environment.getApplicationContext().addLifeCycleListener(l);
        
         = dwConfig.getServiceConfig();
        // 09-Apr-2014, tatu: This should NOT be necessary any more with DW-0.7:
        //dwConfig.overrideGZIPEnabled(false);
        
         = new ArrayList<StartAndStoppable>();
                constructFileManager());
        if (.isTesting()) {
            .markAsTest();
        }
        
        /* Let's try opening up StorableStore: must have been created,
         * and have tables we expect; otherwise we'll fail right away.
         */
        .info("Trying to open Stores (StorableStore, node store, last-access store)");
         = constructStores();
        .add();
        .info("Opened StorableStore successfully");
        .initAndOpen(false);
        // Then: read in cluster information (config file, backend store settings):
        final int port = dwConfig.getApplicationPort();
        .info("Initializing cluster configuration (port {})..."port);
        final long startTime = .currentTimeMillis();
        ClusterViewByServerImpl<K,E> cl = new ClusterBootstrapper<K,E>(startTime)
                .bootstrap(port);
         = cl;
        .add();
        .info("Cluster configuration setup complete, with {} nodes".size());
        
        // Index page must be done via resource, otherwise will conflict with DW/JAX-RS Servlet:
        environment.jersey().register(new IndexResource(loadResource("/index.html"),
                loadResource("/favicon.jpg")));
        // Let's first construct handlers we use:
        .info("Creating handlers for service endpoints");
         = constructSyncHandler();
        .add();
        .info("Adding service end points");
        addServiceEndpoints(environmentconstructServletFactory());
        .info("Adding health checks");
        addHealthChecks(environment);
        if (.shouldRunTasks()) {
            .info("Initializing background cleaner tasks");
             = constructCleanerUpper();
            if ( != null) {
                .add();
            }
        } else {
            .info("Skipping cleaner tasks for light-weight testing");
        }
        .info("Initialization complete: HTTP service now running on port {}",
                dwConfig.getApplicationPort());
    }
    
    /*
    /**********************************************************************
    /* Accessors
    /**********************************************************************
     */
    public boolean isTesting() { return .isTesting(); }
    protected SCONFIG serviceConfig() {
        return ;
    }
    // For tests:
    public TimeMaster getTimeMaster() {
        return ;
    }
    
    /*
    /**********************************************************************
    /* Factory methods: basic bootstrap config objects.
    /**********************************************************************
     */

    
Overridable method that is used for getting helper object used for constructing com.fasterxml.clustermate.service.store.StoredEntry instances to store in the entry metadata store.
    @SuppressWarnings("unchecked")
    protected StoredEntryConverter<K,E,?> constructEntryConverter() {
        return (StoredEntryConverter<K,E,?>) serviceConfig().getEntryConverter();
    }
    protected abstract FileManager constructFileManager();
    protected abstract StoresImpl<K,E> constructStores(StorableStore store,
            NodeStateStore<IpAndPortActiveNodeStatenodeStates);
    protected abstract SharedServiceStuff constructServiceStuff(SCONFIG serviceConfig,
            TimeMaster timeMasterStoredEntryConverter<K,E,?> entryConverter,
            FileManager files);
    
    /*
    /**********************************************************************
    /* Factory methods for constructing handlers
    /**********************************************************************
     */
    protected abstract StoreHandler<K,E,?> constructStoreHandler();
    protected SyncHandler<K,E> constructSyncHandler() {
        return new SyncHandler<K,E>();
    }
        return new ClusterInfoHandler();
    }
    protected CleanerUpper<K,E> constructCleanerUpper() {
        return new CleanerUpper<K,E>(,
                constructCleanupTasks());
    }
    protected abstract List<CleanupTask<?>> constructCleanupTasks();
    /*
    /**********************************************************************
    /* Methods for service end point additions
    /**********************************************************************
     */
    protected abstract CMServletFactory constructServletFactory();
    /*
    protected CMServletFactory constructServletFactory() {
        return new DefaultCMServletFactory<K,E,SCONFIG>(_serviceStuff,
                _stores, _cluster, _clusterInfoHandler, _syncHandler, _storeHandler);
    }
    */
    
    
Method called to create service endpoints, given set of handlers.
    protected void addServiceEndpoints(Environment environmentCMServletFactory servletFactory)
    {
        RequestPathBuilder<?> rootBuilder = rootPath(.getServiceConfig());
        String rootPath = servletPath(rootBuilder);
        .info("Registering main Dispatcher servlet at: "+rootPath);
        ServletBase dispatcher = servletFactory.contructDispatcherServlet();
        if (dispatcher != null) {
            environment.servlets()
                .addServlet("CM-Dispatcher"dispatcher)
                .addMapping(rootPath);
        }
        // // And optional additional servlet for for entry access
        addStoreEntryServlet(environment);
    }
    
    
Overridable method used for hooking standard entry access endpoint into alternate location. Usually used for backwards compatibility purposes.
    protected void addStoreEntryServlet(Environment environment) { }

    
Method called for installing DropWizard-mandated health checks.
    protected void addHealthChecks(Environment environment) { }
    
    /*
    /**********************************************************************
    /* Internal methods
    /**********************************************************************
     */
    
    protected StoresImpl<K,E> constructStores() throws IOException
    {
        final SCONFIG sconfig = serviceConfig();
        StoreBackendBuilder<?> b = sconfig.instantiateBackendBuilder();
        StoreBackendConfig backendConfig = sconfig._storeBackendConfigOverride;
        if (backendConfig == null) { // no overrides, use databinding
            Class<? extends StoreBackendConfigcfgType = b.getConfigClass();
            if (sconfig.storeBackendConfig == null) {
                throw new IllegalStateException("Missing 'config.storeBackendConfig");
            }
            backendConfig = .convertValue(sconfig.storeBackendConfigcfgType);
        }
        b = b.with(sconfig.storeConfig)
                .with(backendConfig);
        StoreBackend backend = b.build();
        StorableStore store = new StorableStoreImpl(sconfig.storeConfigbackend,
                .getFileManager(),
               constructThrottler(), constructWriteMutex());
        return constructStores(storenodeStates);
    }
    {
        /* 09-Dec-2013, tatu: Now we will also construct NodeStateStore using
         *   the very same builder...
         */
        ObjectMapper mapper = .jsonMapper();
        return backendBuilder.<IpAndPortActiveNodeState>buildNodeStateStore(root,
                        new JacksonBasedConverter<IpAndPort>(mapperIpAndPort.class),
                        new JacksonBasedConverter<ActiveNodeState>(mapperActiveNodeState.class));
    }
    
    
Factory method called to instantiate com.fasterxml.storemate.store.StoreOperationThrottler to use for throttling underlying local database operations. If null is returned, store is free to use whatever default throttling mechanism it needs to for ensuring consistency, but nothing more.

Default implementation simply returns null to let the default throttler be used.

        // null -> use the default implementation
        return null;
    }

    
Factory method called to instantiate com.fasterxml.storemate.store.util.PartitionedWriteMutex to use for ensuring atomicity of read-modify-write operations. If null is returned, store is free to use its default implementation.

Default implementation simply returns null to let the default implementation be used.

        // null -> use the default implementation
        return null;
    }
    
    /*
    /**********************************************************************
    /* Helper methods
    /**********************************************************************
     */
    {
        return new JdkHttpClientPathBuilder("localhost")
            .addPathSegments(config.servicePathRoot);
    }

    
Helper method for constructing Servlet registration path, given a basic end point path definition. Currently just verifies prefix and suffix slashes and adds '*' as necessary.
    protected String servletPath(RequestPathBuilder<?> pathBuilder)
    {
        String base = pathBuilder.getPath();
        if (!base.endsWith("*")) {
            if (base.endsWith("/")) {
                base += "*";
            } else {
                base += "/*";
            }
        }
        if (!base.startsWith("/")) {
            base = "/"+base;
        }
        return base;
    }
    protected byte[] loadResource(String refthrows IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(4000);
        InputStream in = getClass().getResourceAsStream(ref);
        byte[] buffer = new byte[4000];
        int count;
     
        while ((count = in.read(buffer)) > 0) {
            bytes.write(buffer, 0, count);
        }
        in.close();
        byte[] data = bytes.toByteArray();
        if (data.length == 0) {
            String msg = "Could not find resource '"+ref+"'";
            .error(msg);
            throw new IllegalArgumentException(msg);
        }
        return bytes.toByteArray();
    }
New to GrepCode? Check out our FAQ X