Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.ode.bpel.memdao;
 
 import java.util.Date;
 import java.util.Map;
 
 
A very simple, in-memory implementation of the org.apache.ode.bpel.dao.ProcessDAO interface.
 
 class ProcessDaoImpl extends DaoBaseImpl implements ProcessDAO {
     private static final Log __log = LogFactory.getLog(ProcessDaoImpl.class);
 
     private QName _processId;
     private QName _type;
     private long _version;
     protected final Map<LongProcessInstanceDAO_instances = new ConcurrentHashMap<LongProcessInstanceDAO>();
     protected final Map<LongLong_instancesAge = new ConcurrentHashMap<LongLong>();
     protected final Map<IntegerPartnerLinkDAO_plinks = new ConcurrentHashMap<IntegerPartnerLinkDAO>();
     private Map<QNameProcessDaoImpl_store;
     private BpelDAOConnectionImpl _conn;
     private int _executionCount = 0;
     private volatile long _lastRemoval = 0;
 
     private String _guid;
 
     public ProcessDaoImpl(BpelDAOConnectionImpl connMap<QNameProcessDaoImplstore,
                           QName processIdQName typeString guidlong version) {
         if (.isDebugEnabled()) {
             .debug("Creating ProcessDao object for process \"" + processId + "\".");
         }
 
          = guid;
          = conn;
          = store;
          = processId;
          = type;
          = version;
     }
 
     public Serializable getId() {
         return ;
     }
     
     public QName getProcessId() {
         return ;
     }
 
     public CorrelatorDAO getCorrelator(String cid) {
         CorrelatorDAO ret = .get(cid);
         if (ret == null) {
             throw new IllegalArgumentException("no such correlator: " + cid);
         }
         return ret;
     }
 
     public Collection<CorrelatorDAOgetCorrelators() {
         // Note: _correlators.values() is a Collection<CorrealatorDaoImpl>. We can't just return this object
         // since Collection<CorrelatorDAO> is /not/ assignment compatible with Collection<CorrelatorDaoImpl>. 
         // However, a immutable Collection<CorrelationDAO> is assignment compatible with Collection<CorrelatorDaoImpl>,
         // but.... we need to introduce some ambiguity into the type hierarchy so that Java will infer the correct type.
         
         // Make an ambiguous collection.
        Collection<? extends CorrelatorDAOfoo =  .values();
        // In order to get a collection of the super-type from a sub-type we must make the collection read-only. 
        return Collections.unmodifiableCollection(foo);
    }
    
    public void removeRoutes(String routeIdProcessInstanceDAO target) {
        for (CorrelatorDAO correlatorDAO : .values()) {
            correlatorDAO.removeRoutes(routeIdtarget);
        }
    }
    public ProcessInstanceDAO createInstance(CorrelatorDAO correlator) {
        final ProcessInstanceDaoImpl newInstance = new ProcessInstanceDaoImpl(thiscorrelator);
        .defer(new Runnable() {
            public void run() {
                .put(newInstance.getInstanceId(), newInstance);
                .put(newInstance.getInstanceId(), System.currentTimeMillis());
            }
        });
        discardOldInstances();
        
        // Removing right away on rollback
        final Long iid = newInstance.getInstanceId();
        .onRollback(new Runnable() {
            public void run() {
                .remove(iid);
                .remove(iid);
            }
        });
        ++;
        return newInstance;
    }
    public ProcessInstanceDAO getInstance(Long instanceId) {
        return .get(instanceId);
    }
        ArrayList<ProcessInstanceDAOresult = new ArrayList<ProcessInstanceDAO>();
        for (ProcessInstanceDAO instance : .values()) {
            for (CorrelationSetDAO corrSet : instance.getCorrelationSets()) {
                if (corrSet.getValue().equals(key)) result.add(instance);
            }
        }
        return result;
    }
    public void instanceCompleted(ProcessInstanceDAO instance) {
        // Cleaning up
        if (.isDebugEnabled())
          .debug("Removing completed process instance " + instance.getInstanceId() + " from in-memory store.");
        .remove(instance.getInstanceId());
        ProcessInstanceDAO removed = .remove(instance.getInstanceId());
        if (removed == null) {
            // Checking for leftover instances that should be removed
            ArrayList<Longremovals = new ArrayList<Long>();
            for (Long iid : removals) {
                .remove(iid);
            }
            .removeAll(removals);
            // The instance can't be found probably because the transaction isn't committed yet and
            // it doesn't exist. Saving its id for later cleanup.
            .add(instance.getInstanceId());
        }
    }
    public void deleteProcessAndRoutes() {
        .remove();
    }
    
    public long getVersion() {
        return ;
    }
    public String getDeployer() {
        return "nobody";
    }
    public QName getType() {
        return ;
    }
    public CorrelatorDAO addCorrelator(String correlator) {
        CorrelatorDaoImpl corr = new CorrelatorDaoImpl(correlator);
        .put(corr.getCorrelatorId(), corr);
        return corr;
    }

    
Nothing to do.
    public void update() {
        //TODO Check requirement for persisting.
    }
    public int getNumInstances() {
        // Instances are removed after execution, using a counter instead
        return ;
    }
        return getInstance(iid);
    }
    public int getActivityFailureCount() {
        return 0;  
    }
    public Date getActivityFailureDateTime() {
        return null;
    }
    public String getGuid() {
        return ;
    }
    
    public void setGuid(String guid) {
         = guid;
    }
        ArrayList<ProcessInstanceDAOpis = new ArrayList<ProcessInstanceDAO>();
        for (ProcessInstanceDAO processInstanceDAO : .values()) {
            if (processInstanceDAO.getState() == .)
                pis.add(processInstanceDAO);
        }
        return pis;
    }

    
Discard in-memory instances that exceeded their time-to-live to prevent memory leaks
    void discardOldInstances() {
        long now = System.currentTimeMillis();
        if (now >  + (./10)) {
             = now;
            Object[] oldInstances = .keySet().toArray();
            for (int i=oldInstances.length-1; i>=0; i--) {
                Long id = (LongoldInstances[i];
                Long age = .get(id);
                if (age != null && now-age > .) {
                    if (.get(id) != null) {
                        .warn("Discarding in-memory instance "+id+" because it exceeded its time-to-live: "+.get(id));
                    }
                    .remove(id);
                    .remove(id);
                }
            }
        }
    }
New to GrepCode? Check out our FAQ X