Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * *************************************************************************************
   *  Copyright (C) 2008 EsperTech, Inc. All rights reserved.                            *
   *  http://esper.codehaus.org                                                          *
   *  http://www.espertech.com                                                           *
   *  ---------------------------------------------------------------------------------- *
   *  The software in this package is published under the terms of the GPL license       *
   *  a copy of which has been included with this distribution in the license.txt file.  *
   * *************************************************************************************
  */
 
 package com.espertech.esperio.socket.core;
 
 
 import java.io.*;
 import java.util.*;
 
 public class WorkerThread extends Thread {
 
     private static Log log = LogFactory.getLog(WorkerThread.class);
 
     private final EPServiceProviderSPI engine;
     private final EsperSocketServiceRunnable runnable;
     private final String serviceName;
     private final Socket socket;
     private final Map<StringWriterCacheEntrystreamCache = new HashMap<StringWriterCacheEntry>();
     private final MethodResolutionServiceImpl methods;
     private final SocketConfig socketConfig;
 
     private ObjectInputStream ois;
     private BufferedReader br;
     private boolean isShutdown;
 
     public WorkerThread(String serviceNameEPServiceProviderSPI engineEsperSocketServiceRunnable runnableSocket socketSocketConfig socketConfigthrows IOException {
         this. = serviceName;
         this. = engine;
         this. = runnable;
         this. = socket;
         this. = new MethodResolutionServiceImpl(engine.getEngineImportService(), engine.getTimeProvider());
         this. = socketConfig;
 
         if (socketConfig.getDataType() == .) {
             if (socketConfig.getStream() == null || socketConfig.getStream().length() == 0) {
                 throw new IllegalArgumentException("Invalid null or empty value provided for required 'stream' parameter");
             }
             if (socketConfig.getPropertyOrder() == null || socketConfig.getPropertyOrder().length() == 0) {
                 throw new IllegalArgumentException("Invalid null or empty value provided for required 'propertyOrder' parameter");
             }
         }
 
         if ((socketConfig.getDataType() == null) || (socketConfig.getDataType() == .)) {
              = new ObjectInputStream(socket.getInputStream());
         }
         else {
              = new BufferedReader(new InputStreamReader(socket.getInputStream()));
         }
     }
 
     public void setShutdown(boolean shutdown) {
          = shutdown;
     }
 
     public void run() {
 
         try {
             while (!Thread.interrupted() && .isConnected()) {
 
                 if ( != null) {
                     Object object = .readObject();
                     handleObject(object);
                 }
                 else {
                     String str = .readLine();
                     if (str != null) {
                         handleString(str);
                     }
                     else {
                         break;
                     }
                 }
             }
         }
         catch (EOFException ex) {
             .debug("EOF received from connection");
        }
        catch (IOException ex) {
            if (!) {
                .error("I/O error: " + ex.getMessage(), ex);
            }
        }
        catch (ClassNotFoundException ex) {
            .error("Class not found: " + ex.getMessage());
        }
        finally {
            try {
                .close();
                .remove(this);
            } catch (IOException ignore) {}
        }
    }
    private void handleObject(Object input) {
        try {
            if (input instanceof Map) {
                Map map = (Mapinput;
                String type = (Stringmap.get("stream");
                if (type == null) {
                    .warn("Expected value for event type not found in map event provided to adapter");
                    return;
                }
                .getEPRuntime().sendEvent(maptype);
            }
            else {
                .getEPRuntime().sendEvent(input);
            }
        }
        catch (Throwable t) {
            .error("Unexpected exception encountered sending event " + input + " service '" +  + "' :" + t.getMessage(), t);
        }
    }
    private void handleString(String input) {
        if (input == null) {
            return;
        }
        try {
            Map<StringStringparameters = new HashMap<StringString>();
            WStringTokenizer tokenizer = new WStringTokenizer(input",");
            String eventTypeName;
            if (.getDataType() != .) {
                while (tokenizer.hasMoreTokens()) {
                    String item = tokenizer.nextToken();
                    int index = item.indexOf("=");
                    if (index != -1) {
                        String value = item.substring(index + 1, item.length());
                        String unescaped = .isUnescape() ? UnescapeUtil.unescapeJavaString(value) : value;
                        parameters.put(item.substring(0, index), unescaped);
                    }
                }
                eventTypeName = parameters.get("stream");
            }
            else {
                // handle property-ordered-csv
                int idx = -1;
                String[] propertyOrder = .getPropertyOrder().split(",");
                while (tokenizer.hasMoreTokens()) {
                    idx++;
                    String value = tokenizer.nextToken();
                    String unescaped = .isUnescape() ? UnescapeUtil.unescapeJavaString(value) : value;
                    if (idx < propertyOrder.length) {
                        parameters.put(propertyOrder[idx].trim(), unescaped);
                    }
                }
                eventTypeName = .getStream();
            }
            WriterCacheEntry cacheEntry = .get(eventTypeName);
            if (cacheEntry == null) {
                cacheEntry = makeCacheEntry(eventTypeName);
                .put(eventTypeNamecacheEntry);
            }
            if (cacheEntry == null) {
                return;
            }
            Object[] values = new Object[cacheEntry.getParsers().length];
            for (int i = 0; i < cacheEntry.getParsers().lengthi++) {
                String value = parameters.get(cacheEntry.getWritableProperties()[i].getPropertyName());
                if (value == null) {
                    continue;
                }
                values[i] = cacheEntry.getParsers()[i].parse(value);
            }
            EventBean theEvent = cacheEntry.getEventBeanManufacturer().make(values);
            .getEPRuntime().sendEvent(theEvent);
        }
        catch (Throwable t) {
            .error("Unexpected exception encountered sending event " + input + " service '" +  + "' :" + t.getMessage(), t);
        }
    }
    private WriterCacheEntry makeCacheEntry(String eventTypeName) {
        EventType eventType = .getEventAdapterService().getExistsTypeByName(eventTypeName);
        if (eventType == null) {
            .info("Event type by name '" + eventTypeName + "' not found.");
            return null;
        }
        if (!(eventType instanceof EventTypeSPI)) {
            .info("Event type by name '" + eventTypeName + "' is not writable.");
            return null;
        }
        EventTypeSPI eventTypeSPI = (EventTypeSPIeventType;
        Set<WriteablePropertyDescriptorwritablesSet = .getEventAdapterService().getWriteableProperties(eventTypeSPIfalse);
        List<WriteablePropertyDescriptorwritablePropertiesList = new ArrayList<WriteablePropertyDescriptor>();
        List<SimpleTypeParserparserList = new ArrayList<SimpleTypeParser>();
        for (WriteablePropertyDescriptor writableDesc : writablesSet)
        {
            SimpleTypeParser parser = SimpleTypeParserFactory.getParser(writableDesc.getType());
            if (parser == null) {
                .debug("No parser found for type '" + writableDesc.getType() + "'");
                continue;
            }
            writablePropertiesList.add(writableDesc);
            parserList.add(parser);
        }
        WriteablePropertyDescriptor[] writableProperties = writablePropertiesList.toArray(new WriteablePropertyDescriptor[writablePropertiesList.size()]);
        SimpleTypeParser[] parsers = parserList.toArray(new SimpleTypeParser[parserList.size()]);
        EventBeanManufacturer eventBeanManufacturer;
        try {
            eventBeanManufacturer = .getEventAdapterService().getManufacturer(eventTypewritableProperties.getEngineImportService(), false);
        }
        catch (EventBeanManufactureException e) {
            .info("Unable to create manufacturer for event type: " + e.getMessage(), e);
            return null;
        }
        return new WriterCacheEntry(eventBeanManufacturerwritablePropertiesparsers);
    }
New to GrepCode? Check out our FAQ X