Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   *    This program is free software; you can redistribute it and/or modify
   *    it under the terms of the GNU General Public License as published by
   *    the Free Software Foundation; either version 2 of the License, or
   *    (at your option) any later version.
   *
   *    This program is distributed in the hope that it will be useful,
   *    but WITHOUT ANY WARRANTY; without even the implied warranty of
   *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  *    GNU General Public License for more details.
  *
  *    You should have received a copy of the GNU General Public License
  *    along with this program; if not, write to the Free Software
  *    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
 /*
  *    ArffOutput.java
  *    Copyright 2007 Pentaho Corporation.  All rights reserved. 
  *
  */
 
 package org.pentaho.di.arff;
 
 
 import  org.pentaho.di.core.Const;
 import  org.pentaho.di.core.ResultFile;
 import  org.pentaho.di.core.exception.KettleException;
 import  org.pentaho.di.core.row.ValueMetaInterface;
 import  org.pentaho.di.core.vfs.KettleVFS;
 import  org.pentaho.di.trans.Trans;
 import  org.pentaho.di.trans.TransMeta;
 import  org.pentaho.di.trans.step.BaseStep;
 import  org.pentaho.di.trans.step.StepDataInterface;
 import  org.pentaho.di.trans.step.StepInterface;
 import  org.pentaho.di.trans.step.StepMeta;
 import  org.pentaho.di.trans.step.StepMetaInterface;
 import  org.pentaho.dm.commons.ArffMeta;

Outputs Kettle rows to a file in WEKA's ARFF (Attribute Relation File Format) format.

Kettle String fields are converted to ARFF nominal, Number and Integer are converted to ARFF numeric, and Date remains the same.

Author(s):
Mark Hall (mhall{[at]}pentaho.org)
Version:
1.0
 
 public class ArffOutput extends BaseStep implements StepInterface {
 
   private ArffOutputMeta m_meta;
   private ArffOutputData m_data;
 
   private final TransMeta m_transMeta;

  
Creates a new ArffOutput instance.

Parameters:
stepMeta holds the step's meta data
stepDataInterface holds the step's temporary data
copyNr the number assigned to the step
transMeta meta data for the transformation
trans a Trans value
 
   public ArffOutput(StepMeta stepMeta, StepDataInterface stepDataInterface,
       int copyNr, TransMeta transMeta, Trans trans) {
     super(stepMetastepDataInterfacecopyNrtransMetatrans);
      = transMeta;
   }

  
Process an incoming row of data.

Parameters:
smi a StepMetaInterface value
sdi a StepDataInterface value
Returns:
a boolean value
Throws:
KettleException if an error occurs
 
   @Override
   public synchronized boolean processRow(StepMetaInterface smi,
       StepDataInterface sdithrows KettleException {
 
      = (ArffOutputMetasmi;
      = (ArffOutputDatasdi;
 
     Object[] r = getRow();
 
     if (r == null) {
       // no more input
 
       // need to read csv file and append to header
 
       // Add this to the result file names
       ResultFile resultFile = new ResultFile(ResultFile.FILE_TYPE_GENERAL,
          KettleVFS.getFileObject(.getFileName(), getTransMeta()),
          getTransMeta().getName(), getStepname());
      if (resultFile != null) {
        addResultFile(resultFile);
      }
      setOutputDone();
      return false;
    }
    if (first) {
      first = false;
      // If we have no arff meta data, then assume all fields
      // are to be written
      if (.getOutputFields() == null
          || .getOutputFields().length == 0) {
      }
      // Determine the output format
      .getFields(.getOutputRowMeta(), getStepname(), nullnull,
          this);
      // now check to see if the incoming rows have these
      // fields and set up indexes. (pre-configured step might now have a new
      // incoming stream that is not compatible with its
      // configuration
      int[] outputFieldIndexes = new int[.getOutputFields().length];
      ArffMeta[] arffMeta = .getOutputFields();
      for (int i = 0; i < arffMeta.length; i++) {
        if (arffMeta[i] != null) {
          outputFieldIndexes[i] = getInputRowMeta().indexOfValue(
              arffMeta[i].getFieldName());
          // Do we still have this field coming in??
          if (outputFieldIndexes[i] < 0) {
            throw new KettleException("Field [" + arffMeta[i].getFieldName()
                + "] couldn't be found in the " + "input stream!");
          }
        } else {
          // OK, this particular entry had no corresponding arff
          // type
          outputFieldIndexes[i] = -1;
        }
      }
      // check the weight field (if set)
      if (!Const.isEmpty(.getWeightFieldName())) {
        int weightFieldIndex = getInputRowMeta().indexOfValue(
            .getWeightFieldName());
        // check to see if its still in the incoming stream
        if (weightFieldIndex < 0) {
          throw new KettleException("Field for setting instance weights ["
              + .getWeightFieldName() + "] couldn't be found in the "
              + "input stream!");
        }
        // now check the type (can only use numbers to set weights!)
        ValueMetaInterface v = getInputRowMeta().getValueMeta(weightFieldIndex);
        if (!v.isNumeric()) {
          throw new KettleException(
              "Field for setting instance weights must be numeric!");
        }
        .setWeightFieldIndex(weightFieldIndex);
      }
      .setOutputFieldIndexes(outputFieldIndexesarffMeta);
      if (Const.isEmpty(.getFileName())) {
        throw new KettleException("No file name given to write to!");
      }
      if (Const.isEmpty(.getRelationName())) {
        throw new KettleException("No relation name given!");
      }
      try {
        String fileName = .getFileName();
        if (!(fileName.endsWith(".arff") || fileName.endsWith(".ARFF"))) {
          fileName = fileName + ".arff";
        }
        String modName = .environmentSubstitute(fileName);
        .openFiles(modName);
      } catch (IOException ex) {
        throw new KettleException("Unable to open file(s)..."ex);
      }
    }
    try {
      .writeRow(r.getEncoding());
    } catch (IOException ex) {
      throw new KettleException("Problem writing a row to the file..."ex);
    }
    // in case the data is to go further
    putRow(.getOutputRowMeta(), r);
    if (log.isRowLevel()) {
      log.logRowlevel(toString(), "Read row #" + linesRead + " : " + r);
    }
    if (checkFeedback(linesRead)) {
      logBasic("Linenr " + linesRead);
    }
    return true;
  }

  
Initialize the step.

Parameters:
smi a StepMetaInterface value
sdi a StepDataInterface value
Returns:
a boolean value
  public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
     = (ArffOutputMetasmi;
     = (ArffOutputDatasdi;
    if (super.init(smisdi)) {
      try {
        .setHasEncoding(!Const.isEmpty(.getEncoding()));
        if (.getHasEncoding()) {
          if (!Const.isEmpty(.getNewLine())) {
                .getEncoding()));
          }
          // m_data.setBinaryEnclosure("'".getBytes(m_meta.getEncoding()));
        } else {
          if (!Const.isEmpty(.getNewLine())) {
            .setBinaryNewLine(.getNewLine().getBytes());
          }
          .setBinarySeparator(",".getBytes());
          .setBinaryMissing("?".getBytes());
          // m_data.setBinaryEnclosure("'".getBytes());
        }
      } catch (UnsupportedEncodingException ex) {
        logError("Encoding problem: " + ex.toString());
        logError(Const.getStackTracker(ex));
        return false;
      }
      return true;
    }
    return false;
  }

  
Clean up.

Parameters:
smi a StepMetaInterface value
sdi a StepDataInterface value
  public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
     = (ArffOutputMetasmi;
     = (ArffOutputDatasdi;
    try {
      .closeFiles();
    } catch (IOException ex) {
      logError("Exception trying to close file: " + ex.toString());
      setErrors(1);
    }
    super.dispose(smisdi);
  }

  
Run is where the action happens!
  public void run() {
    logBasic("Starting to run...");
    try {
      while (processRow() && !isStopped())
        ;
    } catch (Exception e) {
      logError("Unexpected error : " + e.toString());
      logError(Const.getStackTracker(e));
      setErrors(1);
      stopAll();
    } finally {
      dispose();
      logBasic("Finished, processing " + linesRead + " rows");
      markStop();
    }
  }
New to GrepCode? Check out our FAQ X