Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * #%L
   * Wikitty :: api
   * %%
   * Copyright (C) 2009 - 2010 CodeLutin, Benjamin Poussin
   * %%
   * This program is free software: you can redistribute it and/or modify
   * it under the terms of the GNU Lesser General Public License as 
   * published by the Free Software Foundation, either version 3 of the 
  * License, or (at your option) any later version.
  * 
  * This program is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Lesser Public License for more details.
  * 
  * You should have received a copy of the GNU General Lesser Public 
  * License along with this program.  If not, see
  * <http://www.gnu.org/licenses/lgpl-3.0.html>.
  * #L%
  */
 
 package org.nuiton.wikitty.services;
 
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
Wikitty service notifier. Currently based on jgroups.

Author(s):
chatellier
Version:
$Revision$ Last update : $Date$ By : $Author$
 
 public class WikittyServiceNotifier extends WikittyServiceDelegator {

    
to use log facility, just put in your code: log.info(\"...\");
 
     static private Log log = LogFactory.getLog(WikittyServiceNotifier.class);

    
Wikitty service listener (all event).
 
Wikitty service listener (only for local event).
 
Wikitty service listener (only for remote event).
 
notifier
 
     protected WikittyListener notifier;

    
Tous les events en attente d'etre envoyes aux listeners
 
     protected LinkedBlockingQueue<WikittyEventeventToSend;

    
thread utilise pour envoyer les events
 
     protected EventThread eventThread;

    
Constructor with configuration.

Parameters:
config config to use
ws delegate service
transporter transporter to use for remote events (listen or propagate). This transporter can be null if we don't want to propagate or listen to remote events
 
     // Builds the notifier around the delegate service and, when a transporter
     // is given, wires an EventPropagator to it.
     // NOTE(review): this block looks damaged by text extraction -- several
     // identifiers and separators are missing (assignment targets, the
     // EventThread arguments, the commas of the parameter lists, the logger
     // name). Restore them from the original source before compiling; the
     // code below is intentionally left untouched.
     public WikittyServiceNotifier(ApplicationConfig config,
             WikittyService wsRemoteNotifierTransporter transporter) {
         super(ws);
 
         // listeners
 
          = new LinkedBlockingQueue<WikittyEvent>();
         
         = new EventThread(,
                );
        if (transporter != null) {
            // tell the transporter which notifier it works for
            transporter.setWikittyServiceNotifier(this);
             = new EventPropagator(configthistransporter);
            // FIX poussin 201O1126: ALL replaced by LOCAL,
            // otherwise events that were sent to us are sent back again :(
            // and that may loop forever :(
            this.addWikittyServiceListener(..); // weak reference
        }
        // report whether a remote transporter is in use
        if (.isInfoEnabled()) {
            if (transporter == null) {
                .info("RemoteNotifier synchronisation not used ");
            } else {
                .info("RemoteNotifier transporter: " + transporter.getClass().getName());
            }
        }
    }
    // Registers a listener; the switch on the listener type selects where the
    // listener is added, and each add is done inside a synchronized block.
    // NOTE(review): the case labels, the lock objects and the receivers of
    // add() were lost by text extraction, as was the comma between the two
    // parameters. Presumably one case per listener type (all / local /
    // remote) -- confirm against the original source.
    @Override
    public void addWikittyServiceListener(WikittyListener listenerServiceListenerType type) {
        // not delegated
        switch (type) {
            case  :
                synchronized() {
                    .add(listener);
                }
                break;
            case  :
                synchronized() {
                    .add(listener);
                }
                break;
            case  :
                synchronized() {
                    .add(listener);
                }
                break;
        }
    }
    // Unregisters a listener; the switch on the listener type selects where
    // the listener is removed from, each removal being done inside a
    // synchronized block.
    // NOTE(review): the case labels, the lock objects and the receivers of
    // remove() were lost by text extraction, as was the comma between the two
    // parameters. Presumably one case per listener type (all / local /
    // remote) -- confirm against the original source.
    @Override
    public void removeWikittyServiceListener(WikittyListener listenerServiceListenerType type) {
        // not delegated
        switch (type) {
            case  :
                synchronized() {
                    .remove(listener);
                }
                break;
            case  :
                synchronized () {
                    .remove(listener);
                }
                break;
            case  :
                synchronized() {
                    .remove(listener);
                }
                break;
        }
    }
    @Override
    public WikittyEvent clear(String securityToken) {
        WikittyEvent result = getDelegate().clear(securityToken);
        fireEvent(result);
        return result;
    }
    @Override
    public WikittyEvent store(String securityToken,
            Collection<Wikittywikittiesboolean force) {
        WikittyEvent result = getDelegate().store(securityTokenwikittiesforce);
        // notify listeners
        fireEvent(result);
        return result;
    }
    @Override
    public WikittyEvent storeExtension(String securityToken,
            Collection<WikittyExtensionexts) {
        WikittyEvent result = getDelegate().storeExtension(securityTokenexts);
        fireEvent(result);
        return result;
    }
    @Override
            String securityTokenCollection<StringextNames) {
        WikittyEvent result = getDelegate().deleteExtension(securityTokenextNames);
        fireEvent(result);
        return result;
    }
    @Override
    public WikittyEvent delete(String securityTokenCollection<Stringids) {
        WikittyEvent result = getDelegate().delete(securityTokenids);
        // notify listeners
        fireEvent(result);
        return result;
    }
    @Override
    public WikittyEvent deleteTree(String securityTokenString wikittyId) {
        WikittyEvent result = getDelegate().deleteTree(securityTokenwikittyId);
        fireEvent(result);
        return result;
    }
    @Override
    public WikittyEvent replay(
            String securityTokenList<WikittyEventeventsboolean force) {
        WikittyEvent result = getDelegate().replay(securityTokeneventsforce);
        // notify listeners
        fireEvent(result);
        return result;
    }

    
Fire event to all registered listeners. Takes WikittyEvent.isRemote() into account when firing.

Parameters:
event event to fire
    protected void fireEvent(final WikittyEvent event) {
        // ajout d'un thread, car si les listeners doivent
        // ouvrir une transaction WikittyTransaction
        // alors que celui qui lance l'event en a une ouverte
        // cela cause une exception JTA
        EventThread thread = getEventThread();
        // si le thread de notification est en cours d'arret on leve une exception
        if (thread.stopAsked()) {
            throw new WikittyException(
                    "Event thread dispatcher is stopped, no more event can be send");
        } else {
            .offer(event);
            if (!thread.isAlive()) {
                // on demarre le thread que lorsqu'il y a le premier event d'arrive
                thread.start();
            }
        }
    }

    
fire event passed in argument. Before fire, change source to current WikittyServiceNotifier and set remote event to true.
    public void processRemoteEvent(WikittyEvent event) {
         //source is transient, add it here :
        event.setSource(this);
        event.setRemote(true); // received event became remote
        fireEvent(event);
    }

    
Retourne le dernier thread utilise pour envoyer les events.

Returns:
    public EventThread getEventThread() {
        return ;
    }
    @Override
    protected void finalize() throws Throwable {
        getEventThread().askStop();
        super.finalize();
    }

    
Thread utilise pour envoyer les events. On rend accessible ce thread pour pouvoir y acceder depuis l'exterieur (pour l'instant pour les tests mais peut-etre plus tard pour du monitoring). Il permet a un thread d'attendre qu'un evenement leve a une certaine heure ait bien ete dispatche grace a la methode waitFor
    static public class EventThread extends Thread {
        protected boolean mustBeRunning = true;
        
        protected SortedMap<LongObjectwaiter = new TreeMap<LongObject>();
        
reference vers la collection qui contient les events a envoyer
        protected LinkedBlockingQueue<WikittyEventeventToSend;

        
Wikitty service listener (all event).
        protected ListenerSet<WikittyListenerallWikittyServiceListeners;

        
Wikitty service listener (only for local event).
        protected ListenerSet<WikittyListenerlocalWikittyServiceListeners;

        
Wikitty service listener (only for remote event).
        protected ListenerSet<WikittyListenerremoteWikittyServiceListeners;

        
heure du dernier event envoye
        protected long lastEventTime = 0;
        public EventThread(LinkedBlockingQueue<WikittyEventeventToSend,
                ListenerSet<WikittyListenerallWikittyServiceListeners,
                ListenerSet<WikittyListenerlocalWikittyServiceListeners,
                ListenerSet<WikittyListenerremoteWikittyServiceListeners) {
            super("wikitty-event-thread");
            this. = eventToSend;
            this. = allWikittyServiceListeners;
            this. = localWikittyServiceListeners;
            this. = remoteWikittyServiceListeners;
        }
        
        
demande l'arret du thread, ne doit ĂȘtre appeler que par le finalize du WikittyServiceNotifier
        protected void askStop() {
            this. = false;
        }

        
retourne vrai si on a demande l'arret du thread

Returns:
        public boolean stopAsked() {
            return !;
        }

        
thread that want wait for particulare event to be processed, can be call this method with event time in argument. Used only in unit test but this is necessary for test.
        public void waitFor(long eventTimethrows InterruptedException {
            Object mutex = null;
            sleep(1); // sleep 1 millis to prevent problem with 2 events in same millis
            synchronized () {
                if (eventTime <= ) {
                    // le thread demande a attendre un event deja passe
                    // on le met donc pas en attente
                    if (.isDebugEnabled()) {
                        .debug("event deja passe " + eventTime + " <= " + );
                    }
                    return;
                }
                mutex = .get(eventTime);
                if (mutex == null) {
                    mutex = new Object();
                    .put(eventTimemutex);
                }
            }
            synchronized(mutex) {
                mutex.wait();
            }
        }
        @Override
        public void run() {
            while() {
                processEventQueue();
            }
            // le thread est arrete, force l'envoi de tous les events pour
            // liberer correctement tous les threads en attente
            // plus aucun event ne doit etre accepte dans la queue (voir method fireEvent)
            processEventQueue();
        }
        protected void processEventQueue() {
            try {
                WikittyEvent event;
                // on attend pas indefiniment un event, car il faut verifier
                // aussi que personne n'a arrete le thread
                while (null != (event = .poll(5, .))) {
                    try {
                        synchronized () {
                            for(WikittyEvent.WikittyEventType type : event.getType()) {
                                .fire(
                                        type.listenerMethodNameevent);
                            }
                        }
                    } catch (Exception eee) {
                        .error("Can't notify listener"eee);
                    }
                    try {
                        if (event.isRemote()) {
                            synchronized () {
                                for (WikittyEvent.WikittyEventType type : event.getType()) {
                                    .fire(
                                            type.listenerMethodNameevent);
                                }
                            }
                        } else {
                            synchronized () {
                                for (WikittyEvent.WikittyEventType type : event.getType()) {
                                    .fire(
                                            type.listenerMethodNameevent);
                                }
                            }
                        }
                    } catch (Exception eee) {
                        .error("Can't notify listener"eee);
                    }
                    synchronized () {
                        // on met a jour l'heure du dernier event envoye
                         = event.getTime();
                        // on previent les threads en attente si besoin
                        // dans un premier temps on ne recupere que ceux
                        // inferieur a event.getTime()
                        SortedMap<LongObjectsubwaiter =
                                .headMap(event.getTime());
                        for (Iterator<Map.Entry<LongObject>> i = subwaiter.entrySet().iterator(); i.hasNext();) {
                            Object mutex = i.next().getValue();
                            i.remove();
                            synchronized (mutex) {
                                mutex.notifyAll();
                            }
                        }
                        // dans un second temps on verifie si le suivant ne
                        // serait pas egal a event.getTime()
                        if (!.isEmpty()) {
                            Long time = .firstKey();
                            // il pourrait y avoir plusieurs event avec la meme heure
                            // il faut bien tous les liberer
                            while (time.equals(event.getTime())) {
                                // il est bien egal on l'enleve aussi
                                Object mutex = .remove(time);
                                synchronized (mutex) {
                                    mutex.notifyAll();
                                }
                                if (!.isEmpty()) {
                                    time = .firstKey();
                                } else {
                                    break;
                                }
                            }
                        }
                    }
                }
            } catch (InterruptedException eee) {
                .error("Notification thread error"eee);
            }
        }
    };

    
This interface must be implemented to send and receive remote messages. Only the sendMessage method is in the interface; you must write a receive method too, but that method is protocol specific and can't appear in the interface
    static public interface RemoteNotifierTransporter {

        
this method must be call before RemoteNotifierTransporter utilisation to indicate which service use it

Parameters:
ws
        public void setWikittyServiceNotifier(WikittyServiceNotifier ws);
        
        
Send a jgroup message to all other channel member.

Parameters:
event message to send
        public void sendMessage(WikittyEvent eventthrows Exception;
    }

    
Class used to notify remote listeners. This class is really activated only if the wikitty.notifier.transporter.class configuration is found and wikitty.service.event.propagateEvent is true
    static public class EventPropagator implements WikittyListener {

        
to use log facility, just put in your code: log.info(\"...\");
        static private Log log = LogFactory.getLog(EventPropagator.class);

        
Notifier service reference reference.
        protected WikittyServiceNotifier ws;
        protected RemoteNotifierTransporter transporter;
        public EventPropagator(ApplicationConfig config,
                WikittyServiceNotifier wsRemoteNotifierTransporter transporter) {
            this. = ws;
            this. = transporter;
        }

        
Send a jgroup message to all other channel member.

Parameters:
event message to send
        protected void sendMessage(WikittyEvent event) {
            try {
                if (.isDebugEnabled()) {
                    .debug("Try to send message : " + event);
                }
                .sendMessage(event);
                if (.isDebugEnabled()) {
                    .debug("Message is sent : " + event);
                }
            } catch (Exception eee) {
                if (.isErrorEnabled()) {
                    .error("Can't send message"eee);
                }
            }
        }
        /*
         * @see org.nuiton.wikitty.WikittyListener#putWikitty(org.nuiton.wikitty.Wikitty[])
         */
        @Override
        public void putWikitty(WikittyEvent event) {
            sendMessage(event);
        }
        /*
         * @see org.nuiton.wikitty.WikittyListener#removeWikitty(java.lang.String[])
         */
        @Override
        public void removeWikitty(WikittyEvent event) {
            sendMessage(event);
        }
        /*
         * @see org.nuiton.wikitty.WikittyListener#clearWikitty()
         */
        @Override
        public void clearWikitty(WikittyEvent event) {
            sendMessage(event);
        }
        /*
         * @see org.nuiton.wikitty.WikittyListener#putExtension(org.nuiton.wikitty.WikittyExtension[])
         */
        @Override
        public void putExtension(WikittyEvent event) {
            sendMessage(event);
        }
        @Override
        public void removeExtension(WikittyEvent event) {
            sendMessage(event);
        }
        /*
         * @see org.nuiton.wikitty.WikittyListener#clearExtension()
         */
        @Override
        public void clearExtension(WikittyEvent event) {
            sendMessage(event);
        }
    }
New to GrepCode? Check out our FAQ X