MapReduce Examples
2018-10-28
kangapp
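MapReduce execution flow diagram
Core concepts
- Split: the chunk of data a MapReduce job processes, and the smallest unit of computation in MapReduce. By default a split corresponds one-to-one to an HDFS block; the ratio between the two can be set manually, but this is not recommended.
- InputFormat: divides the input data into splits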
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
/**
* <code>InputFormat</code> describes the input-specification for a
* Map-Reduce job.
*
* <p>The Map-Reduce framework relies on the <code>InputFormat</code> of the
* job to:<p>
* <ol>
* <li>
* Validate the input-specification of the job.
* <li>
* Split-up the input file(s) into logical {@link InputSplit}s, each of
* which is then assigned to an individual {@link Mapper}.
* </li>
* <li>
* Provide the {@link RecordReader} implementation to be used to glean
* input records from the logical <code>InputSplit</code> for processing by
* the {@link Mapper}.
* </li>
* </ol>
*
* <p>The default behavior of file-based {@link InputFormat}s, typically
* sub-classes of {@link FileInputFormat}, is to split the
* input into <i>logical</i> {@link InputSplit}s based on the total size, in
* bytes, of the input files. However, the {@link FileSystem} blocksize of
* the input files is treated as an upper bound for input splits. A lower bound
* on the split size can be set via
* <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize">
* mapreduce.input.fileinputformat.split.minsize</a>.</p>
*
* <p>Clearly, logical splits based on input-size is insufficient for many
* applications since record boundaries are to be respected. In such cases, the
* application has to also implement a {@link RecordReader} on whom lies the
* responsibilty to respect record-boundaries and present a record-oriented
* view of the logical <code>InputSplit</code> to the individual task.
*
* @see InputSplit
* @see RecordReader
* @see JobClient
* @see FileInputFormat
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface InputFormat<K, V> {
/**
* Logically split the set of input files for the job.
*
* <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
* for processing.</p>
*
* <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
* input files are not physically split into chunks. For e.g. a split could
* be <i><input-file-path, start, offset></i> tuple.
*
* @param job job configuration.
* @param numSplits the desired number of splits, a hint.
* @return an array of {@link InputSplit}s for the job.
*/
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
/**
* Get the {@link RecordReader} for the given {@link InputSplit}.
*
* <p>It is the responsibility of the <code>RecordReader</code> to respect
* record boundaries while processing the logical split to present a
* record-oriented view to the individual task.</p>
*
* @param split the {@link InputSplit}
* @param job the job that this split belongs to
* @return a {@link RecordReader}
*/
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
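The Javadoc above says that the block size acts as an upper bound on the split size and that split.minsize acts as a lower bound. A minimal sketch of that arithmetic in plain Java, with no Hadoop dependency. The class and method names are made up for illustration, and the max/min formula and the 10% slack on the last split are assumptions about how the file-based input formats behave, not a copy of their code.

```java
// Plain-Java sketch; the names here are illustrative, not Hadoop APIs.
class SplitSizeSketch {
    // Assumed slack factor: the last split may be up to 10% larger than splitSize.
    static final double SPLIT_SLOP = 1.1;

    // Block size capped by maxSize, then raised to at least minSize.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Number of logical splits for a file of the given length.
    static int countSplits(long fileLength, long splitSize) {
        int splits = 0;
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits++; // whatever is left becomes the final split
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long splitSize = computeSplitSize(128 * mb, 1L, Long.MAX_VALUE);
        System.out.println(splitSize / mb);                    // 128
        System.out.println(countSplits(300 * mb, splitSize));  // 3
        System.out.println(countSplits(140 * mb, splitSize));  // 1
    }
}
```

With a 128 MB block, a 140 MB file stays in one split because the remainder is within the slack, while a 300 MB file becomes three splits.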
- OutputFormat: writes the job's output to the file system
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.Progressable;
/**
* <code>OutputFormat</code> describes the output-specification for a
* Map-Reduce job.
*
* <p>The Map-Reduce framework relies on the <code>OutputFormat</code> of the
* job to:<p>
* <ol>
* <li>
* Validate the output-specification of the job. For e.g. check that the
* output directory doesn't already exist.
* <li>
* Provide the {@link RecordWriter} implementation to be used to write out
* the output files of the job. Output files are stored in a
* {@link FileSystem}.
* </li>
* </ol>
*
* @see RecordWriter
* @see JobConf
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface OutputFormat<K, V> {
/**
* Get the {@link RecordWriter} for the given job.
*
* @param ignored
* @param job configuration for the job whose output is being written.
* @param name the unique name for this part of the output.
* @param progress mechanism for reporting progress while writing to file.
* @return a {@link RecordWriter} to write the output for the job.
* @throws IOException
*/
RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress)
throws IOException;
/**
* Check for validity of the output-specification for the job.
*
* <p>This is to validate the output specification for the job when it is
* a job is submitted. Typically checks that it does not already exist,
* throwing an exception when it already exists, so that output is not
* overwritten.</p>
*
* @param ignored
* @param job job configuration.
* @throws IOException when output should not be attempted
*/
void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;
}
- Mapper
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
/**
* Maps input key/value pairs to a set of intermediate key/value pairs.
*
* <p>Maps are the individual tasks which transform input records into a
* intermediate records. The transformed intermediate records need not be of
* the same type as the input records. A given input pair may map to zero or
* many output pairs.</p>
*
* <p>The Hadoop Map-Reduce framework spawns one map task for each
* {@link InputSplit} generated by the {@link InputFormat} for the job.
* <code>Mapper</code> implementations can access the {@link Configuration} for
* the job via the {@link JobContext#getConfiguration()}.
*
* <p>The framework first calls
* {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
* {@link #map(Object, Object, Context)}
* for each key/value pair in the <code>InputSplit</code>. Finally
* {@link #cleanup(Context)} is called.</p>
*
* <p>All intermediate values associated with a given output key are
* subsequently grouped by the framework, and passed to a {@link Reducer} to
* determine the final output. Users can control the sorting and grouping by
* specifying two key {@link RawComparator} classes.</p>
*
* <p>The <code>Mapper</code> outputs are partitioned per
* <code>Reducer</code>. Users can control which keys (and hence records) go to
* which <code>Reducer</code> by implementing a custom {@link Partitioner}.
*
* <p>Users can optionally specify a <code>combiner</code>, via
* {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
* intermediate outputs, which helps to cut down the amount of data transferred
* from the <code>Mapper</code> to the <code>Reducer</code>.
*
* <p>Applications can specify if and how the intermediate
* outputs are to be compressed and which {@link CompressionCodec}s are to be
* used via the <code>Configuration</code>.</p>
*
* <p>If the job has zero
* reduces then the output of the <code>Mapper</code> is directly written
* to the {@link OutputFormat} without sorting by keys.</p>
*
* <p>Example:</p>
* <p><blockquote><pre>
* public class TokenCounterMapper
* extends Mapper<Object, Text, Text, IntWritable>{
*
* private final static IntWritable one = new IntWritable(1);
* private Text word = new Text();
*
* public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
* StringTokenizer itr = new StringTokenizer(value.toString());
* while (itr.hasMoreTokens()) {
* word.set(itr.nextToken());
* context.write(word, one);
* }
* }
* }
* </pre></blockquote></p>
*
* <p>Applications may override the {@link #run(Context)} method to exert
* greater control on map processing e.g. multi-threaded <code>Mapper</code>s
* etc.</p>
*
* @see InputFormat
* @see JobContext
* @see Partitioner
* @see Reducer
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
/**
* The <code>Context</code> passed on to the {@link Mapper} implementations.
*/
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
/**
* Called once at the beginning of the task.
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Called once for each key/value pair in the input split. Most applications
* should override this, but the default is the identity function.
*/
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
}
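The run() method above is a template: setup() once, map() once per record, and cleanup() in a finally block so it runs even when map() throws. A plain-Java imitation of that call order, assuming a list of strings in place of the input split. MapperLifecycleSketch is a hypothetical class, not part of Hadoop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java imitation of the Mapper.run() template; not a Hadoop class.
class MapperLifecycleSketch {
    final List<String> events = new ArrayList<>();

    void setup() {
        events.add("setup");
    }

    void map(String record) {
        if (record.isEmpty()) {
            throw new IllegalArgumentException("empty record");
        }
        events.add("map:" + record);
    }

    void cleanup() {
        events.add("cleanup");
    }

    // Same shape as Mapper.run(): cleanup() is guaranteed by the finally block.
    void run(List<String> records) {
        setup();
        try {
            for (String record : records) {
                map(record);
            }
        } finally {
            cleanup();
        }
    }

    public static void main(String[] args) {
        MapperLifecycleSketch ok = new MapperLifecycleSketch();
        ok.run(Arrays.asList("a", "b"));
        System.out.println(ok.events); // [setup, map:a, map:b, cleanup]

        MapperLifecycleSketch failing = new MapperLifecycleSketch();
        try {
            failing.run(Arrays.asList("a", "", "b"));
        } catch (IllegalArgumentException e) {
            System.out.println(failing.events); // [setup, map:a, cleanup]
        }
    }
}
```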
- Reducer
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;
import java.util.Iterator;
/**
* Reduces a set of intermediate values which share a key to a smaller set of
* values.
*
* <p><code>Reducer</code> implementations
* can access the {@link Configuration} for the job via the
* {@link JobContext#getConfiguration()} method.</p>
* <p><code>Reducer</code> has 3 primary phases:</p>
* <ol>
* <li>
*
* <h4 id="Shuffle">Shuffle</h4>
*
* <p>The <code>Reducer</code> copies the sorted output from each
* {@link Mapper} using HTTP across the network.</p>
* </li>
*
* <li>
* <h4 id="Sort">Sort</h4>
*
* <p>The framework merge sorts <code>Reducer</code> inputs by
* <code>key</code>s
* (since different <code>Mapper</code>s may have output the same key).</p>
*
* <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
* being fetched they are merged.</p>
*
* <h5 id="SecondarySort">SecondarySort</h5>
*
* <p>To achieve a secondary sort on the values returned by the value
* iterator, the application should extend the key with the secondary
* key and define a grouping comparator. The keys will be sorted using the
* entire key, but will be grouped using the grouping comparator to decide
* which keys and values are sent in the same call to reduce.The grouping
* comparator is specified via
* {@link Job#setGroupingComparatorClass(Class)}. The sort order is
* controlled by
* {@link Job#setSortComparatorClass(Class)}.</p>
*
*
* For example, say that you want to find duplicate web pages and tag them
* all with the url of the "best" known example. You would set up the job
* like:
* <ul>
* <li>Map Input Key: url</li>
* <li>Map Input Value: document</li>
* <li>Map Output Key: document checksum, url pagerank</li>
* <li>Map Output Value: url</li>
* <li>Partitioner: by checksum</li>
* <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
* <li>OutputValueGroupingComparator: by checksum</li>
* </ul>
* </li>
*
* <li>
* <h4 id="Reduce">Reduce</h4>
*
* <p>In this phase the
* {@link #reduce(Object, Iterable, Context)}
* method is called for each <code><key, (collection of values)></code> in
* the sorted inputs.</p>
* <p>The output of the reduce task is typically written to a
* {@link RecordWriter} via
* {@link Context#write(Object, Object)}.</p>
* </li>
* </ol>
*
* <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
*
* <p>Example:</p>
* <p><blockquote><pre>
* public class IntSumReducer<Key> extends Reducer<Key,IntWritable,
* Key,IntWritable> {
* private IntWritable result = new IntWritable();
*
* public void reduce(Key key, Iterable<IntWritable> values,
* Context context) throws IOException, InterruptedException {
* int sum = 0;
* for (IntWritable val : values) {
* sum += val.get();
* }
* result.set(sum);
* context.write(key, result);
* }
* }
* </pre></blockquote></p>
*
* @see Mapper
* @see Partitioner
*/
@Checkpointable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
/**
* The <code>Context</code> passed on to the {@link Reducer} implementations.
*/
public abstract class Context
implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
/**
* Called once at the start of the task.
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* This method is called once for each key. Most applications will define
* their reduce class by overriding this method. The default implementation
* is an identity function.
*/
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
}
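The three phases described in the Javadoc above can be imitated in plain Java. In this sketch a TreeMap stands in for the framework's merge sort, and the reduce step keeps the maximum value per key. All names are hypothetical, and nothing here touches the network or the real Hadoop classes.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java imitation of shuffle, sort and reduce; none of these names are Hadoop APIs.
class ShuffleSortSketch {
    static Map.Entry<String, Integer> kv(String key, int value) {
        return new SimpleEntry<>(key, value);
    }

    // Shuffle + sort: gather the pairs from every mapper and group values by key, keys in sorted order.
    static Map<String, List<Integer>> shuffleAndSort(List<List<Map.Entry<String, Integer>>> mapOutputs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (List<Map.Entry<String, Integer>> output : mapOutputs) {
            for (Map.Entry<String, Integer> pair : output) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        return grouped;
    }

    // Reduce: invoked once per key with all of that key's values; here it keeps the maximum.
    static Map<String, Integer> reduceMax(Map<String, List<Integer>> grouped) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int max = Integer.MIN_VALUE;
            for (int value : entry.getValue()) {
                max = Math.max(max, value);
            }
            result.put(entry.getKey(), max);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapper1 = Arrays.asList(kv("a", 3), kv("b", 1));
        List<Map.Entry<String, Integer>> mapper2 = Arrays.asList(kv("a", 5), kv("c", 2));
        Map<String, List<Integer>> grouped = shuffleAndSort(Arrays.asList(mapper1, mapper2));
        System.out.println(grouped);            // {a=[3, 5], b=[1], c=[2]}
        System.out.println(reduceMax(grouped)); // {a=5, b=1, c=2}
    }
}
```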
- Combiner
- Partitioner
wordcount: word frequency analysis
- wordcount 1.0
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
/**
* The Mapper implementation processes one line at a time in map(); the lines are supplied by the specified TextInputFormat.
*/
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString()); // splits on whitespace by default
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
/*
* The Reducer implementation sums, in reduce(), the values that occur for each key
*/
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
// After the keys of each map's output pairs are sorted, the Combiner aggregates them locally
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
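To see what the combiner set in main() buys, the job above can be dry-run in plain Java. This sketch assumes two input splits, each holding one line, and uses TreeMaps in place of the framework's sorting. WordCountDryRun and its methods are hypothetical helpers, not Hadoop classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Local dry run of WordCount with a combiner; not a Hadoop job.
class WordCountDryRun {
    // Map + combine for one split: emit (word, 1) per token, then sum locally per word.
    static Map<String, Integer> mapAndCombine(String split) {
        Map<String, Integer> local = new TreeMap<>();
        StringTokenizer itr = new StringTokenizer(split);
        while (itr.hasMoreTokens()) {
            local.merge(itr.nextToken(), 1, Integer::sum);
        }
        return local;
    }

    // Reduce: sum the partial counts arriving from every map task.
    static Map<String, Integer> reduce(List<Map<String, Integer>> combinedOutputs) {
        Map<String, Integer> total = new TreeMap<>();
        for (Map<String, Integer> part : combinedOutputs) {
            part.forEach((word, count) -> total.merge(word, count, Integer::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Integer> map1 = mapAndCombine("Hello World Bye World");
        Map<String, Integer> map2 = mapAndCombine("Hello Hadoop Goodbye Hadoop");
        System.out.println(map1); // {Bye=1, Hello=1, World=2}
        System.out.println(map2); // {Goodbye=1, Hadoop=2, Hello=1}
        System.out.println(reduce(Arrays.asList(map1, map2)));
        // {Bye=1, Goodbye=1, Hadoop=2, Hello=2, World=2}
    }
}
```

Each map task emits four pairs before combining but only three after it, so fewer records cross the network. The same class can serve as combiner and reducer here because addition is associative and commutative.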
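- Mapper
MapReduce spawns one map task for each InputSplit generated by the job's InputFormat.
Output pairs are collected by calling context.write(WritableComparable, Writable).
Job.setGroupingComparatorClass(Class) specifies a Comparator that controls grouping; it usually extends WritableComparator.
Job.setSortComparatorClass(Class) specifies a Comparator that controls how keys are sorted before they are passed to the reducer; it usually extends WritableComparator as well.
Job.setPartitionerClass(Class) specifies a Partitioner that controls which keys go to which reduce task; the number of partitions equals the number of reduce tasks.
Job.setCombinerClass(Class) aggregates the intermediate map output locally, which cuts the amount of data transferred from map to reduce; the combiner must extend Reducer.
- Reducer
A Reducer has three main phases: shuffle, sort, and reduce.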
public static class GroupingComparator extends WritableComparator
{
protected GroupingComparator()
{
super(IntPair.class, true);
}
@Override
//Compare two WritableComparables.
public int compare(WritableComparable w1, WritableComparable w2)
{
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
int l = ip1.getFirst();
int r = ip2.getFirst();
return l == r ? 0 : (l < r ? -1 : 1);
}
}
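The grouping comparator above compares only the first field of the key. How it combines with a sort comparator that orders by the whole key (a secondary sort) can be imitated in plain Java. This sketch assumes each key is an int pair {first, second}; SecondarySortSketch is a hypothetical class and does not use WritableComparator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of sort comparator + grouping comparator; not Hadoop code.
class SecondarySortSketch {
    // Sort comparator: orders by the whole key (first, then second).
    static final Comparator<int[]> SORT =
            Comparator.<int[]>comparingInt(p -> p[0]).thenComparingInt(p -> p[1]);
    // Grouping comparator: keys with the same first field share one reduce call.
    static final Comparator<int[]> GROUP = Comparator.<int[]>comparingInt(p -> p[0]);

    // Returns, per reduce call (keyed by first), the second fields in the order they arrive.
    static Map<Integer, List<Integer>> groupSorted(List<int[]> pairs) {
        List<int[]> sorted = new ArrayList<>(pairs);
        sorted.sort(SORT);
        Map<Integer, List<Integer>> reduceCalls = new LinkedHashMap<>();
        int[] previous = null;
        for (int[] pair : sorted) {
            if (previous == null || GROUP.compare(previous, pair) != 0) {
                reduceCalls.put(pair[0], new ArrayList<>()); // a new group starts here
            }
            reduceCalls.get(pair[0]).add(pair[1]);
            previous = pair;
        }
        return reduceCalls;
    }

    public static void main(String[] args) {
        List<int[]> pairs = Arrays.asList(
                new int[]{2, 9}, new int[]{1, 5}, new int[]{2, 3}, new int[]{1, 2});
        System.out.println(groupSorted(pairs)); // {1=[2, 5], 2=[3, 9]}
    }
}
```

Within each group the second fields arrive already sorted, which is the point of the technique: the reducer does not have to buffer and sort the values itself.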
public class KeyPartitioner extends Partitioner<TextInt, IntWritable> {
@Override
public int getPartition(TextInt key, IntWritable value, int numPartitions) {
// Mask off the sign bit so that a negative hashCode cannot produce a negative partition number
return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
}
}
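The `& Integer.MAX_VALUE` in getPartition above matters because hashCode() can be negative, and in Java the % operator keeps the sign of the dividend. A small plain-Java check, using an Integer key so the hash value is obvious. PartitionSketch is a hypothetical class for illustration.

```java
// Plain-Java check of the hash-partitioning arithmetic; not a Hadoop Partitioner.
class PartitionSketch {
    // Same arithmetic as getPartition above: clear the sign bit, then take the remainder.
    static int partition(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Without the mask, a negative hash gives a negative (invalid) partition number.
    static int naivePartition(Object key, int numPartitions) {
        return key.hashCode() % numPartitions;
    }

    public static void main(String[] args) {
        Integer key = -7; // Integer.hashCode() returns the value itself
        System.out.println(naivePartition(key, 4)); // -3
        System.out.println(partition(key, 4));      // 1
    }
}
```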
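Run: $ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output
- -files: a comma-separated list of files; they appear in the current working directory of each task
- -libjars: a comma-separated list of jars that are added to the classpath of the map and reduce tasks
- -archives: a comma-separated list of archives; each archive is unpacked, and a link named after the archive is created in the current working directory
In the command below, myarchive.zip is unpacked into a directory named "myarchive.zip"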
bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output
- WordCount2.0
package cn.test.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.*;
public class WordCount2 {
public static class TokenizerMapper extends Mapper<Object, Text,Text, IntWritable>{
static enum CountersEnum {INPUT_WORDS}
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private boolean caseSensitive;
private Set<String> patternsToSkip = new HashSet<String>();
private Configuration conf;
private BufferedReader fis;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
conf = context.getConfiguration();
// Read the configured property values, supplying a default for each
caseSensitive = conf.getBoolean("wordcount.case.sensitive",true);
if(conf.getBoolean("wordcount.skip.patterns",false)){
URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
for(URI patternsURI : patternsURIs){
Path patternsPath = new Path(patternsURI.getPath());
String patternsFileName = patternsPath.getName().toString();
parseSkipFile(patternsFileName);
}
}
}
private void parseSkipFile(String fileName) {
try{
fis = new BufferedReader(new FileReader(fileName));
String pattern = null;
while((pattern = fis.readLine())!=null){
patternsToSkip.add(pattern);
}
} catch (IOException ioe){
System.err.println("Caught exception while parsing the cached file'"
+ StringUtils.stringifyException(ioe));
}
}
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = caseSensitive ? value.toString(): value.toString().toLowerCase();
for (String pattern : patternsToSkip){
line = line.replaceAll(pattern, "");
}
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()){
word.set(itr.nextToken());
context.write(word,one);
Counter counter = context.getCounter(CountersEnum.class.getName(),CountersEnum.INPUT_WORDS.toString());
counter.increment(1);
}
}
}
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable val : values){
sum += val.get();
}
result.set(sum);
context.write(key,result);
}
}
public static void main(String args[]) throws Exception{
Configuration conf = new Configuration();
GenericOptionsParser optionsParser = new GenericOptionsParser(conf, args);
// Returns an array holding only the application-specific arguments
String[] remainingArgs = optionsParser.getRemainingArgs();
if (remainingArgs.length!=2&&remainingArgs.length!=4){
System.out.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
System.exit(2);
}
Job job = Job.getInstance(conf,"wordCount");
job.setJarByClass(WordCount2.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
List<String> otherArgs = new ArrayList<String>();
for (int i=0;i<remainingArgs.length;i++){
if("-skip".equals(remainingArgs[i])){
job.addCacheFile(new Path(remainingArgs[++i]).toUri());
// Set the custom configuration property
job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
} else{
otherArgs.add(remainingArgs[i]);
}
}
FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
System.exit(job.waitForCompletion(true)?0:1);
}
}
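The text handling inside map() above can be tried without a cluster. This plain-Java sketch reproduces the lower-casing, the replaceAll loop, and the tokenizing. The sample patterns and the class name SkipPatternSketch are assumptions made for illustration. Note that replaceAll treats each line of the skip file as a regular expression, so a plain word such as "to" is also removed from inside longer words.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

// Plain-Java reproduction of the per-line text handling in WordCount2's map(); not a Hadoop class.
class SkipPatternSketch {
    static List<String> tokens(String line, boolean caseSensitive, List<String> patternsToSkip) {
        String text = caseSensitive ? line : line.toLowerCase();
        for (String pattern : patternsToSkip) {
            text = text.replaceAll(pattern, ""); // each pattern is a regular expression
        }
        List<String> result = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(text);
        while (itr.hasMoreTokens()) {
            result.add(itr.nextToken());
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical skip-file contents: three escaped punctuation marks and the word "to".
        List<String> patterns = Arrays.asList("\\.", "\\,", "\\!", "to");
        System.out.println(tokens("Hello World, Bye World!", false, patterns));
        // [hello, world, bye, world]
        System.out.println(tokens("to tomato", true, patterns));
        // [ma]
    }
}
```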
- GenericOptionsParser
Parses the generic Hadoop command-line options, so that an application can easily specify the namenode, the ResourceManager, additional configuration resources, and so on.
Supported command-line form: bin/hadoop command [genericOptions] [commandOptions]
-conf <configuration file>: specify a configuration file
$ bin/hadoop dfs -conf core-site.xml -conf hdfs-site.xml -ls /data
-D <property=value>: use the given value for a property
$ bin/hadoop dfs -D fs.default.name=darwin:8020 -ls /data
-fs <local|namenode:port>: specify the namenode
$ bin/hadoop dfs -fs darwin:8020 -ls /data
-jt <local|resourcemanager:port>: specify the ResourceManager
$ bin/hadoop job -jt local -submit job.xml
-files
-libjars
-archives
- DistributedCache
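Distributes large, read-only, application-specific files efficiently; it caches the files an application needs (text files, archives, jars, and so on).
Files and archives can be distributed by setting the property mapreduce.job.cache.{files|archives} to a comma-separated list.
From application code, use the API Job.addCacheFile(URI) / Job.addCacheArchive(URI) (by default the URI refers to a file on HDFS).
Streaming jobs can distribute files from the command line with -cacheFile/-cacheArchive.
- Counter
The enum's class name is used as the counter group name, and each enum constant as the counter name.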
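The naming rule can be checked in plain Java. This sketch keeps a small in-memory table keyed by group and counter name, which is only a stand-in for the framework's counters. CounterNamingSketch and its methods are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// In-memory stand-in for a counter table; not the Hadoop Counter API.
class CounterNamingSketch {
    enum CountersEnum { INPUT_WORDS }

    // group name -> (counter name -> value)
    private final Map<String, Map<String, Long>> table = new TreeMap<>();

    // Group = the enum's class name, counter = the constant's name.
    void increment(Enum<?> counter, long delta) {
        String group = counter.getDeclaringClass().getName();
        table.computeIfAbsent(group, g -> new TreeMap<>())
             .merge(counter.name(), delta, Long::sum);
    }

    long get(String group, String name) {
        return table.getOrDefault(group, Collections.<String, Long>emptyMap())
                    .getOrDefault(name, 0L);
    }

    public static void main(String[] args) {
        CounterNamingSketch counters = new CounterNamingSketch();
        counters.increment(CountersEnum.INPUT_WORDS, 1);
        counters.increment(CountersEnum.INPUT_WORDS, 1);
        String group = CountersEnum.class.getName(); // nested enum: Outer$CountersEnum
        System.out.println(group + " / " + CountersEnum.INPUT_WORDS + " = "
                + counters.get(group, "INPUT_WORDS"));
    }
}
```

For an enum nested in another class, as in WordCount2, the group name contains the outer class names joined by `$`.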
// Remove the output directory if it already exists
Path outputPath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(conf);
if(fileSystem.exists(outputPath)){
fileSystem.delete(outputPath, true);
System.out.println("existing output directory deleted");
}
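- jobhistory
Records information about completed MapReduce jobs in a specified HDFS directory. This feature is not enabled by default.
Start it with mr-jobhistory-daemon.sh start historyserver; jps should then list a JobHistoryServer process.
Before starting it, add the following properties to mapred-site.xml: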
<property>
<name>mapreduce.jobhistory.address</name>
<value>master:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>master:19888</value>
</property>
<property>
<name>mapreduce.jobhistory.done-dir</name>
<value>/history/done</value>
</property>
<property>
<name>mapreduce.jobhistory.intermediate-done-dir</name>
<value>/history/done_intermediate</value>
</property>
Add the following to yarn-site.xml to enable log aggregation, so that the logs can be viewed
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>