Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright [2013-2014] eBay Software Foundation
   *  
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *    http://www.apache.org/licenses/LICENSE-2.0
   *  
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package ml.shifu.guagua.mapreduce.example.kmeans;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;

 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
/**
 * KMeansWorker re-computes the category tag of each record using the new centers.
 *
 * To let the master calculate the new k centers, KMeansWorker also accumulates this worker's partial results in a
 * sum list and a count list.
 */

 
 public class KMeansWorker
         extends
 
     private static final Logger LOG = LoggerFactory.getLogger(KMeansWorker.class);

    
Data list of current worker cached in memory.
 
     private List<TaggedRecorddataList;

    
K categories pre-defined
 
     private int k;

    
Columns (dimensions) for each record
 
     private int c;

    
Separator to split data for each record
 
     private String separator;

    
Reading input line by line
 
     @Override
     public void initRecordReader(GuaguaFileSplit fileSplitthrows IOException {
         this.setRecordReader(new GuaguaLineRecordReader());
         this.getRecordReader().initialize(fileSplit);
     }
 
     /**
      * Reads this worker's settings from the job properties and creates a new LinkedList of records.
      *
      * <p>NOTE(review): the assigned field names and the property-key constants passed to getProperty(...) were
      * lost when this file was extracted, so these statements do not compile as shown. Judging from the field
      * declarations they presumably set k, c, separator and dataList, in that order -- TODO restore them from the
      * upstream source.
      *
      * @param workerContext context whose properties supply the settings; Integer.parseInt throws
      *        NumberFormatException if an integer setting is missing or malformed
      */
     @Override
     public void init(WorkerContext<KMeansMasterParamsKMeansWorkerParamsworkerContext) {
         // Two integer settings (presumably k and c -- TODO confirm).
         this. = Integer.parseInt(workerContext.getProps().getProperty(.));
         this. = Integer.parseInt(workerContext.getProps().getProperty(.));
         // A string setting; presumably the separator that load(...) splits each line on.
         this. = workerContext.getProps().getProperty(.);
         this. = new LinkedList<TaggedRecord>();
         // just set into worker context for data output interceptor usage.
         workerContext.setAttachment(this.);
     }

    
Using the new k centers to tag each record with index denoting the record belongs to which category.
 
     @Override
         if(workerContext.getCurrentIteration() == 1) {
             return doFirstIteration(workerContext);
         } else {
             return doOtherIterations(workerContext);
         }
     }
 
        KMeansWorkerParams workerResult = new KMeansWorkerParams();
        workerResult.setK(this.);
        workerResult.setC(this.);
        workerResult.setFirstIteration(true);
        int dataSize = this..size();
        List<double[]> pointList = new ArrayList<double[]>(dataSize);
        if(this. >= dataSize) {
            for(TaggedRecord recordthis.) {
                pointList.add(toDouble(record));
            }
        } else {
            int m = dataSize / this.;
            for(int i = 0; i < this.i++) {
                pointList.add(toDouble(this..get(m * i)));
            }
        }
        workerResult.setPointList(pointList);
        return workerResult;
    }
    private double[] toDouble(TaggedRecord record) {
        Double[] data = record.getRecord();
        double[] newData = new double[data.length];
        int i = 0;
        for(Double ddata) {
            newData[i] = d == null ? 0d : d;
        }
        return newData;
    }
        // new centers used in this iteration.
        List<double[]> centers = workerContext.getLastMasterResult().getPointList();
        .debug("Initial centers:%s", (centers));
        // sum list and count list as worker result sent to master for global accumulation.
        List<double[]> sumList = new LinkedList<double[]>();
        List<IntegercountList = new LinkedList<Integer>();
        // Initializing sum list and count list.
        for(int i = 0; i < this.i++) {
            sumList.add(new double[this.]);
            countList.add(0);
        }
        for(TaggedRecord recordthis.) {
            int index = findClosedCenter(record.getRecord(), centers);
            record.setTag(index);
            countList.set(indexcountList.get(index) + 1);
            double[] sum = sumList.get(index);
            for(int i = 0; i < this.i++) {
                sum[i] += record.getRecord()[i] == null ? 0d : record.getRecord()[i].doubleValue();
            }
        }
        .debug("sumList:%s", (sumList));
        .debug("countList:%s"countList);
        KMeansWorkerParams workerResult = new KMeansWorkerParams();
        workerResult.setK(this.);
        workerResult.setC(this.);
        workerResult.setFirstIteration(false);
        workerResult.setPointList(sumList);
        workerResult.setCountList(countList);
        return workerResult;
    }

    
Finding closed center from all the k centers. Return the index of finding center.
    private int findClosedCenter(Double[] recordList<double[]> centers) {
        int index = 0;
        double minDist = distance(recordcenters.get(0));
        for(int i = 1; i < centers.size(); i++) {
            double distance = distance(recordcenters.get(i));
            if(distance < minDist) {
                index = i;
            }
        }
        return index;
    }

    
Calculate cosine distance for two points.
    // TODO cache sqW2, no need re-computing
    private double distance(Double[] recorddouble[] center) {
        double denominator = 0;
        for(int i = 0; i < center.lengthi++) {
            denominator += record[i] == null ? 0d : (record[i] * center[i]);
        }
        double sqW1 = 0, sqW2 = 0;
        for(int i = 0; i < center.lengthi++) {
            sqW1 += record[i] == null ? 0d : (record[i] * record[i]);
            sqW2 += (center[i] * center[i]);
        }
        return denominator / (Math.sqrt(sqW1) * Math.sqrt(sqW2));
    }

    
Loading data into memory. any invalid data will be set to null.
    @Override
    public void load(GuaguaWritableAdapter<LongWritable> currentKeyGuaguaWritableAdapter<Text> currentValue,
            WorkerContext<KMeansMasterParamsKMeansWorkerParamsworkerContext) {
        String line = currentValue.getWritable().toString();
        Double[] record = new Double[this.];
        int i = 0;
        for(String input: Splitter.on(this.).split(line)) {
            try {
                record[i++] = Double.parseDouble(input);
            } catch (NumberFormatException e) {
                // use null to replace in-valid number
                record[i++] = null;
            }
        }
        this..add(new TaggedRecord(record));
    }
New to GrepCode? Check out our FAQ X