  package upenn.junto.algorithm.parallel;

/*
 * Copyright 2011 Partha Pratim Talukdar
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Iterator;

 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
 
 
 public class LP_ZGL_Hadoop {
 	
    private static String _kDelim = "\t";
 	
    public static class LP_ZGL_Map extends MapReduceBase 
    			implements Mapper<LongWritable, Text, Text, Text> {
      private Text word = new Text();
 
      public void map(LongWritable key, Text value,
     		 		 OutputCollector<Text, Text> output,
     		 		 Reporter reporterthrows IOException {
        /////
        // Constructing the vertex from the string representation
        /////
        String line = value.toString();
  
        // id gold_label injected_labels estimated_labels neighbors rw_probabilities 
        String[] fields = line.split();       
        TObjectDoubleHashMap neighbors = CollectionUtil.String2Map(fields[4]);
        
        boolean isSeedNode = fields[2].length() > 0 ? true : false;
        
        // If the current node is a seed node but there is no
        // estimate label information yet, then transfer the seed label
        // to the estimated label distribution. Ideally, this is likely
        // to be used in the map of the very first iteration.
        if (isSeedNode && fields[3].length() == 0) {
     	   fields[3] = fields[2];
        }
 
        // Send two types of messages:
        //   -- self messages which will store the injection labels and
        //        random walk probabilities.
        //   -- messages to neighbors about current estimated scores
        //        of the node.
        //
        // message to self
        output.collect(new Text(fields[0]), new Text(line));
 
        // message to neighbors
        TObjectDoubleIterator neighIterator = neighbors.iterator();
        while (neighIterator.hasNext()) {
     	   neighIterator.advance();
     	   
     	   // message (neighbor_node, current_node + DELIM + curr_node_label_scores
     	   output.collect(new Text((StringneighIterator.key()),
     			   		  new Text(fields[0] +  + fields[3]));
        }
      }
    }
 	 	
    public static class LP_ZGL_Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
 	 private static double mu1;
 	 private static double mu2;
 	 private static int keepTopKLabels;
	 
	 public void configure(JobConf conf) {
		  = Double.parseDouble(conf.get("mu1"));
		  = Double.parseDouble(conf.get("mu2"));
		  = Integer.parseInt(conf.get("keepTopKLabels"));
	 }
	 
     public void reduce(Text keyIterator<Text> values,
    		 			OutputCollector<Text, Text> output, Reporter reporterthrows IOException {    	 
       // new scores estimated for the current node
       TObjectDoubleHashMap newEstimatedScores = new TObjectDoubleHashMap();
       
       // set to true only if the message sent to itself is found.
       boolean isSelfMessageFound = false;
       
       String vertexId = key.toString();
       String vertexString = "";
       
       TObjectDoubleHashMap neighbors = null;
       TObjectDoubleHashMap randWalkProbs = null;
       
       HashMap<StringStringneighScores =
    	   				new HashMap<StringString>();
       
       int totalMessagesReceived = 0;
       boolean isSeedNode = false;
       
       // iterate over all the messages received at the node
       while (values.hasNext()) {
    	   ++totalMessagesReceived;
    	   String val = values.next().toString();
    	   String[] fields = val.split();
    	   
    	   // System.out.println("src: " + fields[0] + " dest: " + vertexId +
    	   //		   "MESSAGE>>" + val + "<<");
    	   // self-message check
    	   if (vertexId.equals(fields[0])) {
    		   isSelfMessageFound = true;
    		   vertexString = val;
    		   
    		   // System.out.println("Reduce: " + vertexId + " " + val + " " + fields.length);
    		   TObjectDoubleHashMap injLabels = CollectionUtil.String2Map(fields[2]);
    		   neighbors = CollectionUtil.String2Map(neighborsfields[4]);
    		   randWalkProbs = CollectionUtil.String2Map(fields[5]);
    		   
    		   if (injLabels.size() > 0) {
    			   isSeedNode = true;
	    		   // add injected labels to the estimated scores.
	    		   ProbUtil.AddScores(newEstimatedScores,
	    				   		   	  injLabels);
    		   }
    	   } else {
    		   // an empty second field represents that the
    		   // neighbor has no valid label assignment yet.
    		   if (fields.length > 1) {
    			   neighScores.put(fields[0], fields[1]);
    		   }
    	   }
       }
       // terminate if message from self is not received.
       if (!isSelfMessageFound) {
    	   throw new RuntimeException("Self message not received for node " + vertexId);
       }
       
       // Add neighbor label scores to current node's label estimates only if the
       // current node is not a seed node. In case of seed nodes, clamp back the 
       // injected label distribution, which is already done above when processing
       // the self messages
       if (!isSeedNode) {
	       // collect neighbors label distributions and create one single
	       // label distribution
	       TObjectDoubleHashMap weightedNeigLablDist = new TObjectDoubleHashMap();
	       Iterator<StringneighIter = neighScores.keySet().iterator();
	       while (neighIter.hasNext()) {
	    	   String neighName = neighIter.next();
	    	   ProbUtil.AddScores(weightedNeigLablDist// newEstimatedScores,
	    			    	   		 * neighbors.get(neighName),
	    			    	   		CollectionUtil.String2Map(neighScores.get(neighName)));
	       }
	       ProbUtil.Normalize(weightedNeigLablDist);
	       
	       // now add the collective neighbor label distribution to
	       // the estimate of the current node's labels.
	       ProbUtil.AddScores(newEstimatedScores,
	    		              1.0, weightedNeigLablDist);
       }
       
       // normalize the scores
       ProbUtil.Normalize(newEstimatedScores);
       
       // now reconstruct the vertex representation (with the new estimated scores)
       // so that the output from the current mapper can be used as input in next
       // iteration's mapper.
       String[] vertexFields = vertexString.split();
       
       // replace estimated scores with the new ones.
       String[] newVertexFields = new String[vertexFields.length - 1];
       for (int i = 1; i < vertexFields.length; ++i) {
    	   newVertexFields[i - 1] = vertexFields[i]; 
       }
       newVertexFields[2] = CollectionUtil.Map2String(newEstimatedScores);
       output.collect(keynew Text(CollectionUtil.Join(newVertexFields)));
     }
   }
 	
   public static void main(String[] argsthrows Exception {
	 Hashtable config = ConfigReader.read_config(args);
	 
     String baseInputFilePat = Defaults.GetValueOrDie(config"hdfs_input_pattern");
     String baseOutputFilePat = Defaults.GetValueOrDie(config"hdfs_output_base");
     int numIterations = Integer.parseInt(Defaults.GetValueOrDie(config"iters")); 
	   
     String currInputFilePat = baseInputFilePat;
     String currOutputFilePat = "";
     for (int iter = 1; iter <= numIterations; ++iter) {
	     JobConf conf = new JobConf(LP_ZGL_Hadoop.class);
	     conf.setJobName("lp_zgl_hadoop");
	     conf.setOutputKeyClass(Text.class);
	     conf.setOutputValueClass(Text.class);
	     conf.setMapperClass(LP_ZGL_Map.class);
	     // conf.setCombinerClass(LP_ZGL_Reduce.class);
	     conf.setReducerClass(LP_ZGL_Reduce.class);
	     conf.setInputFormat(TextInputFormat.class);
	     conf.setOutputFormat(TextOutputFormat.class);
	     
	     // hyperparameters
	     conf.set("mu1", Defaults.GetValueOrDie(config"mu1"));
	     conf.set("mu2", Defaults.GetValueOrDie(config"mu2"));
	     conf.set("keepTopKLabels",
	    		  Defaults.GetValueOrDefault((Stringconfig.get("keep_top_k_labels"),
	    				  					 Integer.toString(.)));
    	 if (iter > 1) {
    		 // output from last iteration is the input for current iteration
    		 currInputFilePat = currOutputFilePat + "/*";
    	 }
    	 FileInputFormat.setInputPaths(confnew Path(currInputFilePat));
 
    	 currOutputFilePat = baseOutputFilePat + "_" + iter;
	     FileOutputFormat.setOutputPath(confnew Path(currOutputFilePat));
    	 JobClient.runJob(conf);
     }
   }
}