使用滑动窗口进行实时的热词统计

2018-11-10  本文已影响0人  hipeer

开发环境：（原文此处的环境列表缺失；从代码推断，使用的是 Spark 2.x Streaming 以及 spark-streaming-kafka-0-8 的 Direct 直连 API。）

Java代码：

package cn.spark.streaming;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

/**
 * 
 * Hot Word Count
 * 
 */
public class WindowWordCount {

    /** Micro-batch interval of the streaming context, in seconds. */
    private static final long BATCH_SECONDS = 10;

    /** Length of the sliding window, in seconds; must be a multiple of BATCH_SECONDS. */
    private static final long WINDOW_SECONDS = 60;

    /** How often the windowed counts are recomputed, in seconds; must be a multiple of BATCH_SECONDS. */
    private static final long SLIDE_SECONDS = 10;

    /** Kafka brokers used to bootstrap the direct stream. */
    private static final String KAFKA_BROKERS = "hserver-1:9092,hserver-2:9092,hserver-3:9092";

    /**
     * Hot-word count over a sliding window.
     *
     * <p>Reads the values of the given Kafka topic, splits them into whitespace-separated words,
     * counts each word over the last {@value #WINDOW_SECONDS} seconds, recomputes the counts every
     * {@value #SLIDE_SECONDS} seconds, and prints each windowed result ordered by descending count.
     *
     * <p>Usage: {@code WindowWordCount <topic>}
     *
     * @param args {@code args[0]} is the Kafka topic to consume
     * @throws IllegalArgumentException if no (non-blank) topic name is given
     * @throws Exception propagated from the streaming context
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 1 || args[0].trim().isEmpty()) {
            throw new IllegalArgumentException("Usage: WindowWordCount <topic> (Kafka topic name is required)");
        }

        SparkConf conf = new SparkConf().setAppName("WindowWordCount");

        // create context; window and slide durations below must be multiples of this interval
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(BATCH_SECONDS));

        // kafka properties map
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("bootstrap.servers", KAFKA_BROKERS);
        kafkaParams.put("group.id", "WindowWordCount");
        // "smallest" is the Kafka 0.8 consumer spelling of "start from the earliest offset"
        kafkaParams.put("auto.offset.reset", "smallest");

        // kafka topic set
        Set<String> topics = new HashSet<String>();
        topics.add(args[0]);

        // access data DStream: (key, value) pairs decoded as strings
        JavaPairInputDStream<String, String> searchLogDStream =
                KafkaUtils.createDirectStream(
                        jssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParams,
                        topics
                        );

        // flatMap: message value -> words, then drop empty tokens
        JavaDStream<String> wordDStream =
                searchLogDStream.flatMap(

                        new FlatMapFunction<Tuple2<String, String>, String>() {

                            private static final long serialVersionUID = 4034522628037914742L;

                            @Override
                            public Iterator<String> call(Tuple2<String, String> tuple) throws Exception {

                                // Defensive: treat a missing message value as an empty line.
                                String line = tuple._2 == null ? "" : tuple._2;
                                // Split on any whitespace run so tabs / repeated spaces do not
                                // yield bogus tokens; a leading-whitespace "" is filtered below.
                                return Arrays.asList(line.split("\\s+")).iterator();
                            }
                        })
                .filter(

                        new Function<String, Boolean>() {

                            private static final long serialVersionUID = 5190377349470158871L;

                            @Override
                            public Boolean call(String word) throws Exception {

                                // Empty strings are not words and must not be counted.
                                return !word.isEmpty();
                            }
                        });

        // mapToPair: word -> (word, 1)
        JavaPairDStream<String, Integer> wordPairDStream =
                wordDStream.mapToPair(

                        new PairFunction<String, String, Integer>() {

                            private static final long serialVersionUID = 2101884706537316002L;

                            @Override
                            public Tuple2<String, Integer> call(String word) throws Exception {

                                return new Tuple2<String, Integer>(word, 1);
                            }
                        });

        // reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration):
        // count over the last WINDOW_SECONDS, recomputed every SLIDE_SECONDS.
        JavaPairDStream<String, Integer> windowWordDStream =
                wordPairDStream.reduceByKeyAndWindow(
                        new Function2<Integer, Integer, Integer>() {

                            private static final long serialVersionUID = -358144101893232390L;

                            @Override
                            public Integer call(Integer v1, Integer v2) throws Exception {

                                return v1 + v2;
                            }
                        },
                        Durations.seconds(WINDOW_SECONDS),
                        Durations.seconds(SLIDE_SECONDS)
                        );

        // sort each windowed RDD by count, descending: swap to (count, word), sortByKey, swap back
        JavaPairDStream<String, Integer> resultSortDStream =
                windowWordDStream.transformToPair(

                    new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {

                        private static final long serialVersionUID = 1441798634812792342L;

                        @Override
                        public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> unSortRDD)
                                throws Exception {

                            JavaPairRDD<String, Integer> sortRDD = unSortRDD
                            .mapToPair(

                                new PairFunction<Tuple2<String, Integer>, Integer, String>() {

                                    private static final long serialVersionUID = -3715362497048144520L;

                                    @Override
                                    public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {

                                        return new Tuple2<Integer, String>(tuple._2, tuple._1);
                                    }
                                })
                            .sortByKey(false)
                            .mapToPair(

                                new PairFunction<Tuple2<Integer, String>, String, Integer>() {

                                    private static final long serialVersionUID = 7017380215451671038L;

                                    @Override
                                    public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple)
                                            throws Exception {

                                        return new Tuple2<String, Integer>(tuple._2, tuple._1);
                                    }
                                });

                            return sortRDD;
                        }
                    });

        // print result (Spark's print() shows the first elements of each RDD, i.e. the top words)
        resultSortDStream.print();

        // Always release the context, even if start/awaitTermination fails.
        try {
            jssc.start();

            jssc.awaitTermination();
        } finally {
            jssc.close();
        }

    }
}

上一篇 下一篇

猜你喜欢

热点阅读