浅谈实时流同步Hive方案

2020-03-07  本文已影响0人  少年_7b60

实时流同步hive

同步实时流数据时,首先要看数据类型,如果是append流,则比较简单,
如果数据流存在根据id更新,删除的情况,则同步到es,hbase等分布书数据库比较简单

同步append流

思路

每天写一个分区,每小时消费一次,使用hdfs的append模式往同一个目录写,这样可能出现每天分区中的小文件较多的情况,可以使用hive查询进行文件merge。

以kafka+sparkstreaming+hive为例

  1. sparkstreaming消费kafka数据写hdfs
override def handle(ssc: StreamingContext): Unit = {
    val conf = ssc.sparkContext.getConf

    val source = new KafkaDirectSource[String, String](ssc)
    val lines: DStream[ActualTraceData] = source.getDStream(_.value()).map(RealTimeData.parse[ActualTraceData]).map(_.body)

    val spark = SparkSession.builder.config(ssc.sparkContext.getConf).getOrCreate()
    import spark.implicits._

    lines.foreachRDD(rdd => {
    val df: DataFrame = rdd.toDF()
        .withColumn("date", lit(DateTimeUtil.currentDateStr()))
    df.write.mode(SaveMode.Append)
        .option("path", conf.get("spark.trace.hive.output"))
        .partitionBy("date", "datatype")
        .option("delimiter", "\t")
        .format("csv").save()
    })
}
  1. 创建hive外部表
CREATE EXTERNAL TABLE `trace.online`(
`id` string COMMENT 'id',
`data` string COMMENT '数据',
`key1` string COMMENT '索引1',
`key2` string COMMENT '索引2',
`key3` string COMMENT '索引3',
`key4` string COMMENT '索引4',
`key5` string COMMENT '索引5',
`ts` bigint COMMENT '时间戳'
) PARTITIONED BY (
`date` string COMMENT '日期',
`datatype` string COMMENT '数据分类')
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
location 'hdfs:///hive/trace';
  1. 每天刷hive分区,这里有两种办法,执行ddl或是hive 自动检测修复分区

执行ddl添加分区,ddl如下

alter table `trace.online` add PARTITION (date='20200306', datatype='trace_test_input') 
location 'hdfs:///hive/trace/date=20200306/datatype=trace_test_input';

执行MSCK REPAIR TABLE trace.online;

同步普通数据流

思路

由于hive底层是hdfs,没有按主键更新记录的功能。假设数据按天的频率同步,则数据可按天分区,每个分区都是数据的一个快照,消费数据时,将拿到的数据和前一个分区的数据进行join和过滤,再将其存到当天的分区中,此分区即是数据流的最新快照了。

上一篇下一篇

猜你喜欢

热点阅读