

2018-10-28  本文已影响0人  kangapp


package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;

 * <code>InputFormat</code> describes the input-specification for a 
 * Map-Reduce job. 
 * <p>The Map-Reduce framework relies on the <code>InputFormat</code> of the
 * job to:<p>
 * <ol>
 *   <li>
 *   Validate the input-specification of the job. 
 *   <li>
 *   Split-up the input file(s) into logical {@link InputSplit}s, each of 
 *   which is then assigned to an individual {@link Mapper}.
 *   </li>
 *   <li>
 *   Provide the {@link RecordReader} implementation to be used to glean
 *   input records from the logical <code>InputSplit</code> for processing by 
 *   the {@link Mapper}.
 *   </li>
 * </ol>
 * <p>The default behavior of file-based {@link InputFormat}s, typically 
 * sub-classes of {@link FileInputFormat}, is to split the 
 * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
 * bytes, of the input files. However, the {@link FileSystem} blocksize of  
 * the input files is treated as an upper bound for input splits. A lower bound 
 * on the split size can be set via 
 * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize">
 * mapreduce.input.fileinputformat.split.minsize</a>.</p>
 * <p>Clearly, logical splits based on input-size is insufficient for many 
 * applications since record boundaries are to be respected. In such cases, the
 * application has to also implement a {@link RecordReader} on whom lies the
 * responsibilty to respect record-boundaries and present a record-oriented
 * view of the logical <code>InputSplit</code> to the individual task.
 * @see InputSplit
 * @see RecordReader
 * @see JobClient
 * @see FileInputFormat
public interface InputFormat<K, V> {

   * Logically split the set of input files for the job.  
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple.
   * @param job job configuration.
   * @param numSplits the desired number of splits, a hint.
   * @return an array of {@link InputSplit}s for the job.
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

   * Get the {@link RecordReader} for the given {@link InputSplit}.
   * <p>It is the responsibility of the <code>RecordReader</code> to respect
   * record boundaries while processing the logical split to present a 
   * record-oriented view to the individual task.</p>
   * @param split the {@link InputSplit}
   * @param job the job that this split belongs to
   * @return a {@link RecordReader}
  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job, 
                                     Reporter reporter) throws IOException;


package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.Progressable;

 * <code>OutputFormat</code> describes the output-specification for a 
 * Map-Reduce job.
 * <p>The Map-Reduce framework relies on the <code>OutputFormat</code> of the
 * job to:<p>
 * <ol>
 *   <li>
 *   Validate the output-specification of the job. For e.g. check that the 
 *   output directory doesn't already exist. 
 *   <li>
 *   Provide the {@link RecordWriter} implementation to be used to write out
 *   the output files of the job. Output files are stored in a 
 *   {@link FileSystem}.
 *   </li>
 * </ol>
 * @see RecordWriter
 * @see JobConf
public interface OutputFormat<K, V> {

   * Get the {@link RecordWriter} for the given job.
   * @param ignored
   * @param job configuration for the job whose output is being written.
   * @param name the unique name for this part of the output.
   * @param progress mechanism for reporting progress while writing to file.
   * @return a {@link RecordWriter} to write the output for the job.
   * @throws IOException
  RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
                                     String name, Progressable progress)
  throws IOException;

   * Check for validity of the output-specification for the job.
   * <p>This is to validate the output specification for the job when it is
   * a job is submitted.  Typically checks that it does not already exist,
   * throwing an exception when it already exists, so that output is not
   * overwritten.</p>
   * @param ignored
   * @param job job configuration.
   * @throws IOException when output should not be attempted
  void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;
package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.task.MapContextImpl;

 * Maps input key/value pairs to a set of intermediate key/value pairs.  
 * <p>Maps are the individual tasks which transform input records into a 
 * intermediate records. The transformed intermediate records need not be of 
 * the same type as the input records. A given input pair may map to zero or 
 * many output pairs.</p> 
 * <p>The Hadoop Map-Reduce framework spawns one map task for each 
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * <code>Mapper</code> implementations can access the {@link Configuration} for 
 * the job via the {@link JobContext#getConfiguration()}.
 * <p>The framework first calls 
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
 * {@link #map(Object, Object, Context)} 
 * for each key/value pair in the <code>InputSplit</code>. Finally 
 * {@link #cleanup(Context)} is called.</p>
 * <p>All intermediate values associated with a given output key are 
 * subsequently grouped by the framework, and passed to a {@link Reducer} to  
 * determine the final output. Users can control the sorting and grouping by 
 * specifying two key {@link RawComparator} classes.</p>
 * <p>The <code>Mapper</code> outputs are partitioned per 
 * <code>Reducer</code>. Users can control which keys (and hence records) go to 
 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
 * <p>Users can optionally specify a <code>combiner</code>, via 
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the 
 * intermediate outputs, which helps to cut down the amount of data transferred 
 * from the <code>Mapper</code> to the <code>Reducer</code>.
 * <p>Applications can specify if and how the intermediate
 * outputs are to be compressed and which {@link CompressionCodec}s are to be
 * used via the <code>Configuration</code>.</p>
 * <p>If the job has zero
 * reduces then the output of the <code>Mapper</code> is directly written
 * to the {@link OutputFormat} without sorting by keys.</p>
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class TokenCounterMapper 
 *     extends Mapper&lt;Object, Text, Text, IntWritable&gt;{
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.write(word, one);
 *     }
 *   }
 * }
 * </pre></blockquote></p>
 * <p>Applications may override the {@link #run(Context)} method to exert 
 * greater control on map processing e.g. multi-threaded <code>Mapper</code>s 
 * etc.</p>
 * @see InputFormat
 * @see JobContext
 * @see Partitioner  
 * @see Reducer
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

   * The <code>Context</code> passed on to the {@link Mapper} implementations.
  public abstract class Context
    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
   * Called once at the beginning of the task.
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING

   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);

   * Called once at the end of the task.
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
  public void run(Context context) throws IOException, InterruptedException {
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    } finally {
package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

import java.util.Iterator;

 * Reduces a set of intermediate values which share a key to a smaller set of
 * values.  
 * <p><code>Reducer</code> implementations 
 * can access the {@link Configuration} for the job via the 
 * {@link JobContext#getConfiguration()} method.</p>

 * <p><code>Reducer</code> has 3 primary phases:</p>
 * <ol>
 *   <li>
 *   <h4 id="Shuffle">Shuffle</h4>
 *   <p>The <code>Reducer</code> copies the sorted output from each 
 *   {@link Mapper} using HTTP across the network.</p>
 *   </li>
 *   <li>
 *   <h4 id="Sort">Sort</h4>
 *   <p>The framework merge sorts <code>Reducer</code> inputs by 
 *   <code>key</code>s 
 *   (since different <code>Mapper</code>s may have output the same key).</p>
 *   <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
 *   being fetched they are merged.</p>
 *   <h5 id="SecondarySort">SecondarySort</h5>
 *   <p>To achieve a secondary sort on the values returned by the value 
 *   iterator, the application should extend the key with the secondary
 *   key and define a grouping comparator. The keys will be sorted using the
 *   entire key, but will be grouped using the grouping comparator to decide
 *   which keys and values are sent in the same call to reduce.The grouping 
 *   comparator is specified via 
 *   {@link Job#setGroupingComparatorClass(Class)}. The sort order is
 *   controlled by 
 *   {@link Job#setSortComparatorClass(Class)}.</p>
 *   For example, say that you want to find duplicate web pages and tag them 
 *   all with the url of the "best" known example. You would set up the job 
 *   like:
 *   <ul>
 *     <li>Map Input Key: url</li>
 *     <li>Map Input Value: document</li>
 *     <li>Map Output Key: document checksum, url pagerank</li>
 *     <li>Map Output Value: url</li>
 *     <li>Partitioner: by checksum</li>
 *     <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
 *     <li>OutputValueGroupingComparator: by checksum</li>
 *   </ul>
 *   </li>
 *   <li>   
 *   <h4 id="Reduce">Reduce</h4>
 *   <p>In this phase the 
 *   {@link #reduce(Object, Iterable, Context)}
 *   method is called for each <code>&lt;key, (collection of values)&gt;</code> in
 *   the sorted inputs.</p>
 *   <p>The output of the reduce task is typically written to a 
 *   {@link RecordWriter} via 
 *   {@link Context#write(Object, Object)}.</p>
 *   </li>
 * </ol>
 * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class IntSumReducer&lt;Key&gt; extends Reducer&lt;Key,IntWritable,
 *                                                 Key,IntWritable&gt; {
 *   private IntWritable result = new IntWritable();
 *   public void reduce(Key key, Iterable&lt;IntWritable&gt; values,
 *                      Context context) throws IOException, InterruptedException {
 *     int sum = 0;
 *     for (IntWritable val : values) {
 *       sum += val.get();
 *     }
 *     result.set(sum);
 *     context.write(key, result);
 *   }
 * }
 * </pre></blockquote></p>
 * @see Mapper
 * @see Partitioner

   * The <code>Context</code> passed on to the {@link Reducer} implementations.
  public abstract class Context 
    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

   * Called once at the start of the task.
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING

   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);

   * Called once at the end of the task.
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING

   * Advanced application writers can use the 
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
  public void run(Context context) throws IOException, InterruptedException {
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
    } finally {


import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());//默认以空格分割
      while (itr.hasMoreTokens()) {
        context.write(word, one);
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      context.write(key, result);

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  • Mapper
  • Reducer
public static class GroupingComparator extends WritableComparator
        protected GroupingComparator()
            super(IntPair.class, true);
        //Compare two WritableComparables.
        public int compare(WritableComparable w1, WritableComparable w2)
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int l = ip1.getFirst();
            int r = ip2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
public class KeyPartitioner extends Partitioner<TextInt, IntWritable> {
    public int getPartition(TextInt key, IntWritable value, int numPartitions) {
        // TODO Auto-generated method stub
        return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;

运行:$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output

  • -file:以逗号分割的路径列表,这些路径会出现在当前任务的工作路径中
  • -libjars:以逗号分割的jar包,添加到map和reduce的类路径中
  • -archives:以逗号分割的压缩包列表,压缩包未归档,并且在当前工作路径创建了和压缩包同名的链接;
    bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output
package cn.test.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.*;

public class WordCount2 {

    public static class TokenizerMapper extends Mapper<Object, Text,Text, IntWritable>{

        static enum CountersEnum {INPUT_WORDS}

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        private boolean caseSensitive;
        private Set<String> patternsToSkip = new HashSet<String>();

        private Configuration conf;
        private BufferedReader fis;

        protected void setup(Context context) throws IOException, InterruptedException {
            conf = context.getConfiguration();
            caseSensitive = conf.getBoolean("wordcount.case.sensitive",true);
                URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
                for(URI patternsURI : patternsURIs){
                    Path patternsPath = new Path(patternsURI.getPath());
                    String patternsFileName = patternsPath.getName().toString();

        private void parseSkipFile(String fileName) {
                fis = new BufferedReader(new FileReader(fileName));
                String pattern = null;
                while((pattern = fis.readLine())!=null){
            } catch (IOException ioe){
                System.err.println("Caught exception while parsing the cached file'"
                + StringUtils.stringifyException(ioe));

        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = caseSensitive ? value.toString(): value.toString().toLowerCase();
            for (String pattern : patternsToSkip){
                line = line.replaceAll(pattern, "");
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()){
                Counter counter = context.getCounter(CountersEnum.class.getName(),CountersEnum.INPUT_WORDS.toString());


    public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>{

        private IntWritable result = new IntWritable();

        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for(IntWritable val : values){
                sum += val.get();

    public static void main(String args[]) throws Exception{
        Configuration conf = new Configuration();
        GenericOptionsParser optionsParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionsParser.getRemainingArgs();
        if (remainingArgs.length!=2&&remainingArgs.length!=4){
            System.out.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
        Job job = Job.getInstance(conf,"wordCount");

        List<String> otherArgs = new ArrayList<String>();
        for (int i=0;i<remainingArgs.length;i++){
                job.addCacheFile(new Path(remainingArgs[++i]).toUri());
                job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
            } else{
        FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

  • GenericOptionsParser
    支持的命令行参数:bin/hadoop command [genericOptions] [commandOptions]
    -conf<配置文件> 指定配置文件$ bin/hadoop dfs -conf core-site.xml -conf hdfs-site.xml -ls /data
    -D <property=value> 使用给定属性的值$ bin/hadoop dfs -D fs.default.name=darwin:8020 -ls /data
    -fs <local|namenode:port> 指定namenode$ bin/hadoop dfs -fs darwin:8020 -ls /data
    -jt <local|resourcemanager:port>指定ResourceManager$ bin/hadoop job -jt local -submit job.xml
  • DistributedCache
    文件/归档文件分发可以通过设置属性 mapreduce.job.cache.{files |archives},以逗号进行分割;
    应用程序中通过API Job.addCacheFile(URI)/ Job.addCacheArchive(URI)(URI默认是HDFS系统上的文件)
    Streaming可以在命令行通过 -cacheFile/-cacheArchive分发文件
  • Counter
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(conf);
            fileSystem.delete(outputPath, true);
            System.out.println("existed file has deleted");
  • jobhistory
    mr-jobhistory-daemon.sh start historyserver 启动
    jps -----》 JobHistoryServer




yarn-site.xml 加上下列配置开启聚合功能,可以查看日志信息


