输入DStream之基础数据源

2019-02-23  本文已影响0人  一个人一匹马

HDFS文件
基于HDFS文件的实时计算,其实就是,监控一个HDFS目录,只要其中有新文件出现,就实时处理。相当于处理实时的文件流。
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory)
streamingContext.fileStreamKeyClass, ValueClass, InputFormatClass
Spark Streaming会监视指定的HDFS目录,并且处理出现在目录中的文件。要注意的是,所有放入HDFS目录中的文件,都必须有相同的格式;必须使用移动或者重命名的方式,将文件移入目录;一旦处理之后,文件的内容即使改变,也不会再处理了;基于HDFS文件的数据源是没有Receiver的,因此不会占用一个cpu core。

基于HDFS的实时wordcount程序
1、基于HDFS的实时wordcount程序

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

/**
* 基于HDFS文件的实时wordcount程序
* @author Administrator
*
*/
public class HDFSWordCount {

​public static void main(String[] args) {
​​SparkConf conf = new SparkConf()​​​​.setMaster("local[2]")​​​​.setAppName("HDFSWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
​​// 首先,使用JavaStreamingContext的textFileStream()方法,针对HDFS目录创建输入数据流
​​JavaDStream<String> lines = jssc.textFileStream("hdfs://spark1:9000/wordcount_dir");
​​​​// 执行wordcount操作
​​JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

​​​private static final long serialVersionUID = 1L;

​​​@Override
​​​public Iterable<String> call(String line) throws Exception {
​​​​return Arrays.asList(line.split(" "));
​​​}
​​});
JavaPairDStream<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
​​​​​private static final long serialVersionUID = 1L;

​​​​​@Override
​​​​​public Tuple2<String, Integer> call(String word) ​​​​​​​throws Exception {
​​​​​​return new Tuple2<String, Integer>(word, 1);
​​​​​}
​​​​});
​​JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(

​​​​new Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override
​​​​​public Integer call(Integer v1, Integer v2) throws Exception {
​​​​​​return v1 + v2;
​​​​​}​​​​
​​​​});
wordCounts.print();
​​jssc.start();
​​jssc.awaitTermination();
​​jssc.close();
​}
}

验证:
Hadoop fs –mkdir /wordCount_dir

上一篇下一篇

猜你喜欢

热点阅读