WordCount on an HDFS Data Stream

2018-11-08  hipeer

Real-time computation over HDFS files amounts to monitoring an HDFS directory and processing every new file that appears in it, in effect treating the directory as a live file stream.
Spark Streaming watches the specified HDFS directory and processes files as they arrive. A few caveats apply: every file placed in the directory must have the same format; files must be moved or renamed into the directory; once a file has been processed, later changes to its contents are ignored; and because the HDFS file source has no Receiver, it does not occupy a CPU core.
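
The move-or-rename requirement exists because a rename within HDFS is atomic, so the streaming job either sees a complete file or does not see it at all. Below is a minimal sketch of that pattern using the Hadoop FileSystem API; the staging and input paths are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MoveIntoWatchedDir {

    public static void main(String[] args) throws Exception {

        // Connect to the default filesystem configured in core-site.xml
        FileSystem fs = FileSystem.get(new Configuration());

        // Hypothetical paths: write the file into a staging directory first,
        // outside the directory that Spark Streaming is monitoring
        Path staged = new Path("/user/spark/staging/words-001.txt");
        Path watched = new Path("/user/spark/input/words-001.txt");

        // rename() is atomic within HDFS, so the streaming job never
        // observes a partially written file
        boolean moved = fs.rename(staged, watched);
        System.out.println("moved: " + moved);

        fs.close();
    }
}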

WordCount
package cn.spark.streaming;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

/**
 * A WordCount program based on Spark Streaming's HDFS file source
 *
 */
public class HDFSWordCount {

    public static void main(String[] args) throws Exception {

        SparkConf conf = new SparkConf().setAppName("HDFSWordCount");

        // Create the streaming context with a 5-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Monitor the HDFS directory passed as the first argument; each batch's
        // RDD holds the lines of any new files that appeared during the interval
        JavaDStream<String> lineDStream = jssc.textFileStream(args[0]);
        
        // flatMap: split each line into whitespace-separated words
        JavaDStream<String> wordDStream = lineDStream.flatMap(
                
                new FlatMapFunction<String, String>() {

                    private static final long serialVersionUID = -7716791306222529945L;

                    @Override
                    public Iterator<String> call(String line) throws Exception {
        
                        return Arrays.asList(line.split(" ")).iterator();
                    }
        
                });
        
        // mapToPair: turn each word into a (word, 1) tuple
        JavaPairDStream<String, Integer> pairDStream = wordDStream.mapToPair(
                
                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 859183755407567155L;

                    @Override
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        
                        return new Tuple2<String, Integer>(word, 1);
                    }
                });
        
        // reduceByKey: sum the counts of identical words within the batch
        JavaPairDStream<String, Integer> resultDStream = pairDStream.reduceByKey(
                
                new Function2<Integer, Integer, Integer>() {
                    
                    private static final long serialVersionUID = 6470112788289414159L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {

                        return v1 + v2;
                    }
                });
        
        // Print up to 100 (word, count) pairs from each batch
        resultDStream.print(100);
        
        jssc.start();                // start the streaming computation
        
        jssc.awaitTermination();     // block until the job is stopped or fails
        
        jssc.close();
        
    }
}
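
To try it out, submit the job with the directory to monitor as its only argument, e.g. spark-submit --class cn.spark.streaming.HDFSWordCount wordcount.jar hdfs://namenode:9000/user/spark/input (host, port, and paths here are placeholders), and then move text files into that directory with hdfs dfs -mv. Each 5-second batch prints counts for whatever files arrived during that interval.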
