Maxwell + Kafka: parsing the MySQL binlog

2019-12-09  陌上冰火

1. Maxwell overview

 Maxwell tails the MySQL binlog, picks up changes in real time, and writes each change as a JSON message to Kafka, Redis, Kinesis, SQS, Pub/Sub, RabbitMQ, a file, and other producers.
 Official site: http://maxwells-daemon.io
 Download: https://github.com/zendesk/maxwell

2. Set the MySQL binlog format to row mode

2.1 Check whether the binlog is enabled

mysql> show variables like '%log_bin%';

2.2 Exit mysql and open the configuration file /etc/my.cnf (path on macOS)

vim /etc/my.cnf

2.3 Set binlog_format to row

log-bin=mysql-bin
binlog_format=row
server-id=1

2.4 Restart MySQL

service mysqld restart
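
After the restart, it is worth confirming that the new format took effect (standard MySQL query; the expected value is shown as a comment):

mysql> show variables like 'binlog_format';
-- Value should now be: ROW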

Explanation of the three binlog formats:
https://www.cnblogs.com/xingyunfashi/p/8431780.html

3. MySQL permission setup

mysql> GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'XXXXXX';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
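
Note that the maxwell database does not need to exist beforehand; Maxwell creates it to store its replication position. The `GRANT ... IDENTIFIED BY` form above works on MySQL 5.x; on MySQL 8.0 it was removed, so there you would create the user first (standard MySQL 8 syntax):

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

Either way, you can verify the result with SHOW GRANTS FOR 'maxwell'@'%';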

4. Run Maxwell to start listening

# No filtering
bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
   --producer=kafka --kafka.bootstrap.servers=localhost:9092
By default, messages are written to the topic named maxwell.
# Filtered by database
/usr/local/maxwell/bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
--producer=kafka --kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092 \
--kafka_topic=maxwells  --filter 'exclude: dbName01.*, include: dbName02.*'

For more configuration options, see: http://maxwells-daemon.io/config/
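
For reference, once Maxwell is running, an insert into a monitored table produces a JSON message shaped roughly like this (the field names follow Maxwell's documented format; the table name and values here are made up for illustration):

{"database":"test","table":"users","type":"insert","ts":1575856800,"xid":8920,"commit":true,"data":{"id":1,"name":"tom"}}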

5. Kafka consumer code

# pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.1</version>
    </dependency>
</dependencies>
# MaxwellConsumer.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

// Named MaxwellConsumer rather than KafkaConsumer, so the class does not
// shadow org.apache.kafka.clients.consumer.KafkaConsumer.
public class MaxwellConsumer {

    public static void main(String[] args) {
        // Connection parameters for the Kafka cluster
        Properties prop = new Properties();

        prop.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        prop.put("group.id", "test");
        prop.put("enable.auto.commit", "true");
        prop.put("auto.commit.interval.ms", "1000");

        prop.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);

        // Subscribe to the topic Maxwell produces to
        consumer.subscribe(Arrays.asList("maxwell"));

        while (true) {
            // Poll for new records
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Consumed record: " + record.value());
            }
        }
    }
}
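
Since Maxwell's payload is plain JSON, each record value can be picked apart before further processing. Below is a minimal sketch using regex extraction of top-level string fields; a real pipeline would use a JSON library such as Jackson, and the class and method names here are my own, not part of Maxwell or Kafka:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MaxwellEventParser {

    // Extract a top-level string field ("database", "table", "type", ...)
    // from a Maxwell JSON message; returns null if the field is absent.
    public static String field(String json, String name) {
        Matcher m = Pattern.compile("\"" + name + "\"\\s*:\\s*\"([^\"]*)\"")
                .matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Sample message in Maxwell's format (values are illustrative)
        String msg = "{\"database\":\"test\",\"table\":\"users\",\"type\":\"insert\","
                + "\"ts\":1575856800,\"data\":{\"id\":1,\"name\":\"tom\"}}";
        System.out.println(field(msg, "table") + " " + field(msg, "type"));
        // prints: users insert
    }
}
```

Inside the consumer loop above, `field(record.value(), "type")` could then be used to route inserts, updates, and deletes to different handlers.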

6. Testing

6.1 Run the consumer

6.2 Modify data in MySQL

6.3 Check the output in the IDEA console
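
Any DML statement will do for the test. An update is a good choice because it also exercises Maxwell's "old" field, which carries the previous values (the table and data here are illustrative):

mysql> update test.users set name = 'jerry' where id = 1;

The consumer should then print a message along these lines:

{"database":"test","table":"users","type":"update","ts":1575856900,"data":{"id":1,"name":"jerry"},"old":{"name":"tom"}}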