Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
   *
   * Copyright (c) 2007-2010 Oracle and/or its affiliates. All rights reserved.
   *
   * The contents of this file are subject to the terms of either the GNU
   * General Public License Version 2 only ("GPL") or the Common Development
   * and Distribution License("CDDL") (collectively, the "License").  You
   * may not use this file except in compliance with the License.  You can
  * obtain a copy of the License at
  * https://glassfish.dev.java.net/public/CDDL+GPL_1_1.html
  * or packager/legal/LICENSE.txt.  See the License for the specific
  * language governing permissions and limitations under the License.
  *
  * When distributing the software, include this License Header Notice in each
  * file and include the License file at packager/legal/LICENSE.txt.
  *
  * GPL Classpath Exception:
  * Oracle designates this particular file as subject to the "Classpath"
  * exception as provided by Oracle in the GPL Version 2 section of the License
  * file that accompanied this code.
  *
  * Modifications:
  * If applicable, add the following below the License Header, with the fields
  * enclosed by brackets [] replaced by your own identifying information:
  * "Portions Copyright [year] [name of copyright owner]"
  *
  * Contributor(s):
  * If you wish your version of this file to be governed by only the CDDL or
  * only the GPL Version 2, indicate your decision by adding "[Contributor]
  * elects to include this software in this distribution under the [CDDL or GPL
  * Version 2] license."  If you don't indicate a single choice of license, a
  * recipient has the option to distribute your version of this file under
  * either the CDDL, the GPL Version 2 or to extend the choice of license to
  * its licensees as provided above.  However, if you add GPL Version 2 code
  * and therefore, elected the GPL Version 2 license, then the option applies
  * only if the new code is made subject to such option by the copyright
  * holder.
  */
 
 package com.sun.enterprise.web.connector.grizzly.comet;
 
Main class allowing Comet support on top of Grizzly Asynchronous Request Processing mechanism. This class is the entry point to any component interested to execute Comet request style. Components can be Servlets, JSP, JSF or pure Java class. A component interested to support Comet request must do:

 (1) First, register the topic on which Comet support will be applied:
     CometEngine cometEngine = CometEngine.getEngine()
     CometContext cometContext = cometEngine.register(topic)
 (2) Second, add an instance of CometHandler to the
     CometContext returned by the register method:
     CometContext.addCometHandler(com.sun.enterprise.web.connector.grizzly.comet.CometHandler). Executing this operation
     tells Grizzly to suspend the response.
 (3) Finally, you can notify (CometContext.notify()) other CometHandler
     instances to share information between them. When notified,
     a CometHandler can decide to push back the data, resume the
     response, or simply ignore the content of the notification.
 
You can also select the stage where the suspension of the response happens when registering the CometContext's topic (see register(java.lang.String)), which can be before, during or after invoking a Servlet

Author(s):
Jeanfrancois Arcand
 
 public final class CometEngine {
     private final static String NOTIFICATION_HANDLER =
         "com.sun.grizzly.comet.notificationHandlerClassName";

    
Comet thread pool config in the format "<on|off>[-<max-pool-size>[-<min-pool-size>]]"
 
     private final static String COMET_THREAD_POOL_CONFIG =
         "com.sun.grizzly.comet.thread-pool-config";
 
     // Disable suspended connection time out.
     public final static int DISABLE_SUSPEND_TIMEOUT = -1;
    // Disable client detection close.
    public final static int DISABLE_CLIENT_DISCONNECTION_DETECTION = 0;


    
The token used to support BEFORE_REQUEST_PROCESSING polling.
    public final static int BEFORE_REQUEST_PROCESSING = 0;


    
The token used to support AFTER_SERVLET_PROCESSING polling.
    public final static int AFTER_SERVLET_PROCESSING = 1;


    
The token used to support AFTER_RESPONSE_PROCESSING polling.
    public final static int AFTER_RESPONSE_PROCESSING = 2;


    
Main logger
    private final static Logger logger = SelectorThread.logger();


    
    protected volatile Pipeline pipeline;

    
Sync object for pipeline update
    private final Object pipelineUpdateSync = new Object();


    
The single instance of this class.
    private static CometEngine cometEngine;


    
The current active CometContext keyed by context path.
Cache of CometTask instance
    protected Queue<CometTaskcometTasks;


    
Cache of CometContext instance.
    protected Queue<CometContextcometContexts;


    
The CometSelector used to poll requests.
    protected CometSelector cometSelector;


    
The default class to use when deciding which NotificationHandler to use. The default is DefaultNotificationHandler.
    protected static String notificationHandlerClassName =
        DefaultNotificationHandler.class.getName();


    
Temporary repository that associate a Thread ID with a Key. NOTE: A ThreadLocal might be more efficient.
    protected ConcurrentHashMap<Long,SelectionKeythreadsId;


    
Store modified CometContext.
The list of registered com.sun.enterprise.web.connector.grizzly.async.AsyncProcessorTask. These objects mainly keep the state of the Comet request.
    // Simple lock.
    private ReentrantLock lock = new ReentrantLock();
    private static final ThreadPoolConfig threadPoolConfig = new ThreadPoolConfig();
    
    // --------------------------------------------------------------------- //
    static {
        final String notificationHandlerClass =
                System.getProperty();
        if (notificationHandlerClass != null) {
            setNotificationHandlerClassName(notificationHandlerClass);
        }
        // expected config string format is "<on|off>[-<max-pool-size>[-<min-pool-size>]]"
        final String threadPoolConfigString = System.getProperty();
        if (threadPoolConfigString != null) {
            .configure(threadPoolConfigString);
        }
    }

    
Create a singleton and initialize all lists required. Also create and start the CometSelector
    // Builds the caches, starts a CometSelector and, when the thread pool
    // configuration asks for it, installs a dedicated "Comet-thread-pool" pipeline.
    // NOTE(review): several assignment targets, logger arguments and pool-size
    // arguments are missing from this copy of the file; the hints below are
    // assumptions to confirm against the original source.
    protected CometEngine() {
        // NOTE(review): target is presumably the cometTasks field — confirm.
         = new ConcurrentQueue<CometTask>("CometEngine.cometTasks");
        // NOTE(review): target is presumably the cometContexts field — confirm.
         = new ConcurrentQueue<CometContext>("CometEngine.cometContexts");
        // NOTE(review): target is presumably the cometSelector field — confirm.
         = new CometSelector(this);
        try{
            // NOTE(review): receiver is presumably the selector created above.
            .start();
        } catch(InterruptedException ex){
            // NOTE(review): logger receiver and log level are not visible here.
            .log(.,"Unable to start CometSelector",ex);
        }
        // NOTE(review): target is presumably the threadsId field (same key/value types) — confirm.
         = new ConcurrentHashMap<Long,SelectionKey>();
        // NOTE(review): target is a queue of AsyncProcessorTask whose declaration is
        // not visible in this copy; the queue name suggests "asyncTasks" — confirm.
         = new ConcurrentQueue<AsyncProcessorTask>("CometEngine.asyncTasks");
        // NOTE(review): the condition and the first two constructor arguments presumably
        // read the enabled flag and pool sizes from threadPoolConfig; the argument
        // order cannot be determined from here — confirm.
        if (.) {
            final LinkedListPipeline threadPool =
                    new LinkedListPipeline(.,
                    .,
                    "Comet-thread-pool", 0);
            
            // The pipeline is initialized and started before it is published
            // through setPipeline().
            threadPool.initPipeline();
            threadPool.startPipeline();
            setPipeline(threadPool);
        }
    }


    
Return a singleton of this Class.

Returns:
CometEngine the singleton.
    public synchronized static CometEngine getEngine(){
        if ( == null) {
             = new CometEngine();
        }
        return ;
    }


    
Unregister the CometContext registered under the given topic. Invoking this method will invoke all CometHandler.onTerminate(com.sun.enterprise.web.connector.grizzly.comet.CometEvent) before removing the associated CometContext. Invoking this method will also resume the underlying connection associated with each CometHandler, similar to what CometContext.resumeCometHandler(com.sun.enterprise.web.connector.grizzly.comet.CometHandler) does.
    // Looks up the context for the topic, notifies it, hands it to
    // finalizeContext() and finally removes the topic mapping, returning
    // whatever that removal yields.
    public synchronized CometContext unregister(String topic){
        // NOTE(review): the receiver (the topic -> CometContext map) is missing
        // from this copy of the file.
        CometContext cometContext = .get(topic);
        try{
            // NOTE(review): cometContext is not null-checked; if the lookup can
            // return null for an unknown topic this call would throw — confirm.
            // The second argument (event type) is also missing here.
            cometContext.notify(cometContext,.);
        } catch (IOException ex){
            // NOTE(review): logger receiver and level are missing.
            .log(.,"unregister",ex);
        }
        // finalizeContext() itself removes the topic entry that maps to this
        // context, so the remove below may find nothing left to remove — confirm.
        finalizeContext(cometContext);
        return .remove(topic);
    }


    
Register a context path with this CometEngine. The CometContext returned will be of type AFTER_SERVLET_PROCESSING, which means the request target (most probably a Servlet) will be executed first and then polled.

Parameters:
topic the context path used to create the CometContext
Returns:
CometContext a configured CometContext.
    public CometContext register(String topic){
        return register(topic,);
    }


    
Register a context path with this CometEngine. The CometContext returned will be of type type.

Parameters:
topic the context path used to create the CometContext
type when the request will be suspended, e.g. BEFORE_REQUEST_PROCESSING, AFTER_SERVLET_PROCESSING or AFTER_RESPONSE_PROCESSING
Returns:
CometContext a configured CometContext.
    public synchronized CometContext register(String topicint type){
        return register(topictype,CometContext.class);
    }


    
Instantiate a new CometContext.

Parameters:
topic the topic the new CometContext will represent.
type when the request will be suspended, e.g. BEFORE_REQUEST_PROCESSING, AFTER_SERVLET_PROCESSING or AFTER_RESPONSE_PROCESSING
contextclass The CometContext class to instantiate.
Returns:
a new CometContext if not already created, or the existing one.
    // Returns the context already mapped to the topic, or obtains one (from a
    // pool first, otherwise by reflection on contextclass), stores it under
    // the topic and returns it.
    // NOTE(review): commas in the parameter list and in several argument lists,
    // the closing '>' of the Class generic, and a number of field references
    // are missing from this copy of the file.
    public synchronized CometContext register(String topicint type,
            Class<? extends CometContextcontextclass ) {
        // NOTE(review): receiver (topic -> CometContext map) is missing.
        CometContext cometContext = .get(topic);
        if (cometContext == null){
            // NOTE(review): presumably polls the cometContexts cache — confirm.
            // A context obtained here skips the configuration done in the
            // nested block below (selector, notification handler).
            cometContext = .poll();
            if (cometContext == null){
                try{
                    // Reflective construction through a (String, int) constructor.
                    cometContext = contextclass.getConstructor(String.classint.class).newInstance(topictype);
                } catch (Throwable t) {
                    // Any failure falls back to the stock CometContext class.
                    // NOTE(review): logger receiver and level are missing.
                    .log(.,"Invalid CometContext class : ",t);
                    cometContext = new CometContext(topictype);
                }
                // NOTE(review): argument presumably the cometSelector field.
                cometContext.setCometSelector();
                // NOTE(review): argument presumably notificationHandlerClassName.
                NotificationHandler notificationHandler
                    = loadNotificationHandlerInstance
                    ();
                cometContext.setNotificationHandler(notificationHandler);
                // Only DefaultNotificationHandler receives a pipeline.
                if (notificationHandler != null && (notificationHandler
                            instanceof DefaultNotificationHandler)){
                    // NOTE(review): argument presumably the pipeline field.
                    ((DefaultNotificationHandler)notificationHandler)
                        .setPipeline();
                }
            }
            // NOTE(review): receiver (topic -> CometContext map) is missing.
            .put(topic,cometContext);
        }
        return cometContext;
    }


    
Handle an interrupted (or polled) request by matching the current context path with the registered one. If required, bring the target component (Servlet) to the proper execution stage and then notify the CometHandler.

Parameters:
apt the current apt representing the request.
Returns:
boolean true if the request can be polled.
    // Runs the target component for the request and decides whether the
    // request must be parked (true) or completed synchronously (false).
    // NOTE(review): the ')' before "throws", several field references and some
    // constants are missing from this copy of the file; hints below are
    // assumptions to confirm against the original source.
    protected boolean handle(AsyncProcessorTask aptthrows IOException{
        // NOTE(review): presumably adopts the task's pipeline when the engine's
        // own pipeline field is still unset — confirm.
        if ( == null){
             = apt.getPipeline();
        }
        // The request URI is used as the topic key.
        String topic = apt.getProcessorTask().getRequestURI();
        CometContext cometContext = null;
        if (topic != null){
            // NOTE(review): receiver (topic -> CometContext map) is missing.
            cometContext = .get(topic);
            try{
                // NOTE(review): presumably the ReentrantLock field "lock".
                // lock() is called inside the try, so the finally block would
                // call unlock() even if lock() itself failed.
                .lock();
                if (cometContext != null){
                    NotificationHandler notificationHandler =
                        cometContext.getNotificationHandler();
                    if (notificationHandler instanceof DefaultNotificationHandler){
                        // NOTE(review): argument presumably the pipeline field.
                        ((DefaultNotificationHandler)notificationHandler)
                            .setPipeline();
                    }
                }
            } finally {
                .unlock();
            }
        }
        /*
         * If the cometContext is null, it means the context has never
         * been registered. The registration might happens during the
         * Servlet.service() execution so we need to keep a reference
         * to the current thread so we can later retrieve the associated
         * SelectionKey. The SelectionKey is required in order to park the
         * request.
         */
        boolean activateContinuation = true;
        SelectionKey key = apt.getProcessorTask().getSelectionKey();
        // NOTE(review): receiver presumably threadsId (thread id -> SelectionKey).
        .put(Thread.currentThread().getId(),key);
        // NOTE(review): the default continuation type used when no context is
        // registered is missing from this line.
        int continuationType = (cometContext == null)?
            :cometContext.continuationType;
        /*
         * Execute the Servlet.service method. CometEngine.register() or
         * CometContext.addCometHandler() might be invoked during the
         * execution.
         */
        executeServlet(continuationType,apt);
        /*
         * Will return a CometContext instance if and only if the
         * Servlet.service() have invoked CometContext.addCometHandler().
         * If the returned CometContext is null, it means we need to
         * execute a synchronous request.
         */
        // NOTE(review): receiver (thread id -> CometContext map) is missing; it
        // is presumably the map filled by activateContinuation().
        cometContext = .remove(Thread.currentThread().getId());
        if (cometContext == null){
            activateContinuation = false;
        }
        boolean parkRequest = true;
        if (activateContinuation) {
            // Prevent the Servlet to suspend/resume the request in a single
            // transaction
            CometContext.addInProgressSelectionKey(key);
            // Disable keep-alive
            key.attach(null);
            // Remember the caller's notification mode so it can be restored below.
            boolean isBlocking = cometContext.isBlockingNotification();
            // We must initialize in blocking mode in case the connection is resumed
            // during the invocation of that method. If non blocking, there is a possible
            // thread race.
            cometContext.setBlockingNotification(true);
            cometContext.initialize(key);
            cometContext.setBlockingNotification(isBlocking);

            
            // NOTE(review): the following line is documentation text that lost its
            // comment delimiters in this copy of the file.
The CometHandler has been resumed during the onIntialize method call if getCometHandler return null.
            if (cometContext.getCometHandler(key) != null){
                // NOTE(review): receiver presumably the queue of parked
                // AsyncProcessorTask instances.
                .offer(apt);
                // NOTE(review): argument commas are missing; the third argument
                // is null, which makes getCometTask() fall back to its default.
                CometTask cometTask = getCometTask(cometContextkeynull);
                cometTask.setSelectorThread(apt.getSelectorThread());
                cometTask.setExpirationDelay(cometContext.getExpirationDelay());
                cometContext.addActiveCometTask(cometTask);
                // NOTE(review): compared constant is missing (presumably one of the
                // DISABLE_* constants); receiver below presumably cometSelector.
                if (cometContext.getExpirationDelay() != ){
                    .registerKey(key,cometTask);
                }
            } else{
                parkRequest = false;
            }
            // Now we can allow full control
            CometContext.removeInProgressSelectionKey(key);
        } else {
            parkRequest = false;
        }
        return parkRequest;
    }


    
Tell the CometEngine to activate Grizzly ARP on that CometContext. This method is called when CometContext.addCometHandler() is invoked.

Parameters:
threadId the Thread.getId().
cometContext An instance of CometContext.
Returns:
key The SelectionKey associated with the current request.
    // Records the context for the given thread id unless continueExecution is
    // set, then removes and returns the entry stored under that thread id.
    // NOTE(review): a comma is missing between the last two parameters, and
    // both map receivers are missing from this copy of the file.
    protected SelectionKey activateContinuation(Long threadId,
            CometContext cometContextboolean continueExecution){
        if (!continueExecution){
            // NOTE(review): receiver is a thread id -> CometContext map whose
            // declaration is not visible here.
            .put(threadId,cometContext);
        }
        // NOTE(review): receiver presumably threadsId, given the SelectionKey
        // return type — confirm.
        return .remove(threadId);
    }


    
Return a clean and configured CometTask

Parameters:
cometContext the CometContext to clean
key The current java.nio.channels.SelectionKey
Returns:
a CometTask, either taken from the cache or newly created
     CometTask getCometTask(CometContext cometContext,SelectionKey key,
            Pipeline ctxPipeline){
        if (ctxPipeline == null){
            ctxPipeline = ;
        }
        CometTask cometTask = .poll();
        if (cometTask == null){
            cometTask = new CometTask();
        }
        cometTask.setCometContext(cometContext);
        cometTask.setSelectionKey(key);
        cometTask.setCometSelector();
        cometTask.setPipeline(ctxPipeline);
        return cometTask;
    }


    
Cleanup the CometContext

Parameters:
cometContext the CometContext to clean
    // Removes the first topic entry that maps to the given context, flushes
    // AsyncProcessorTask instances, then recycles the context and offers it
    // to a queue.
    // NOTE(review): the map and queue receivers, the closing '>' of the
    // Iterator generic and the collection iterated by the for-each loop are
    // missing from this copy of the file.
    private void finalizeContext(CometContext cometContext) {
        // NOTE(review): iterates the keys of the topic -> CometContext map.
        Iterator<Stringiterator = .keySet().iterator();
        String topic;
        while(iterator.hasNext()){
            topic = iterator.next();
            // Match by equals(); only the first matching topic is removed.
            if ( .get(topic).equals(cometContext) ){
                // NOTE(review): this removes from the map directly rather than
                // through the iterator; the loop breaks right after — confirm
                // this is safe for the map type in use.
                .remove(topic);
                break;
            }
        }
        // NOTE(review): the iterated collection is missing; as written, every
        // task in that collection is flushed, not only those of this context
        // — confirm.
        for (AsyncProcessorTask apt){
            flushResponse(apt);
        }
        cometContext.recycle();
        // NOTE(review): receiver presumably the cometContexts cache.
        .offer(cometContext);
    }


    
Return the CometContext associated with the topic.

Parameters:
topic the topic used to creates the CometContext
    // Looks up the CometContext stored under the given topic.
    // NOTE(review): the receiver (topic -> CometContext map) is missing from this
    // copy of the file; the result is presumably null for an unknown topic.
    public CometContext getCometContext(String topic){
        return .get(topic);
    }


    
The CometSelector is expiring idle java.nio.channels.SelectionKey, hence we need to resume the current request.

Parameters:
key the expired SelectionKey
    protected void interrupt(final SelectionKey key) {
        final CometTask cometTask = (CometTask)key.attachment();
        key.attach(null);
        interrupt(cometTask);
    }
    // Finds the parked AsyncProcessorTask that owns the task's SelectionKey,
    // removes it, flushes its response when its stage allows, and always
    // returns the CometTask to the pool once past the initial null check.
    // NOTE(review): logger references, the task collection, a stage constant,
    // a ')' and a generic '>' are missing from this copy of the file.
    protected void interrupt(final CometTask cometTask) {
        if (cometTask == null){
            // NOTE(review): logger receiver and level are missing (a fine-level
            // check would match the fine() call below).
            if (.isLoggable(.)){
                .fine("CometTask was null");
            }
            return;
        }
        final SelectionKey akey = cometTask.getSelectionKey();
        try{
            // NOTE(review): ')' is missing; this is an early exit when the task
            // has no key. The finally block still recycles the task.
            if (akey == nullreturn;
            // NOTE(review): receiver is the collection of parked tasks.
            final Iterator<AsyncProcessorTaskiterator = .iterator();
            // NOTE(review): 'ah' is assigned in the loop but never read in this method.
            AsyncHandler ah = null;
            while (iterator.hasNext()){
                final AsyncProcessorTask apt = iterator.next();
                ah = apt.getAsyncExecutor().getAsyncHandler();
                // Identity comparison on the SelectionKey.
                if (apt.getProcessorTask().getSelectionKey() == akey){
                    iterator.remove();
                    // akey cannot be null here because of the early return above.
                    if (akey != null){
                        akey.attach(null);
                    }
                    
                    // NOTE(review): the following line is documentation text that
                    // lost its comment delimiters in this copy of the file.
The connection was parked and resumed before the CometEngine.handle() terminated.
                    // NOTE(review): the compared stage constant is missing.
                    if (apt.getStage() != .){
                        break;
                    }
                    flushResponse(apt);
                    break;
                }
            }
        } finally {
            returnTask(cometTask);
        }
    }

    
Return a Task to the pool.
    protected void returnTask(CometTask cometTask){
        cometTask.recycle();
        .offer(cometTask);
    }


    
Resume the long polling request by unblocking the current java.nio.channels.SelectionKey
    // Finds the parked AsyncProcessorTask whose ProcessorTask owns the given key,
    // removes it and flushes its response when its stage allows.
    // NOTE(review): the task collection, a generic '>' and a stage constant are
    // missing from this copy of the file.
    protected synchronized void resume(SelectionKey key) {
        // NOTE(review): receiver is the collection of parked tasks.
        Iterator<AsyncProcessorTaskiterator = .iterator();
        AsyncProcessorTask apt = null;
        AsyncExecutor asyncE = null;
        ProcessorTask pt = null;
        while (iterator.hasNext()){
            apt = iterator.next();
            asyncE = apt.getAsyncExecutor();
            // NOTE(review): a task without an executor ends the whole search
            // rather than being skipped — confirm this is intended.
            if (asyncE == null){
                return;
            }
            pt = apt.getProcessorTask();
            // Identity comparison on the SelectionKey.
            if (pt != null && pt.getSelectionKey() == key){
                iterator.remove();

                
                // NOTE(review): the following line is documentation text that
                // lost its comment delimiters in this copy of the file.
The connection was parked and resumed before the CometEngine.handle() terminated.
                // NOTE(review): the compared stage constant is missing.
                if (apt.getStage() != .){
                    break;
                }
                flushResponse(apt);
                break;
            }
        }
    }


    
Complete the asynchronous request.
    // Sets the task's stage and runs it; failures are logged, not rethrown.
    // NOTE(review): the stage constant and all logger references/levels are
    // missing from this copy of the file.
    private void flushResponse(AsyncProcessorTask apt){
        apt.setStage(.);
        try{
            apt.doTask();
        } catch (IllegalStateException ex){
            // This exception is only logged when the guarded level is enabled.
            if (.isLoggable(.)){
                .log(.,"flushResponse failed",ex);
            }
        } catch (IOException ex) {
            // I/O failures are logged unconditionally.
            .log(.,"flushResponse failed",ex);
        }
    }


    
Bring the cometContext path target (most probably a Servlet) to the processing stage we need for Comet request processing.

Parameters:
cometContext The CometContext associated with the Servlet
apt the AsyncProcessorTask
    // Drives the AsyncProcessorTask according to continuationType; an
    // IOException is logged, not rethrown.
    // NOTE(review): the three case labels, the stage constants and the logger
    // reference are missing from this copy of the file. The labels are
    // presumably BEFORE_REQUEST_PROCESSING, AFTER_SERVLET_PROCESSING and
    // AFTER_RESPONSE_PROCESSING, but their order cannot be confirmed from here.
    private void executeServlet(int continuationType,
            AsyncProcessorTask apt){
        try{
            switch (continuationType){
                case :
                    // Only sets a stage; the doTask() call after the switch does the work.
                    apt.setStage(.);
                    break;
                case :
                    // Invokes the adapter directly and returns, skipping the
                    // stage check and doTask() below.
                    apt.getProcessorTask().invokeAdapter();
                    return;
                case :
                    apt.setStage(.);
                    // Last step, execute directly from here.
                    apt.doTask();
                    break;
                default:
                    // Unchecked; not caught by the IOException handler below.
                    throw new IllegalStateException("Invalid state");
            }

            
            // NOTE(review): the following line is documentation text that lost its
            // comment delimiters in this copy of the file.
We have finished the processing, most probably because we entered the FileCache or because we of the AFTER_RESPONSE_PROCESSING configuration.
            // NOTE(review): the compared stage constant is missing.
            if (apt.getStage() == .){
                return;
            }
            apt.doTask();
        } catch (IOException ex){
            .log(.,"executeServlet",ex);
        }
    }


    
Return the default NotificationHandler class name.

Returns:
the default NotificationHandler class name.
    public static String getNotificationHandlerClassName() {
        return ;
    }


    
Set the default NotificationHandler class name.

Parameters:
aNotificationHandlerClassName the default NotificationHandler class name.
    public static void setNotificationHandlerClassName(String aNotificationHandlerClassName) {
         = aNotificationHandlerClassName;
    }
    public Pipeline getPipeline() {
        return ;
    }
    // Replaces the engine's pipeline: stops the previous one (if any), changes the
    // selector pool by the difference in max threads, then stores the new pipeline.
    // NOTE(review): the monitor object, the field references after "this.",
    // the logger reference/level and the commas in the log call are missing
    // from this copy of the file.
    public void setPipeline(final Pipeline pipeline) {
        // NOTE(review): monitor presumably the pipelineUpdateSync field.
        synchronized() {
            int oldSize = 0;
            // NOTE(review): "this." presumably refers to the pipeline field.
            if (this. != null) {
                oldSize = this..getMaxThreads();
                this..stopPipeline();
            }
            // delta is negative when the new pipeline has fewer max threads.
            final int delta = pipeline.getMaxThreads() - oldSize;
            try {
                SelectorFactory.changeSelectorsBy(delta);
            } catch (Exception e) {
                // A resize failure is logged; the new pipeline is still installed below.
                .log(."Error resizing the selector pool"e);
            }
            this. = pipeline;
        }
    }

    
Util to load classes using reflection.
    // Loads className through the current thread's context class loader and
    // instantiates it with its no-argument constructor. Any failure is logged
    // and a DefaultNotificationHandler is returned instead, so this method
    // never returns null.
    // NOTE(review): the logger reference and level are missing from this copy of the file.
    protected final static NotificationHandler loadNotificationHandlerInstance(String className){
        Class clazz = null;
        try{
            clazz = Class.forName(className,true,
                    Thread.currentThread().getContextClassLoader());
            // A class that is not a NotificationHandler fails this cast and is
            // handled by the catch below.
            return (NotificationHandler)clazz.newInstance();
        } catch (Throwable t) {
            .log(.,"Invalid NotificationHandler: ",t);
        }
        return new DefaultNotificationHandler();
    }


    
Return the current logger.
    public final static Logger logger(){
        return ;
    }
    private static class ThreadPoolConfig {
        private boolean isEnabled = false;
        private int minSize = 1;
        private int maxSize = 5;

        
Parse config string in the format "<on|off>[-<max-pool-size>[-<min-pool-size>]]"

Parameters:
threadPoolConfigString <on|off>[-<max-pool-size>[-<min-pool-size>]]
        private void configure(String threadPoolConfigString) {
            String[] poolConfig = threadPoolConfigString.split("-");
            if (poolConfig.length > 0) {
                final String useThreadPoolConfig = poolConfig[0];
                 = useThreadPoolConfig.equalsIgnoreCase("on")  ||
                        useThreadPoolConfig.equalsIgnoreCase("yes") ||
                        useThreadPoolConfig.equalsIgnoreCase("enabled") ||
                        useThreadPoolConfig.equalsIgnoreCase("true");
            }
            if (poolConfig.length > 1) {
                final String threadPoolMaxConfig = poolConfig[1];
                try {
                     = Integer.parseInt(threadPoolMaxConfig);
                } catch (Exception ignored) {
                }
            }
            if (poolConfig.length > 2) {
                final String threadPoolMinConfig = poolConfig[2];
                try {
                     = Integer.parseInt(threadPoolMinConfig);
                } catch (Exception ignored) {
                }
            }
        }
    }
New to GrepCode? Check out our FAQ X