Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /* This Source Code Form is subject to the terms of the Mozilla Public
   * License, v. 2.0. If a copy of the MPL was not distributed with this file,
   * You can obtain one at http://mozilla.org/MPL/2.0/. */
  
  package pt.webdetails.cda.dataaccess;
  
  import java.util.List;
 
 import  org.apache.commons.logging.Log;
 import  org.apache.commons.logging.LogFactory;
 import  org.dom4j.Element;
 import  org.pentaho.di.core.util.StringUtil;
 import  org.pentaho.reporting.libraries.base.util.StringUtils;
 
 import  plugins.org.pentaho.di.robochef.kettle.DynamicTransConfig;
 import  plugins.org.pentaho.di.robochef.kettle.DynamicTransConfig.EntryType;
 import  plugins.org.pentaho.di.robochef.kettle.DynamicTransMetaConfig;
 import  plugins.org.pentaho.di.robochef.kettle.DynamicTransMetaConfig.Type;
 import  plugins.org.pentaho.di.robochef.kettle.DynamicTransformation;
 import  plugins.org.pentaho.di.robochef.kettle.RowProductionManager;
 import  plugins.org.pentaho.di.robochef.kettle.TableModelInput;
Created by IntelliJ IDEA. User: pedro Date: Feb 16, 2010 Time: 11:38:19 PM
 
 public class JoinCompoundDataAccess extends CompoundDataAccess implements RowProductionManager
 {
 
   private static final Log logger = LogFactory.getLog(JoinCompoundDataAccess.class);
   private static final String TYPE = "join";
   private String leftId;
   private String rightId;
   private String[] leftKeys;
   private String[] rightKeys;
   private ExecutorService executorService = Executors.newCachedThreadPool();
   private static final long DEFAULT_ROW_PRODUCTION_TIMEOUT = 120;
   
   private static int DEFAULT_MAX_ROWS_VALUE_TYPE_SEARCH = 500;//max nbr of rows to search for value
   public static final String MAX_ROWS_VALUE_TYPE_SEARCH_PROPERTY = "pt.webdetails.cda.TypeSearchMaxRows";
 
   public JoinCompoundDataAccess()
   {
   }
 
 
   public JoinCompoundDataAccess(final Element element)
   {
     super(element);
 
     Element left = (Element) element.selectSingleNode("Left");
     Element right = (Element) element.selectSingleNode("Right");
 
      = left.attributeValue("id");
      = right.attributeValue("id");
 
      = left.attributeValue("keys").split(",");
      = right.attributeValue("keys").split(",");
   }
 
 
   public String getType()
   {
     return ;
   }
 
 
   protected TableModel queryDataSource(final QueryOptions queryOptionsthrows QueryException
   {
     TableModel output = null;
     .clear();
 
     try
     {
       QueryOptions croppedOptions = (QueryOptionsqueryOptions.clone();
       croppedOptions.setSortBy(new ArrayList<String>());
       croppedOptions.setPageSize(0);
       croppedOptions.setPageStart(0);
       final TableModel tableModelA = this.getCdaSettings().getDataAccess().doQuery(croppedOptions);
       final TableModel tableModelB = this.getCdaSettings().getDataAccess().doQuery(croppedOptions);
 
       if (tableModelA.getColumnCount() == 0 || tableModelB.getColumnCount() == 0)
      {
        return new MetadataTableModel(new String[0], new Class[0], 0);
      }
      String[] leftColumnNames = new String[.];
      for (int i = 0; i < .i++)
      {
        leftColumnNames[i] = tableModelA.getColumnName(Integer.parseInt([i]));
      }
      String[] rightColumnNames = new String[.];
      for (int i = 0; i < .i++)
      {
        rightColumnNames[i] = tableModelB.getColumnName(Integer.parseInt([i]));
      }
      String sortLeftXML = getSortXmlStep("sortLeft"leftColumnNames);
      String sortRightXML = getSortXmlStep("sortRight"rightColumnNames);
      StringBuilder mergeJoinXML = new StringBuilder(
              "<step><name>mergeJoin</name><type>MergeJoin</type><join_type>FULL OUTER</join_type><copies>1</copies><step1>sortLeft</step1><step2>sortRight</step2>");
      mergeJoinXML.append("<keys_1>");
      for (int i = 0; i < .i++)
      {
        mergeJoinXML.append("<key>").append(leftColumnNames[i]).append("</key>");
      }
      mergeJoinXML.append("</keys_1><keys_2>");
      for (int i = 0; i < .i++)
      {
        mergeJoinXML.append("<key>").append(rightColumnNames[i]).append("</key>");
      }
      mergeJoinXML.append("</keys_2><copies>1</copies></step>");
      DynamicTransMetaConfig transMetaConfig = new DynamicTransMetaConfig(Type.EMPTY, "JoinCompoundData"nullnull);
      DynamicTransConfig transConfig = new DynamicTransConfig();
      String input1Xml = getInjectorStepXmlString("input1"tableModelA);
      String input2Xml = getInjectorStepXmlString("input2"tableModelB);
      transConfig.addConfigEntry(EntryType.STEP, "input1"input1Xml);
      transConfig.addConfigEntry(EntryType.STEP, "input2"input2Xml);
      transConfig.addConfigEntry(EntryType.STEP, "sortLeft"sortLeftXML);
      transConfig.addConfigEntry(EntryType.STEP, "sortRight"sortRightXML);
      transConfig.addConfigEntry(EntryType.STEP, "mergeJoin"mergeJoinXML.toString());
      transConfig.addConfigEntry(EntryType.HOP, "input1""sortLeft");
      transConfig.addConfigEntry(EntryType.HOP, "input2""sortRight");
      transConfig.addConfigEntry(EntryType.HOP, "sortLeft""mergeJoin");
      transConfig.addConfigEntry(EntryType.HOP, "sortRight""mergeJoin");
      TableModelInput input1 = new TableModelInput();
      transConfig.addInput("input1"input1);
      .add(input1.getCallableRowProducer(tableModelAtrue));
      TableModelInput input2 = new TableModelInput();
      transConfig.addInput("input2"input2);
      .add(input2.getCallableRowProducer(tableModelBtrue));
      RowMetaToTableModel outputListener = new RowMetaToTableModel(falsetruefalse);
      transConfig.addOutput("mergeJoin"outputListener);
      DynamicTransformation trans = new DynamicTransformation(transConfigtransMetaConfig);
      trans.executeCheckedSuccess(nullnullthis);
      .info(trans.getReadWriteThroughput());
      output = outputListener.getRowsWritten();
    }
    catch (UnknownDataAccessException e)
    {
      throw new QueryException("Unknown Data access in CompoundDataAccess "e);
    }
    catch (Exception e)
    {
      throw new QueryException("Exception during query "e);
    }
    return output;
  }
  private String getSortXmlStep(final String namefinal String[] columnNames)
  {
    StringBuilder sortXML = new StringBuilder(
            "  <step>\n"
            + "    <name>" + name + "</name>\n"
            + "    <type>SortRows</type>\n"
            + "    <description/>\n"
            + "    <distribute>Y</distribute>\n"
            + "    <copies>1</copies>\n"
            + "         <partitioning>\n"
            + "           <method>none</method>\n"
            + "           <schema_name/>\n"
            + "           </partitioning>\n"
            + "      <directory>%%java.io.tmpdir%%</directory>\n"
            + "      <prefix>out</prefix>\n"
            + "      <sort_size/>\n"
            + "      <free_memory>25</free_memory>\n"
            + "      <compress>N</compress>\n"
            + "      <compress_variable/>\n"
            + "      <unique_rows>N</unique_rows>\n"
            + "    <fields>\n");
    for (int i = 0; i < columnNames.lengthi++)
    {
      sortXML.append("      <field>\n"
              + "        <name>" + columnNames[i] + "</name>\n"
              + "        <ascending>Y</ascending>\n"
              + "        <case_sensitive>N</case_sensitive>\n"
              + "      </field>\n");
    }
    sortXML.append("    </fields>\n"
            + "     <cluster_schema/>\n"
            + " <remotesteps>   <input>   </input>   <output>   </output> </remotesteps>    <GUI>\n"
            + "      <xloc>615</xloc>\n"
            + "      <yloc>188</yloc>\n"
            + "      <draw>Y</draw>\n"
            + "      </GUI>\n"
            + "    </step>\n");
    return sortXML.toString();
  }
  {
    StringBuilder xml = new StringBuilder("<step><name>");
    Class<?> columnClass;
    xml.append(name).append("</name><type>Injector</type>");
    int maxRowsTypeSearch = getMaxTypeSearchRowCount(t);
    
    // If we have metadata information, put it here
    if (t.getColumnCount() > 0)
    {
      xml.append("<fields>");
      for (int i = 0; i < t.getColumnCount(); i++)
      {
        /* The proper way to get the column class is from t.getColumnClass().
         * However, this always returns Object when the column at hand is a
         * Calculated Column -- and we have no idea what to do with Objects.
         * Therefore, we try to infer the correct type from the getClass() of
         * the chosen column, first row, as that can't be worse than trying
         * to deal with Object.
         */
        columnClass = t.getColumnClass(i);
        if (columnClass.equals(Object.class) && t.getRowCount() > 0){
          for(int j = 0; j < maxRowsTypeSearchj++){
            if(t.getValueAt(ji) != null){
              columnClass = t.getValueAt(ji).getClass();
              break;
            }
          }
        }
        xml.append("<field>");
        xml.append("<name>" + t.getColumnName(i) + "</name>");
        xml.append("<type>" + getKettleTypeFromColumnClass(columnClass) + "</type>");
        xml.append("<length>-1</length><precision>-1</precision></field>");
      }
      xml.append("</fields>");
    }
    
    xml.append("<copies>1</copies>");
    xml.append("</step>");
    return xml.toString();
  }
  private int getMaxTypeSearchRowCount(TableModel t) {
    int maxRowsTypeSearch = ;
    String maxRowsTypeSearchProperty = CdaBoot.getInstance().getGlobalConfig().getConfigProperty();
    if(!StringUtils.isEmpty(maxRowsTypeSearchProperty)){
      try{
        maxRowsTypeSearch = Integer.parseInt(maxRowsTypeSearchProperty);
      }catch (NumberFormatException nfe){
        .error( + ":" + maxRowsTypeSearchProperty + " not a valid integer.");
      }
    }
    if(maxRowsTypeSearch <= 0){
      maxRowsTypeSearch = t.getRowCount();
    }
    else {
      maxRowsTypeSearch = Math.min(maxRowsTypeSearcht.getRowCount());
    }
    return maxRowsTypeSearch;
  }
  /*
   * This method returns the correct kettle type from the column class. Possible values:
   *  String
   *  Date
   *  Boolean
   *  Integer
   *  BigNumber
   *  Serializable
   *  Binary
   *  
   */
  {
    if (clazz == String.class)
    {
      return "String";
    }
    else if (clazz == Double.class)
    {
      return "Number";
    }
    else if (clazz == java.util.Date.class)
    {
      return "Date";
    }
    else if (clazz == Long.class || clazz == Integer.class)
    {
      return "Integer";
    }
    else if (clazz == java.math.BigDecimal.class)
    {
      return "BigNumber";
    }
    else if (clazz == Boolean.class )
    {
      return "Boolean";
    }
    else
    {
      throw new IllegalArgumentException("Unexpected class " + clazz + ", can't convert to kettle type");
    }
  }
  public void startRowProduction()
  {
    String timeoutStr = CdaBoot.getInstance().getGlobalConfig().getConfigProperty("pt.webdetails.cda.DefaultRowProductionTimeout");
    long timeout = StringUtil.isEmpty(timeoutStr) ?  : Long.parseLong(timeoutStr);
    String unitStr = CdaBoot.getInstance().getGlobalConfig().getConfigProperty("pt.webdetails.cda.DefaultRowProductionTimeoutTimeUnit");
    TimeUnit unit = StringUtil.isEmpty(unitStr) ?  : TimeUnit.valueOf(unitStr);
    startRowProduction(timeoutunit);
  }
  public void startRowProduction(long timeoutTimeUnit unit)
  {
    try
    {
      List<Future<Boolean>> results = .invokeAll(timeoutunit);
      for (Future<Booleanresult : results)
      {
        result.get();
      }
    } catch (InterruptedException e) {
      .error("Row production interrupted"e);
    } catch (ExecutionException e) {
      .error("Problem starting row production"e);
    }
  }
  /*
  public static ArrayList<DataAccessConnectionDescriptor> getDataAccessConnectionDescriptors() {
  ArrayList<DataAccessConnectionDescriptor> descriptor = new ArrayList<DataAccessConnectionDescriptor>();
  DataAccessConnectionDescriptor proto = new DataAccessConnectionDescriptor();
  proto.addDataAccessProperty(new PropertyDescriptor("Left",PropertyDescriptor.TYPE.STRING,PropertyDescriptor.SOURCE.DATAACCESS));
  proto.addDataAccessProperty(new PropertyDescriptor("Right",PropertyDescriptor.TYPE.STRING,PropertyDescriptor.SOURCE.DATAACCESS));
  descriptor.add(proto);
  return descriptor;
  }*/
  {
    return .;
  }
  {
    return properties;
  }
  public String getLeftId() {
	  return ;
  }
  public void setLeftId(String leftId) {
	  this. = leftId;
  }
  public String getRightId() {
	  return ;
  }
  public String[] getLeftKeys() {
	  return ;
  }
  public String[] getRightKeys() {
	  return ;
  }
  public void setQuery(String query) {
	  // Do nothing
  }
New to GrepCode? Check out our FAQ X