Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.espertech.esperio.socket.core;
  
 
 import java.io.*;
 import java.util.*;
 
 public class WorkerThread extends Thread {
 
     private static Log log = LogFactory.getLog(WorkerThread.class);
 
     private final EPServiceProviderSPI engine;
     private final EsperSocketServiceRunnable runnable;
     private final String serviceName;
     private final Socket socket;
     private final Map<StringWriterCacheEntrystreamCache = new HashMap<StringWriterCacheEntry>();
     private final MethodResolutionServiceImpl methods;
     private ObjectInputStream ois;
     private BufferedReader br;
     private boolean isShutdown;
 
     public WorkerThread(String serviceNameEPServiceProviderSPI engineEsperSocketServiceRunnable runnableSocket socketDataType dataTypethrows IOException {
         this. = serviceName;
         this. = engine;
         this. = runnable;
         this. = socket;
         this. = new MethodResolutionServiceImpl(engine.getEngineImportService(), engine.getTimeProvider());
 
         if ((dataType == null) || (dataType == .)) {
              = new ObjectInputStream(socket.getInputStream());
         }
         else {
              = new BufferedReader(new InputStreamReader(socket.getInputStream()));
         }
     }
 
     public void setShutdown(boolean shutdown) {
          = shutdown;
     }
 
     public void run() {
 
         try {
             while (!Thread.interrupted() && .isConnected()) {
 
                 if ( != null) {
                     Object object = .readObject();
                     handleObject(object);
                 }
                 else {
                     String str = .readLine();
                     if (str != null) {
                         handleString(str);
                     }
                     else {
                         break;
                     }
                 }
             }
         }
         catch (EOFException ex) {
             .debug("EOF received from connection");
         }
         catch (IOException ex) {
             if (!) {
                 .error("I/O error: " + ex.getMessage(), ex);
             }
         }
         catch (ClassNotFoundException ex) {
             .error("Class not found: " + ex.getMessage());
         }
         finally {
             try {
                 .close();
                 .remove(this);
             } catch (IOException ignore) {}
         }
     }
 
     private void handleObject(Object input) {
         try {
             if (input instanceof Map) {
                 Map map = (Mapinput;
                 String type = (Stringmap.get("stream");
                 if (type == null) {
                     .warn("Expected value for event type not found in map event provided to adapter");
                    return;
                }
                .getEPRuntime().sendEvent(maptype);
            }
            else {
                .getEPRuntime().sendEvent(input);
            }
        }
        catch (Throwable t) {
            .error("Unexpected exception encountered sending event " + input + " service '" +  + "' :" + t.getMessage(), t);
        }
    }
    private void handleString(String input) {
        if (input == null) {
            return;
        }
        try {
            Map<StringStringparameters = new HashMap<StringString>();
            WStringTokenizer tokenizer = new WStringTokenizer(input",");
            while (tokenizer.hasMoreTokens()) {
                String item = tokenizer.nextToken();
                int index = item.indexOf("=");
                if (index != -1) {
                    parameters.put(item.substring(0, index), item.substring(index+1, item.length()));
                }
            }
            String eventTypeName = parameters.get("stream");
            WriterCacheEntry cacheEntry = .get(eventTypeName);
            if (cacheEntry == null) {
                cacheEntry = makeCacheEntry(eventTypeName);
                .put(eventTypeNamecacheEntry);
            }
            if (cacheEntry == null) {
                return;
            }
            Object[] values = new Object[cacheEntry.getParsers().length];
            for (int i = 0; i < cacheEntry.getParsers().lengthi++) {
                String value = parameters.get(cacheEntry.getWritableProperties()[i].getPropertyName());
                if (value == null) {
                    continue;
                }
                values[i] = cacheEntry.getParsers()[i].parse(value);
            }
            EventBean theEvent = cacheEntry.getEventBeanManufacturer().make(values);
            .getEPRuntime().sendEvent(theEvent);
        }
        catch (Throwable t) {
            .error("Unexpected exception encountered sending event " + input + " service '" +  + "' :" + t.getMessage(), t);
        }
    }
    private WriterCacheEntry makeCacheEntry(String eventTypeName) {
        EventType eventType = .getEventAdapterService().getExistsTypeByName(eventTypeName);
        if (eventType == null) {
            .info("Event type by name '" + eventTypeName + "' not found.");
            return null;
        }
        if (!(eventType instanceof EventTypeSPI)) {
            .info("Event type by name '" + eventTypeName + "' is not writable.");
            return null;
        }
        EventTypeSPI eventTypeSPI = (EventTypeSPIeventType;
        Set<WriteablePropertyDescriptorwritablesSet = .getEventAdapterService().getWriteableProperties(eventTypeSPIfalse);
        List<WriteablePropertyDescriptorwritablePropertiesList = new ArrayList<WriteablePropertyDescriptor>();
        List<SimpleTypeParserparserList = new ArrayList<SimpleTypeParser>();
        for (WriteablePropertyDescriptor writableDesc : writablesSet)
        {
            SimpleTypeParser parser = SimpleTypeParserFactory.getParser(writableDesc.getType());
            if (parser == null) {
                .debug("No parser found for type '" + writableDesc.getType() + "'");
                continue;
            }
            writablePropertiesList.add(writableDesc);
            parserList.add(parser);
        }
        WriteablePropertyDescriptor[] writableProperties = writablePropertiesList.toArray(new WriteablePropertyDescriptor[writablePropertiesList.size()]);
        SimpleTypeParser[] parsers = parserList.toArray(new SimpleTypeParser[parserList.size()]);
        EventBeanManufacturer eventBeanManufacturer;
        try {
            eventBeanManufacturer = .getEventAdapterService().getManufacturer(eventTypewritableProperties.getEngineImportService(), false);
        }
        catch (EventBeanManufactureException e) {
            .info("Unable to create manufacturer for event type: " + e.getMessage(), e);
            return null;
        }
        return new WriterCacheEntry(eventBeanManufacturerwritablePropertiesparsers);
    }
New to GrepCode? Check out our FAQ X