笔记本📒

java实现JavaPairDStream写入hbase

2020-03-04  本文已影响0人  涓涓自然卷
热爱生活.png

-1、spark整合hbase,在 pom.xml 中添加 hbase 依赖:

        <!-- HBase dependencies; every entry shares the ${hbase.version} property
             (defined outside this snippet). -->
        <!-- org.apache.hbase:hbase declared with type "pom".
             NOTE(review): presumably redundant next to the concrete modules below - verify. -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>${hbase.version}</version>
            <type>pom</type>
        </dependency>

        <!-- hbase-client: presumably supplies Connection, Table, Put used by the job - confirm -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- hbase-server.
             NOTE(review): the code in this article only uses client-side classes; confirm this is needed. -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <!-- hbase-common: presumably supplies HBaseConfiguration, TableName, Bytes - confirm -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>

-2、主程序代码:

/**
 * Spark Streaming demo job: reads string messages from Kafka in 5-second
 * batches, counts them per key over 15-second windows, prints the windowed
 * counts and writes them to the HBase table {@code wxj-test4}.
 */
public class Demo04ReadFromKafkaToHbaseOk {
    private static final Logger logger = LoggerFactory.getLogger(Demo04ReadFromKafkaToHbaseOk.class);

    /** Key used when a message consists only of space separators. */
    private static final String UNKNOWN_WORD = "未知";
    /** Target HBase table. */
    private static final String HBASE_TABLE = "wxj-test4";
    /** Suffix appended to the word to build the row key. */
    private static final String ROW_KEY_SUFFIX = "00000";

    /**
     * Builds the streaming pipeline, starts it and blocks until it terminates.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Kryo buffer sizes carry explicit units; the original used the
        // deprecated "spark.kryoserializer.buffer.mb" key and unit-less values.
        SparkConf conf = new SparkConf()
                .setAppName("Demo04ReadFromKafkaToHbaseOk")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class<?>[]{Demo04ReadFromKafkaToHbaseOk.class})
                .set("spark.kryoserializer.buffer", "256m")
                .set("spark.kryoserializer.buffer.max", "512m");

        SparkUtils.setMaster(conf);

        Map<String, Object> kafkaParams = buildKafkaParams();
        Collection<String> topics = parseTopics(ConfigurationManager.getProperty(Constants.KAFKA_TOPICS));

        // Batch interval: one micro-batch every 5 seconds.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        try {
            // Checkpoint directory is hard-coded; Constants.STREAMING_CHECKPOINT_DIR is not read here.
            jssc.checkpoint("./streaming-checkpoint");

            final JavaInputDStream<ConsumerRecord<String, String>> stream =
                    KafkaUtils.createDirectStream(
                            jssc,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                    );

            JavaDStream<String> words = getWords(stream);
            JavaPairDStream<String, Integer> wordsAndOne = getWordsAndOne(words);

            // Per-batch counts and the running total are built for demonstration only:
            // neither has an output operation attached, so only result2 below is printed/stored.
            final JavaPairDStream<String, Integer> wordsCount = getWordsCount(wordsAndOne);
            DStream<Tuple2<String, Integer>> result = getResults60s(wordsAndOne);

            // Windowed counts: 15-second window evaluated every 15 seconds.
            JavaPairDStream<String, Integer> result2 = getResults15s(wordsAndOne);

            // Logged once while the pipeline is being defined, not once per batch.
            logger.info("==result2 print==");
            result2.print();

            writeToHbase(result2);

            jssc.start();
            jssc.awaitTermination();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for whoever is running this thread.
                Thread.currentThread().interrupt();
            }
            logger.error("Streaming job terminated abnormally", e);
        } finally {
            // Release the context even when setup or execution fails.
            jssc.close();
        }
    }

    /**
     * Assembles the Kafka consumer settings from the application configuration.
     *
     * @return consumer parameters for the direct stream
     */
    private static Map<String, Object> buildKafkaParams() {
        Map<String, Object> kafkaParams = new HashMap<>();
        // Broker address list of the Kafka cluster.
        kafkaParams.put("bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS));
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", ConfigurationManager.getProperty(Constants.GROUP_ID));
        kafkaParams.put("auto.offset.reset", "latest");
        // Automatic offset commits are disabled.
        // NOTE(review): nothing in this class commits offsets explicitly - confirm that is intended.
        kafkaParams.put("enable.auto.commit", false);
        return kafkaParams;
    }

    /**
     * Splits a comma-separated topic list into a set, trimming each name and
     * dropping empty entries.
     *
     * @param kafkaTopics raw configuration value
     * @return the distinct topic names
     * @throws IllegalStateException if the value is missing
     */
    private static Collection<String> parseTopics(String kafkaTopics) {
        if (kafkaTopics == null) {
            throw new IllegalStateException("Missing configuration: " + Constants.KAFKA_TOPICS);
        }
        Collection<String> topics = new HashSet<>();
        for (String kafkaTopic : kafkaTopics.split(",")) {
            String trimmed = kafkaTopic.trim();
            if (!trimmed.isEmpty()) {
                topics.add(trimmed);
            }
        }
        return topics;
    }

    /**
     * Extracts the message value of every Kafka record and logs each record.
     * Records whose value is {@code null} are logged but dropped, because the
     * downstream mapping cannot handle a null string.
     *
     * @param stream direct stream of Kafka records
     * @return stream of non-null message values
     */
    private static JavaDStream<String> getWords(JavaInputDStream<ConsumerRecord<String, String>> stream) {
        return stream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {
            @Override
            public Iterator<String> call(ConsumerRecord<String, String> record) throws Exception {
                List<String> values = new ArrayList<>();
                logger.info("ConsumerRecord<String, String> s ={}", record);
                logger.info("offset = {}", record.offset());
                logger.info("*** {} ***", record.value());
                if (record.value() != null) {
                    values.add(record.value());
                }
                return values.iterator();
            }
        });
    }

    /**
     * Maps every message to a {@code (key, 1)} pair. The key is the last
     * space-separated token of the message, the whole message when it contains
     * no space, or {@code "未知"} when it consists only of spaces.
     *
     * <p>NOTE(review): only the last token is counted, the others are discarded;
     * this mirrors the original loop - confirm it is the intended word count.
     *
     * @param words stream of message values
     * @return stream of {@code (key, 1)} pairs
     */
    private static JavaPairDStream<String, Integer> getWordsAndOne(JavaDStream<String> words) {
        return words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                // split drops trailing empty tokens, so an all-space message yields an empty array.
                String[] tokens = word.split(" ");
                String key = tokens.length == 0 ? UNKNOWN_WORD : tokens[tokens.length - 1];
                return new Tuple2<>(key, 1);
            }
        });
    }

    /**
     * Sums the counts per key within each 5-second batch.
     *
     * @param wordsAndOne stream of {@code (key, 1)} pairs
     * @return per-batch counts
     */
    private static JavaPairDStream<String, Integer> getWordsCount(JavaPairDStream<String, Integer> wordsAndOne) {
        return wordsAndOne.reduceByKey((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
    }

    /**
     * Maintains a running total per key across all batches; the state stream
     * is checkpointed every 30 seconds.
     *
     * <p>NOTE(review): the method name and the original comments say 60 seconds
     * while the code uses 30 - confirm which interval is wanted.
     *
     * @param wordsAndOne stream of {@code (key, 1)} pairs
     * @return stream of cumulative {@code (key, total)} pairs
     */
    private static DStream<Tuple2<String, Integer>> getResults60s(JavaPairDStream<String, Integer> wordsAndOne) {
        return wordsAndOne.updateStateByKey(((Function2<List<Integer>, org.apache.spark.api.java.Optional<Integer>, org.apache.spark.api.java.Optional<Integer>>) (values, state) -> {
            int updatedValue = 0;
            if (state.isPresent()) {
                updatedValue = state.get();
                // The placeholder was missing originally, so the value never reached the log.
                logger.info("updatedValue = {}", updatedValue);
            }
            for (Integer value : values) {
                updatedValue += value;
            }
            return org.apache.spark.api.java.Optional.of(updatedValue);
        })).checkpoint(Durations.seconds(30));
    }

    /**
     * Sums the counts per key over a 15-second window that slides every
     * 15 seconds.
     *
     * @param wordsAndOne stream of {@code (key, 1)} pairs
     * @return windowed counts
     */
    private static JavaPairDStream<String, Integer> getResults15s(JavaPairDStream<String, Integer> wordsAndOne) {
        return wordsAndOne.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer left, Integer right) throws Exception {
                return left + right;
            }
        }, Durations.seconds(15), Durations.seconds(15));
    }

    /**
     * Writes every {@code (word, count)} pair of every batch to HBase.
     * One table handle is opened per non-empty partition, all rows of the
     * partition are sent in a single batched put, and the handle is closed
     * afterwards (the original opened a handle per record and never closed it).
     *
     * @param result2 windowed counts to persist
     */
    private static void writeToHbase(JavaPairDStream<String, Integer> result2) {
        result2.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
            @Override
            public void call(JavaPairRDD<String, Integer> rdd, Time batchTime) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Integer>> records) throws Exception {
                        if (!records.hasNext()) {
                            // Nothing to write: avoid touching HBase for empty partitions.
                            return;
                        }
                        List<Put> puts = new ArrayList<>();
                        while (records.hasNext()) {
                            puts.add(toPut(records.next()));
                        }
                        TableName tableName = TableName.valueOf(HBASE_TABLE);
                        try (Table table = HBasePoolConnection.getConnection().getTable(tableName)) {
                            table.put(puts);
                        }
                        logger.info("write to hbase success! rows = {}", puts.size());
                    }
                });
            }
        });
    }

    /**
     * Converts one {@code (word, count)} pair into an HBase {@link Put}:
     * row key {@code word + "00000"}, {@code word:name} holds the word and
     * {@code count:number} holds the count rendered as a decimal string.
     *
     * @param wordCount pair to convert
     * @return the put for that pair
     */
    private static Put toPut(Tuple2<String, Integer> wordCount) {
        String word = wordCount._1;
        String count = wordCount._2.toString();

        Put put = new Put(Bytes.toBytes(word + ROW_KEY_SUFFIX));
        put.addColumn(Bytes.toBytes("word"), Bytes.toBytes("name"), Bytes.toBytes(word));
        put.addColumn(Bytes.toBytes("count"), Bytes.toBytes("number"), Bytes.toBytes(count));
        return put;
    }

}
/**
 * Helper for Spark configuration.
 */
public class SparkUtils {
    /**
     * Sets the master of {@code conf} to {@code "local"} when the
     * {@code spark.local} configuration flag is true; otherwise leaves
     * {@code conf} untouched.
     *
     * @param conf the Spark configuration to adjust
     */
    public static void setMaster(SparkConf conf) {
        if (!ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)) {
            return;
        }
        conf.setMaster("local");
    }
}

/**
 * Holder for a single, lazily created HBase {@link Connection} shared by the
 * whole JVM. The connection is backed by a fixed pool of 10 threads.
 */
public class HBasePoolConnection {
    private HBasePoolConnection() {
    }

    // The shared connection; created on first use.
    private static Connection connection = null;
    // HBase client configuration.
    static Configuration hbaseConfiguration = HBaseConfiguration.create();

    /**
     * Returns the shared connection, creating it on the first call.
     *
     * <p>The method is synchronized because several task threads of one
     * executor JVM may call it at the same time; without the lock each of them
     * could create its own connection and thread pool.
     *
     * @return the shared, never-null connection
     * @throws IllegalStateException if the connection cannot be created
     */
    public static synchronized Connection getConnection() {
        if (connection == null) {
            ExecutorService pool = Executors.newFixedThreadPool(10);
            hbaseConfiguration.addResource("hbase-site.xml");
            try {
                connection = ConnectionFactory.createConnection(hbaseConfiguration, pool);
            } catch (IOException e) {
                // Do not leak the pool, and fail loudly instead of handing out null.
                pool.shutdown();
                throw new IllegalStateException("Unable to create HBase connection", e);
            } catch (RuntimeException e) {
                pool.shutdown();
                throw e;
            }
        }
        return connection;
    }

}
/**
 * Read-only access to the settings in the classpath resource
 * {@code application.properties}, loaded once when the class is initialised.
 * Loading is best-effort: if the resource is missing or unreadable the
 * problem is reported on stderr and every lookup behaves as "not set".
 */
public class ConfigurationManager {
    private static final String CONFIG_RESOURCE = "application.properties";

    private static final Properties prop = new Properties();

    static {
        // try-with-resources closes the stream; a null resource is tolerated by the construct.
        try (InputStream in = ConfigurationManager.class
                .getClassLoader().getResourceAsStream(CONFIG_RESOURCE)) {
            if (in == null) {
                System.err.println("Configuration resource not found on classpath: " + CONFIG_RESOURCE);
            } else {
                prop.load(in);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the raw value of a property.
     *
     * @param key property name
     * @return the value exactly as stored, or {@code null} if the key is not set
     */
    public static String getProperty(String key) {
        return prop.getProperty(key);
    }

    /**
     * Returns a property parsed as an integer. Surrounding whitespace is ignored.
     *
     * @param key property name
     * @return the parsed value, or 0 if the key is not set or not a valid integer
     */
    public static Integer getInteger(String key) {
        String value = trimmedValue(key);
        if (value != null) {
            try {
                return Integer.valueOf(value);
            } catch (NumberFormatException e) {
                reportInvalid(key, value, "integer");
            }
        }
        return 0;
    }

    /**
     * Returns a property parsed as a boolean. Surrounding whitespace is ignored.
     *
     * @param key property name
     * @return {@code true} only if the value equals "true" ignoring case;
     *         {@code false} otherwise, including when the key is not set
     */
    public static Boolean getBoolean(String key) {
        // Boolean.valueOf never throws and maps null to false.
        return Boolean.valueOf(trimmedValue(key));
    }

    /**
     * Returns a property parsed as a long. Surrounding whitespace is ignored.
     *
     * @param key property name
     * @return the parsed value, or 0 if the key is not set or not a valid long
     */
    public static Long getLong(String key) {
        String value = trimmedValue(key);
        if (value != null) {
            try {
                return Long.valueOf(value);
            } catch (NumberFormatException e) {
                reportInvalid(key, value, "long");
            }
        }
        return 0L;
    }

    /** Looks up a property and strips surrounding whitespace; null when not set. */
    private static String trimmedValue(String key) {
        String value = getProperty(key);
        return value == null ? null : value.trim();
    }

    /** Reports a value that could not be parsed as the requested type. */
    private static void reportInvalid(String key, String value, String type) {
        System.err.println("Configuration value for '" + key + "' is not a valid " + type + ": " + value);
    }
}
/**
 * Names of the configuration keys used by the application.
 */
public interface Constants {

    /** Project setting: whether to run with a local Spark master. */
    String SPARK_LOCAL = "spark.local";

    /** Spark job setting: serializer type. */
    String SPARK_SERIALIZER = "spark.serializer";

    /** Streaming checkpoint directory. */
    String STREAMING_CHECKPOINT_DIR = "streaming.checkpoint.dir";

    /** Kafka: broker list. */
    String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";

    /** Kafka: consumer group id. */
    String GROUP_ID = "group.id";

    /** Kafka: topics to subscribe to. */
    String KAFKA_TOPICS = "kafka.topics";

    /** HBase: ZooKeeper quorum. */
    String HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";

    /** HBase: ZooKeeper client port. */
    String HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = "hbase.zookeeper.property.clientPort";

}
# Template for the application.properties resource read by ConfigurationManager.
# Each ${...} placeholder is presumably substituted at build time from an
# environment-specific values file (e.g. Maven resource filtering) - TODO confirm.
# Spark Config #
spark.local = ${spark.local}
spark.serializer = ${spark.serializer}
# KAFKA
# The application key is kafka.bootstrap.servers; its placeholder is named bootstrap.servers.
kafka.bootstrap.servers = ${bootstrap.servers}
group.id = ${group.id}
kafka.topics = ${kafka.topics}
streaming.checkpoint.dir = ${streaming.checkpoint.dir}
# HBASE
hbase.zookeeper.quorum = ${hbase.zookeeper.quorum}
hbase.zookeeper.property.clientPort = ${hbase.zookeeper.property.clientPort}
# END Config #
myvalue=${myvalue}
# Concrete values for one environment (myvalue=dev); presumably the source of
# the ${...} placeholders in the application.properties template - TODO confirm.
# Spark Config #
# true makes SparkUtils.setMaster select the "local" master.
spark.local = true
spark.serializer = org.apache.spark.serializer.KryoSerializer

# KAFKA
# Broker list (addresses masked in the article); a localhost alternative is kept commented out.
bootstrap.servers=×××.×××.×××.×××:9092,×××.×××.×××.×××:9092,×××.×××.×××.×××:9092
#bootstrap.servers=localhost:9092

# Comma-separated topic list; the job splits this value on ",".
kafka.topics=test-wxj1
#kafka.topics=myFlumeKafka
group.id=wxj
# NOTE(review): the job shown calls jssc.checkpoint("./streaming-checkpoint") and does not read this value.
streaming.checkpoint.dir=hdfs://×××.×××.×××.×××:8022/streaming-checkpoint
# HBASE
hbase.zookeeper.quorum=cdh1,cdh2,cdh3
hbase.zookeeper.property.clientPort=2181
# end config #
myvalue=dev

好好吃饭好好睡觉.png
上一篇 下一篇

猜你喜欢

热点阅读