hadoop-Mapper的示例分析

这篇文章将为大家详细讲解有关hadoop-Mapper的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

成都创新互联公司专注于宝鸡企业网站建设,响应式网站开发,成都商城网站开发。宝鸡网站建设公司,为宝鸡等地区提供建站服务。全流程定制网站设计,专业设计,全程项目跟踪,成都创新互联公司专业和态度为您提供的服务

* Licensed to the Apache Software Foundation (ASF) under one

package org.apache.hadoop.mapreduce;

import java.io.IOException;

/** 
 * Maps input key/value pairs to a set of intermediate key/value pairs.  
 * 
 * 

Maps are the individual tasks which transform input records into a   * intermediate records. The transformed intermediate records need not be of   * the same type as the input records. A given input pair may map to zero or   * many output pairs.

   *   * 

The Hadoop Map-Reduce framework spawns one map task for each   * {@link InputSplit} generated by the {@link InputFormat} for the job.  * Mapper implementations can access the {@link Configuration} for   * the job via the {@link JobContext#getConfiguration()}.  *   * 

The framework first calls   * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by  * {@link #map(Object, Object, Context)}   * for each key/value pair in the InputSplit. Finally   * {@link #cleanup(Context)} is called.

 *   * 

All intermediate values associated with a given output key are   * subsequently grouped by the framework, and passed to a {@link Reducer} to    * determine the final output. Users can control the sorting and grouping by   * specifying two key {@link RawComparator} classes.

 *  * 

The Mapper outputs are partitioned per   * Reducer. Users can control which keys (and hence records) go to   * which Reducer by implementing a custom {@link Partitioner}.  *   * 

Users can optionally specify a combiner, via   * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the   * intermediate outputs, which helps to cut down the amount of data transferred   * from the Mapper to the Reducer.  *   * 

Applications can specify if and how the intermediate  * outputs are to be compressed and which {@link CompressionCodec}s are to be  * used via the Configuration.

 *    * 

If the job has zero  * reduces then the output of the Mapper is directly written  * to the {@link OutputFormat} without sorting by keys.

 *   * 

Example:

 * 

 * public class TokenCounterMapper 
 *     extends Mapper{
 *    
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *   
 *   public void map(Object key, Text value, Context context) throws IOException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.collect(word, one);
 *     }
 *   }
 * }
 * 

 *  * 

Applications may override the {@link #run(Context)} method to exert   * greater control on map processing e.g. multi-threaded Mappers   * etc.

 *   * @see InputFormat  * @see JobContext  * @see Partitioner    * @see Reducer  */ public class Mapper {   public class Context      extends MapContext {     public Context(Configuration conf, TaskAttemptID taskid,                    RecordReader reader,                    RecordWriter writer,                    OutputCommitter committer,                    StatusReporter reporter,                    InputSplit split) throws IOException, InterruptedException {       super(conf, taskid, reader, writer, committer, reporter, split);     }   }      /**    * Called once at the beginning of the task.    */   protected void setup(Context context                        ) throws IOException, InterruptedException {     // NOTHING   }   /**    * Called once for each key/value pair in the input split. Most applications    * should override this, but the default is the identity function.    */   @SuppressWarnings("unchecked")   protected void map(KEYIN key, VALUEIN value,                       Context context) throws IOException, InterruptedException {     context.write((KEYOUT) key, (VALUEOUT) value);   }   /**    * Called once at the end of the task.    */   protected void cleanup(Context context                          ) throws IOException, InterruptedException {     // NOTHING   }      /**    * Expert users can override this method for more complete control over the    * execution of the Mapper.    * @param context    * @throws IOException    */   public void run(Context context) throws IOException, InterruptedException {     setup(context);     while (context.nextKeyValue()) {       map(context.getCurrentKey(), context.getCurrentValue(), context);     }     cleanup(context);   } }

Mapper的四个方法是setup,map,cleanup和run。其中,setup和cleanup用于管理Mapper生命周期中的资源,setup在完成Mapper构造,即将开始执行map动作前调用,cleanup则在所有的map动作完成后被调用。方法map用于对一次输入的key/value对进行map动作。run方法执行了上面描述的过程,它调用setup,让后迭代所有的key/value对,进行map,最后调用cleanup。

org.apache.hadoop.mapreduce.lib.map中实现了Mapper的三个子类,分别是InverseMapper(将输入 map为输出),MultithreadedMapper(多线程执行map方法)和TokenCounterMapper(对输入的value分解为token并计数)。其中最复杂的是MultithreadedMapper,我们就以它为例,来分析Mapper的实现。

InverseMapper源代码:

 * Licensed to the Apache Software Foundation (ASF) under one


package org.apache.hadoop.mapreduce.lib.map;


import java.io.IOException;


/** A {@link Mapper} that swaps keys and values. */
public class InverseMapper extends Mapper {


  /** The inverse function.  Input keys and values are swapped.*/
  @Override
  public void map(K key, V value, Context context
                  ) throws IOException, InterruptedException {
    context.write(value, key);
  }
  
}

TokenCountMapper源代码:

 * Licensed to the Apache Software Foundation (ASF) under one


package org.apache.hadoop.mapreduce.lib.map;


import java.io.IOException;


/**
 * Tokenize the input values and emit each word with a count of 1.
 */
public class TokenCounterMapper extends Mapper{
    
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  
  @Override
  public void map(Object key, Text value, Context context
                  ) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}

MultithreadedMapper会启动多个线程执行另一个Mapper的map方法,它会启动mapred.map.multithreadedrunner.threads(配置项)个线程执行Mapper:mapred.map.multithreadedrunner.class(配置项)。MultithreadedMapper重写了基类Mapper的run方法,启动N个线程(对应的类为MapRunner)执行mapred.map.multithreadedrunner.class(我们称为目标Mapper)的run方法(就是说,目标Mapper的setup和cleanup会被执行多次)。目标Mapper共享同一份InputSplit,这就意味着,对InputSplit的数据读必须线程安全。为此,MultithreadedMapper引入了内部类SubMapRecordReader,SubMapRecordWriter,SubMapStatusReporter,分别继承自RecordReader,RecordWriter和StatusReporter,它们通过互斥访问MultithreadedMapper的Mapper.Context,实现了对同一份InputSplit的线程安全访问,为Mapper提供所需的Context。这些类的实现方法都很简单。

关于“hadoop-Mapper的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。


分享文章:hadoop-Mapper的示例分析
文章URL:http://myzitong.com/article/ggjcjg.html