程序员数据结构大数据

浅谈Hadoop

2016-07-02  本文已影响711人  一只小哈

Hadoop是一个十分流行的分布式存储和计算框架,也是业内大数据处理和分析最通用的框架之一。


hadoop_icon.png

Hadoop2.0 由HDFS(Hadoop Distributed File System)、MapReduce和Yarn三部分组成。
Hadoop的设计原型来源于google的三篇论文,即GFS、MapReduce和BigTable,同时作为Lucene的子项目Nutch的一部分在2005年引入Apache,Hadoop的得名是Hadoop之父Doug Cutting儿子的玩具,值得一提的是他的妻子叫Lucene。
Hadoop生态

hbase生态.png

"永远不把鸡蛋放在一个篮子里"——HDFS

在分布式的文件系统之前,往往使用大型机和存储服务器去做存储,无论大型机和存储服务器都是十分昂贵的,同时也是有瓶颈的,横向扩展能力很差。而分布式文件系统的横向扩展能力以及容错性十分好,也越来越受到人们的青睐。HDFS的定位是用比较廉价的机器,做高可用的海量数据的存储。主要采用多副本的分块存储机制,在部分机器宕机或数据损坏的情况下,依然能提供可靠服务。

  1. 集群拓扑
hdfs.jpg hdfsread.png

open DistributedFileSystem 去NameNode获取文件的块列表,NameNode根据Client距离各节点的网络距离给出Block列表。
根据Block列表去一次读取文件,读取后在Client进行文件汇总。

  1. 写入流程:
hdfs_write.png

"分而治之"——MapReduce###

</br>
分布式计算出现之前,数据的计算往往依靠性能比较好的单机计算。但是单机受限于本身的计算资源,往往计算速度都不如人意。
一天小明接到产品的一个需求:
产品:小明啊,这里有一天的日志信息,大概5个G,我要统计一下一共有多少。
小明:OK啊,就5个G,一个shell搞定,看我 cat * | wc -l,我简直就是个天才。
产品:对不起啊小明,需求变了,一天的看不出来效果,我需要统计1个月的数据,大概有150G。
小明:有点大啊,不怕,我线上服务器内存120G,40核,看我用多线程搞定,过了2个小时,终于搞 定了还有点费劲。
产品:我保证这是我最后一次变更需求,我想要最近一年的数据1800G左右。
小明:数据上T了,搞不定了啊。
上面的例子告诉我们,在大数量的场景下,高性能的单机有时也是解决不了问题。所以我们就需要MapReduce帮助我们。
</br>
MapReduce是一种采用分治和规约的一种并行的批处理框架,先将数据做分割计算,最后汇总结果。看上去和多线程的处理机制一样,但是Hadoop将它封装在了框架中,编程十分简单,吞吐量十分高,目前支持Java、C++、Python等多种API编程。
1.MapReduce运行模型总体概览:

mapreduceAllGraph.png

*reduce:将不同map汇总来的数据做reduce逻辑。

2.多reduce:

datatrans.png

3.经典wordcount:

wordcountdatatrans.png mapreducedataStream.png

4.Map类的实现:

5.Reduce类的实现:

6.作业整体配置:

wordcount:
public class WordCountTask { private static final Logger logger = Logger.getLogger(WordCountTask.class); public static class WordCountMap extends Mapper<Object, Text, Text, IntWritable>{ private static final IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void cleanup(Context context) throws IOException, InterruptedException { logger.info("mapTaskEnd....."); } protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { this.word.set(itr.nextToken()); context.write(this.word, one); } } protected void setup(Context context) throws IOException, InterruptedException { logger.info("mapTaskStart....."); } } public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } this.result.set(sum); context.write(key, this.result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCountTask.class); job.setMapperClass(WordCountMap.class); job.setReducerClass(WordCountReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileSystem fs = FileSystem.get(conf); for (int i = 0; i < otherArgs.length - 1; i++) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } if(fs.exists(new Path(otherArgs[otherArgs.length - 1]))){ fs.delete(new Path(otherArgs[otherArgs.length - 1])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)])); job.setNumReduceTasks(1); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

6.提交:
​hadoop jar hadoop-examples.jar demo.wordcount(主类名) Dmapreduce.job.queuename=XX(系统参数) input output
​缺点:无定时调度

  1. 常用的InputFormat:
上一篇 下一篇

猜你喜欢

热点阅读