解决DStream的mapWithState无效key清除的问题

2019-09-28  本文已影响0人  机灵鬼鬼

该实例使用spark streaming从kafka中实时分析当天加减购的次数排名top5的用户。
实现思路就是通过reduceByKey和mapWithState结合完成数据的统计。在统计过程中遇到的问题可能大家都会遇到:
问题1:当跨天的时候,mapWithState中的缓存会存储以前计算过的过期的key,比如我们今天是10月26日,按照我们的思路只需要对10月26日发生的加购行为的用户做累加统计,10月25日及其以前key的就不要了。但是实际情况并不是这样,他依然会被mapWithState存储在缓存中,除非你重启你的程序。那么问题来了,我们怎么把已经计算过的10月25日及其以前的key从mapWithState缓存中去除呢?
解决方案:
第一步、这时候我们就用了State的超时机制,为每个key都固定设置24小时的存活时间,过期后会自动清除。这样保证sparkStreaming内在机制可以自动清理掉24小时以外的过期key。但这种情况依然会存在两天的数据key的情况,比如10月25日15点产生的key,必须等到10月26日15点才能被销毁,所以就需要第二步屏蔽操作。
第二步、从分析程序用日期屏蔽除了10月26日以外的key。
问题2:相同的key为什么会出现多次,好像并没有合并同类项?
解决方案:这是因为mapWithState是并没有合并的机制的,这一点不同于updateBykey可以自动reduceBykey。所以我们在使用mapWithState来进行词频统计的时候,要先进行reduceBykey才能避免相同的key重复出现的情况。
查看Spark Streaming消费消息的情况的工具
${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server hanode5:9092 --describe --group jis


image.png
package com.lppz.busi;

import com.lppz.common.DBUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.None;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.text.SimpleDateFormat;
import java.util.*;

public class DayUpdateCartCntTopN {
    static String flag1="更新购物车接口,uid:";
    static String flag2="返回给前端的购物车信息cartInfoMap:";
    static String addSql="insert into bigdata_update_cart_cnt(uid, up_date, cnt) values(?,?,?)";
    static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd");
    static int setp=300;
    public static void main(String[] args) {
        ResourceBundle kafkaData = ResourceBundle.getBundle("META-INF/kafka");

        SparkConf sc=new SparkConf().setMaster("local[2]").setAppName("DayUpdateCartCntTopN");
        //控制sparkstreaming启动时,积压问题并设置背压机制,自适应批次的record变化,来控制任务的堆积
        //(1)确保在kill任务时,能够处理完最后一批数据,再关闭程序,不会发生强制kill导致数据处理中断,没处理完的数据丢失
        sc.set("spark.streaming.stopGracefullyOnShutdown", "true");
        //(2)开启后spark自动根据系统负载选择最优消费速率
        sc.set("spark.streaming.backpressure.enabled", "true");
        //(3)开启的情况下,限制第一次批处理应该消费的数据,因为程序冷启动 队列里面有大量积压,防止第一次全部读取,造成系统阻塞
        sc.set("spark.streaming.backpressure.initialRate", "1000");
        //(4)限制每秒每个消费线程读取每个kafka分区最大的数据量
        sc.set("spark.streaming.kafka.maxRatePerPartition", "1000");
        /**
         * 注意:
         只有(4)激活的时候,每次消费的最大数据量,就是设置的数据量,如果不足这个数,就有多少读多少,如果超过这个数字,就读取这个数字的设置的值
         只有(2)+(4)激活的时候,每次消费读取的数量最大会等于(4)设置的值,最小是spark根据系统负载自动推断的值,消费的数据量会在这两个范围之内变化根据系统情况,但第一次启动会有多少读多少数据。此后按(2)+(4)设置规则运行
         (2)+(3)+(4)同时激活的时候,跟上一个消费情况基本一样,但第一次消费会得到限制,因为我们设置第一次消费的频率了
         */

        JavaStreamingContext jssc=new JavaStreamingContext(sc, Durations.seconds(180));//180秒一次
        jssc.checkpoint(".");//设置上一个批次的值存在的目录,在生产环境中,放在hdfs某个文件下,相对安全些
        // 首先要创建一份kafka参数map
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        // 这里是不需要zookeeper节点,所以这里放broker.list
        String brokerslist=kafkaData.getString("BROKERS_LIST");
        String topics = kafkaData.getString("TOPICS");
        String groupId=kafkaData.getString("GROUP_ID");
        //Kafka服务监听端口
        kafkaParams.put("bootstrap.servers",brokerslist);
        //指定kafka输出key的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        //指定kafka输出value的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        //消费者ID,随意指定
        kafkaParams.put("group.id", groupId);
        //earliest
        //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        //latest
        //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
        kafkaParams.put("auto.offset.reset", "earliest");
        //如果true,consumer定期地往zookeeper写入每个分区的offset
        kafkaParams.put("enable.auto.commit", true);
        //这里不用担心时间久了kafka的数据会很多,因为kafka有自动清理机制,
        // 默认把消息数据保存168小时,超过的都会自动清理,配置在server.properties文件中的log.retention.hours

        //我们该用另外一种方式来获取多分区中的topic和offset
        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        //通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定
        try {
            JavaInputDStream<ConsumerRecord<String,String>> kafkaStream = KafkaUtils.createDirectStream(
                    jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.Subscribe(topicsSet, kafkaParams)
            );
            //使用map先对InputDStream进行取值操作
            //先过滤无用的数据
            JavaDStream<String> lines=kafkaStream.filter(new Function<ConsumerRecord<String, String>, Boolean>() {
                @Override
                public Boolean call(ConsumerRecord<String, String> orgRecord) throws Exception {
                    String today=sdf.format(new Date());
                    //System.out.println("时间过滤:"+today);
                    String orgVal=orgRecord.value();
                    return orgVal.contains(flag1)&&orgVal.contains(flag2)&&orgVal.startsWith(today);//满足条件的才会被留下,不满足条件都会被过滤掉
                }
            }).map(new Function<ConsumerRecord<String, String>, String>() {
                        @Override
                        public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                            String line=consumerRecord.value();
                            int f1=line.indexOf(flag1)+flag1.length();
                            String temp=line.substring(f1);
                            int f2=temp.indexOf(",",0);
                            String lastupTime= line.substring(11,23);
                            String uid=temp.substring(0,f2);
                            String day=line.substring(0,10);
                            return uid+"_"+day;
                        }
            });
            lines.print();
            JavaPairDStream<String, Integer> pairs  = lines.mapToPair(s -> new Tuple2<>(s, 1)).reduceByKey((v1,v2)->v1+v2);
            // 只更新数值变更的数据
            Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
                    (word, one, state) -> {
                        Date newDate=new Date();
                        String today=sdf.format(newDate);
                        System.out.println("时间过滤3:"+today+",word:"+word);
                        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
                        Tuple2<String, Integer> output = new Tuple2<>(word, sum);
                        if(word.contains(today)&&!state.isTimingOut()) {
                            state.update(sum);
                        }else{
                            
                      System.out.println("today:"+today+",word:"+word+",timeout:"+state.isTimingOut());
                     if(!state.isTimingOut()) {
                            state.remove();
                      }
                        }
                        return output;
            };

            // DStream made of get cumulative counts that get updated in every batch
            JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
                    pairs.mapWithState(StateSpec.function(mappingFunc).timeout(Durations.minutes(1440)));//24小时后缓存的key过期
            JavaPairDStream<String,Integer> fullStateDstream=stateDstream.stateSnapshots();//获取所有的未过期的key
            if (fullStateDstream != null) {
                fullStateDstream.print();
                //遍历DStream,并转换成RDD
                fullStateDstream.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
                    @Override
                    public void call(JavaPairRDD<String, Integer> tuple2JavaRDD) throws Exception {
                        if (tuple2JavaRDD.isEmpty()){
                            System.out.println("暂无可处理数据");
                            return;
                        }
                        JavaRDD<String> newRdd =tuple2JavaRDD.map(new Function<Tuple2<String, Integer>, String>() {
                            @Override
                            public String call(Tuple2<String, Integer> record) throws Exception {
                                Date newDate=new Date();
                                String today=sdf.format(newDate);
                                if (record._1.contains(today)){
                                    return record._2+"_"+record._1;
                                }else{
                                    System.out.println("排序前被过滤的数据today:"+today+",record._1:"+record._1);
                                    return null;
                                }
                            }
                        });
                        //这里对RDD的键值进行互换,并进行排序
                        JavaPairRDD<Integer,String> fRdd=newRdd.mapToPair(new PairFunction<String, Integer,String>() {
                            @Override
                            public Tuple2<Integer,String> call(String s) throws Exception {
                                if (null==s||"".equals(s)){
                                    return new Tuple2<Integer,String>(0,"needRemove");
                                }else {
                                    String[] vals = s.split("_");
                                    return new Tuple2<Integer, String>(Integer.valueOf(vals[0]), vals[1] + "_" + vals[2]);
                                }
                            }
                        }).filter(new Function<Tuple2<Integer, String>, Boolean>() {
                            @Override
                            public Boolean call(Tuple2<Integer, String> v1) throws Exception {
                                return !"needRemove".equals(v1._2);//过滤掉需要删除的key
                            }
                        }).sortByKey(false);
                        //数据处理并入库
                        processJavaRDDData(fRdd);
                        fRdd=null;
                        newRdd=null;
                        tuple2JavaRDD=null;
                    }
                });
            }else{
                System.out.println("暂无key存在");
            }
            jssc.start();
            jssc.awaitTermination();
            jssc.stop();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void processJavaRDDData(JavaPairRDD<Integer,String> fRdd) throws Exception {
        //单数据样例:[330,11111111_2019-09-08],取top5入库
        List<Tuple2<Integer, String>> top5 = fRdd.collect();
        if(top5.size()>5) {
            top5 = top5.subList(0, 5);
        }
        System.out.println("top5:"+top5);
        if(top5.size()>0) {
            Connection connection = DBUtils.getConnection();
            connection.setAutoCommit(false);
            //获取数据中日期进行删除后重新插入(kafka遍历后数据日期同一天)
            String time = fRdd.collect().get(0)._2.split("_")[1];
            String delSql = "DELETE FROM bigdata_update_cart_cnt where up_date='" + time + "'";//清空表数据,保留表结构
            PreparedStatement dst = connection.prepareStatement(delSql);
            dst.executeUpdate();
            connection.commit();
            int recordsize = 0;
            dst = connection.prepareStatement(addSql);
            for (Tuple2<Integer, String> row : top5) {
                String[] vals = row._2.split("_");
                String uid = vals[0];
                String day = vals[1];
                int cnt = row._1();
                dst.setString(1, uid);
                dst.setString(2, day);
                dst.setInt(3, cnt);
                dst.addBatch();
                if ((recordsize + 1) % setp == 0) {
                    dst.executeBatch();//执行批量处理
                }
                recordsize++;
            }
            if (recordsize % setp != 0) {//不被整除,才执行这个
                dst.executeBatch();//执行批量处理
            }
            connection.commit();//手动提交
            DBUtils.DBclose(connection, dst, null);
        }
    }

}

上一篇 下一篇

猜你喜欢

热点阅读