Syncing MySQL Incremental Table Data to Kafka with Maxwell and Consuming It into HDFS

2023-11-05  Yobhel

1 Data Pipeline

[Figure: data pipeline, MySQL binlog → Maxwell → Kafka (topic_db) → Flume → HDFS]

2 Maxwell Configuration
By default, Maxwell syncs the change records of every table in the binlog. According to the plan, a total of 11 tables, including cart_info and order_info, require incremental synchronization, so in principle Maxwell should be configured to sync only those 11 tables. To stay consistent with the real-time data warehouse architecture, however, no such restriction is applied here: Maxwell syncs the change records of all tables in the binlog and sends all of the data to the topic_db topic.
The final Maxwell configuration is as follows:
1) Edit the Maxwell configuration file config.properties

[yobhel@hadoop101 maxwell]$ vim /opt/module/maxwell/config.properties

2) The complete configuration is as follows

log_level=info

producer=kafka
kafka.bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092

# Kafka topic configuration: the target topic that business data is sent to
kafka_topic=topic_db
# mysql login info
host=hadoop101
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

3) Restart Maxwell

[yobhel@hadoop101 bin]$ mxw.sh restart
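
If needed, a quick process check can confirm that Maxwell came back up after the restart:

[yobhel@hadoop101 bin]$ ps -ef | grep maxwell | grep -v grep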

4) Pipeline test
(1) Start the Zookeeper and Kafka clusters
(2) Start a Kafka console consumer to consume data from the topic_db topic

[yobhel@hadoop101 kafka]$ kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic topic_db

(3) Generate mock data

[yobhel@hadoop101 bin]$ cd /opt/module/data_mocker/
[yobhel@hadoop101 data_mocker]$ java -jar edu2021-mock-2022-06-18.jar

(4) Check whether the Kafka consumer receives the data

{"database":"edu2077","table":"order_info","type":"update","ts":1645425636,"xid":37606,"commit":true,"data":{"id":23899,"user_id":16,"origin_amount":800.00,"coupon_reduce":0.00,"final_amount":800.00,"order_status":"1002","out_trade_no":"211814417714292","trade_body":"大数据技术之Zookeeper(2021最新版)等4件商品","session_id":"3a96bddb-7f94-4a0f-9a5b-1aa6fadd718c","province_id":30,"create_time":"2022-02-21 15:15:14","expire_time":"2022-02-21 15:30:14","update_time":"2022-02-21 15:15:42"},"old":{"order_status":"1001","update_time":null}}
{"database":"edu2077","table":"order_info","type":"update","ts":1645425636,"xid":37589,"commit":true,"data":{"id":23900,"user_id":473,"origin_amount":200.00,"coupon_reduce":0.00,"final_amount":200.00,"order_status":"1003","out_trade_no":"462573352988853","trade_body":"尚硅谷大数据技术之Azkaban等1件商品","session_id":"d78dd675-5a38-4e33-b431-b1ef68a89089","province_id":29,"create_time":"2022-02-21 11:26:30","expire_time":"2022-02-21 11:41:30","update_time":"2022-02-21 11:41:47"},"old":{"order_status":"1001","update_time":null}}
{"database":"edu2077","table":"order_info","type":"update","ts":1645425636,"xid":37694,"commit":true,"data":{"id":23901,"user_id":70,"origin_amount":400.00,"coupon_reduce":0.00,"final_amount":400.00,"order_status":"1002","out_trade_no":"677577676596486","trade_body":"尚硅谷大数据技术之Shell等2件商品","session_id":"9b842bcc-3288-49da-8ec2-0e00d743b783","province_id":33,"create_time":"2022-02-21 19:45:13","expire_time":"2022-02-21 20:00:13","update_time":"2022-02-21 19:45:33"},"old":{"order_status":"1001","update_time":null}}

3 Flume Configuration

1) Flume configuration overview
Flume needs to transfer the data in Kafka to HDFS, so it uses a Kafka Source and an HDFS Sink, with a File Channel as the channel.
Note that the Kafka Source subscribes to the topic_db topic, while the HDFS Sink must write the data of the 11 incremental tables to different paths, each of which contains a date layer so that each day's data can be kept separate. The key configuration items are as follows:


[Figure: key Flume configuration items]

A concrete data example is shown below:


[Figure: concrete data example]
2) Flume configuration in practice
(1) Create the Flume configuration file
Create kafka_to_hdfs_db.conf in the Flume job directory on the hadoop103 node

[yobhel@hadoop103 job]$ vim kafka_to_hdfs_db.conf

(2) The content of the configuration file is as follows

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.yobhel.flume.interceptors.TimestampAndTableNameInterceptor$Builder


a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/data/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/data/flume/data/behavior2
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/edu/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3) Write the Flume interceptor
The interceptor parses the Maxwell JSON in each event body and puts the table name and the ts timestamp into the event headers (tableName and timestamp), which the %{tableName} and %Y-%m-%d placeholders in the HDFS Sink path rely on.
Code: https://github.com/Yobhel121/edu-flume-interceptor

Put the built jar into the /opt/module/flume/lib directory on hadoop103.
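
A minimal sketch of the build-and-deploy step, assuming the repository above is a standard Maven project; the artifact name edu-flume-interceptor-1.0-SNAPSHOT.jar is hypothetical, so substitute whatever the build actually produces:

# Build the interceptor jar inside the cloned repository (assumes Maven is installed)
mvn clean package -DskipTests

# Copy the jar to the Flume lib directory on hadoop103 (hypothetical artifact name)
scp target/edu-flume-interceptor-1.0-SNAPSHOT.jar yobhel@hadoop103:/opt/module/flume/lib/
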
3) Pipeline test
(1) Start the Zookeeper and Kafka clusters
(2) Start Flume on hadoop103

[yobhel@hadoop103 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.logger=INFO,console

(3) Uncomment the Maxwell-related content in mock.sh, then run the script to generate mock data

#!/bin/bash
DATA_HOME=/opt/module/data_mocker
MAXWELL_HOME=/opt/module/maxwell

function mock_data() {
  if [ $1 ]
  then
    sed -i "/mock.date/s/.*/mock.date: \"$1\"/" $DATA_HOME/application.yml
    echo "正在生成 $1 当日的数据"
  fi
  cd $DATA_HOME
      nohup java -jar "edu2021-mock-2022-03-14.jar" >/dev/null 2>&1  
}

case $1 in
"init")
  [ $2 ] && do_date=$2 || do_date='2022-02-21'
  sed -i "/mock.clear.busi/s/.*/mock.clear.busi: 1/" $DATA_HOME/application.yml
  sed -i "/mock.clear.user/s/.*/mock.clear.user: 1/" $DATA_HOME/application.yml
  mock_data $(date -d "$do_date -5 days" +%F)
  sed -i "/mock.clear.busi/s/.*/mock.clear.busi: 0/" $DATA_HOME/application.yml
  sed -i "/mock.clear.user/s/.*/mock.clear.user: 0/" $DATA_HOME/application.yml
  for ((i=4;i>=0;i--));
  do
    mock_data $(date -d "$do_date -$i days" +%F)
  done
  ;;
[0-9][0-9][0-9][0-9]-[0-1][0-9]-[0-3][0-9])

    sed -i "/mock_date/s/.*/mock_date=$1/" $MAXWELL_HOME/config.properties
    mxw.sh restart
    sleep 1  
    mock_data $1
    ;;
esac

Run the script to generate data

[yobhel@hadoop101 bin]$ mock.sh 2022-02-22

(4) Check whether data appears under the target paths on HDFS
If incremental-table data has appeared under the target HDFS paths, the data pipeline has been verified end to end.
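
For example, taking order_info as one of the 11 incremental tables, the following lists the date directories that have been written for it (the date layer is explained in (5) below):

[yobhel@hadoop101 ~]$ hadoop fs -ls /origin_data/edu/db/order_info_inc/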

(5) A note on the dates in the target paths
A careful look reveals that the date in the target paths is not the business date of the mock data but the current date. This is because the ts field in the JSON strings output by Maxwell records the time at which the data was changed, which in this setup is the current time, whereas in a real scenario the business date and the change date of the data would be the same.

To simulate a real environment, the Maxwell source code used here has been modified to add a mock_date parameter, whose purpose is to fix the date of the ts timestamp in the JSON strings that Maxwell outputs. This is tested next.

The mock.sh script rewrites this Maxwell configuration item each time it generates data.
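
This can be verified after a run such as mock.sh 2022-02-22: the mock_date entry in the Maxwell configuration (a parameter that exists only in the modified Maxwell used here) should have been rewritten to that date:

[yobhel@hadoop101 ~]$ grep mock_date /opt/module/maxwell/config.properties
mock_date=2022-02-22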

4) Write a Flume start/stop script
For convenience, write a start/stop script for this Flume agent
(1) Create the script f3.sh in the /home/yobhel/bin directory on the hadoop101 node

[yobhel@hadoop101 bin]$ vim f3.sh

Fill in the following content in the script

#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop103 业务数据flume-------"
        ssh hadoop103 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop103 业务数据flume-------"
        ssh hadoop103 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

(2) Make the script executable

[yobhel@hadoop101 bin]$ chmod +x f3.sh

(3) Start via f3.sh

[yobhel@hadoop101 module]$ f3.sh start

(4) Stop via f3.sh

[yobhel@hadoop101 module]$ f3.sh stop

4 First-Day Full Synchronization of Incremental Tables

Normally, incremental tables need one full synchronization on the first day, followed by daily incremental synchronization afterwards. The first-day full sync can be done with Maxwell's bootstrap feature; for convenience, the script below wraps the first-day full synchronization of the incremental tables.
1) Create mysql_to_kafka_inc_init.sh in the ~/bin directory

[yobhel@hadoop101 bin]$ vim mysql_to_kafka_inc_init.sh

The script content is as follows

#!/bin/bash

# This script bootstraps all of the incremental tables; it only needs to be run once

MAXWELL_HOME=/opt/module/maxwell

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database edu2077 --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
cart_info | comment_info | favor_info | order_detail | order_info | payment_info | review_info | test_exam | test_exam_question | user_info | vip_change_detail)
  import_data $1
  ;;
"all")
  for tmp in cart_info comment_info favor_info order_detail order_info payment_info review_info test_exam test_exam_question user_info vip_change_detail
  do
    import_data $tmp
  done
  ;;
esac

2) Make mysql_to_kafka_inc_init.sh executable

[yobhel@hadoop101 bin]$ chmod +x ~/bin/mysql_to_kafka_inc_init.sh

3) Test the sync script
(1) Clean up historical data
To make the result easier to check, first delete the incremental-table data previously synced to HDFS

[yobhel@hadoop101 ~]$ hadoop fs -ls /origin_data/edu/db | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f

(2) Run the sync script

[yobhel@hadoop101 bin]$ mysql_to_kafka_inc_init.sh all 
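
The script can also bootstrap a single table by passing its name instead of all, for example:

[yobhel@hadoop101 bin]$ mysql_to_kafka_inc_init.sh order_info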

4) Check the synchronization result
Verify that the incremental-table data has reappeared on HDFS.
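
A quick check such as the following should list the _inc directories again:

[yobhel@hadoop101 ~]$ hadoop fs -ls /origin_data/edu/db | grep _inc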

5 Incremental Table Synchronization Summary

Incremental tables need one full synchronization on the first day; from then on, each day is an incremental synchronization. For the first-day full sync, first start the data pipeline (Maxwell, Kafka, and Flume), then run the first-day sync script mysql_to_kafka_inc_init.sh. On subsequent days it is enough to keep the collection pipeline running; Maxwell streams changed data to Kafka in real time.
