Mysql 到 Hbase 数据如何实时同步,强大的 Strea
很多情况大数据集群需要获取业务数据,用于分析。通常有两种方式:
- 业务直接或间接写入的方式
- 业务的关系型数据库同步到大数据集群的方式
第一种可以是在业务中编写代码,将觉得需要发送的数据发送到消息队列,最终落地到大数据集群。
第二种则是通过数据同步的方式,将关系型数据同步到大数据集群,可以是存储在 hdfs 上,使用 hive 进行分析,或者是直接存储到 hbase 中。
其中数据同步又可以大致分为两种:增量同步、CRUD 同步。
增量同步是只将关系型数据库中新增的数据进行同步,对于修改、删除操作不进行同步,这种同步方式适用于那些一旦生成就不会变动的数据。
CRUD 同步则是数据的增、删、改都需要进行同步,保证两个库中的数据一致性。
本文不讲 binlog + Canal + 消息队列 + JAR 实现数据实时同步的方案,也不讲使用 Sqoop 进行离线同步。而是讲解如何使用 Streamsets 零代码完成整个实时同步流程。关于 Streamsets 具体是什么,以及能做哪些其他的事情,大家可以前往 Streamsets 官网进行了解。从笔者了解的信息,在数据同步方面 Streamsets 十分好用。
要实现 mysql 数据的实时同步,首先我们需要打开其 binlog 模式,具体怎么操作网上有很多教程,这里就不进行阐述了。
那么,现在就直接进入正题吧。
安装
下载
Streamsets 可以直接从官网下载: https://archives.streamsets.com
这里安装的是 Core Tarball 格式,当然你也可以直接选择下载 Full Tarball、Cloudera Parcel 或者其他格式。下载 Core Tarball 的好处是体积小,后期需要什么库的时候可以自行在 Streamsets Web 页进行下载。相对于 Core Tarball,Full Tarball 默认帮你下载了很多库,但是文件体积相对较大(>4G),并且可能很多库我们暂时使用不到。
![streamsets](https://www.kooola.com/upload/2019/01/3dcitd62a0haur6ulod650e2sm.png)
或者你可以直接使用这个链接进行下载:https://archives.streamsets.com/datacollector/3.7.1/tarball/streamsets-datacollector-core-3.7.1.tgz
解压启动
Streamsets Core Tarball 下载好后,直接解压就可以使用,非常方便。
tar xvzf streamsets-datacollector-core-3.7.1.tgz
cd streamsets-datacollector-3.7.1/bin/
./streamsets dc
Java 1.8 detected; adding $SDC_JAVA8_OPTS of "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144" to $SDC_JAVA_OPTS
Configuration of maximum open file limit is too low: 1024 (expected at least 32768). Please consult https://goo.gl/6dmjXd
如果在运行的时候遇到上面的报错,修改操作系统的 open files 限制数量即可。
#vi /etc/security/limits.conf
添加两行内容:
- soft nofile 65536
- hard nofile 65536
运行 'ulimit -n' 既可以看到 open files 设置值已生效。
Web 页
Streamsets 拥有一个 Web 页,默认端口是 18630。浏览器中输入 ip:18630 即可进入 streamsets 的页面,默认用户名、密码都是 admin。
![streamsets](https://www.kooola.com/upload/2019/01/4dkjlpipt4jajo5mlpak4dodbl.png)
Pipeline
准备工作
因为需要将 mysql 的数据实时同步到 hbase 中,但是下载的 Core Tarball 中没有 MySQL Binary Log 以及 hbase 两个 stage library,所以在 create new pipeline 之前需要先安装它们。
安装 MySQL Binary Log 库
![streamsets](https://www.kooola.com/upload/2019/01/0prcsaekf0h74ran2jgc6fnm03.png)
安装 Hbase 库,这里注意一下,hbase 库位于 CDH 中,所以选择一个 CDH 版本进行安装
![streamsets](https://www.kooola.com/upload/2019/01/ps20dnh70cjrspa5l004lqrp38.png)
安装好后在 Installed Stage Libraries 中就能看到已经安装了 MySQL Binary Log 和 Hbase
![streamsets](https://www.kooola.com/upload/2019/01/178mill4m6isoqkpjb678g4g27.png)
创建 Pipeline
MySQL Binary Log
创建一个 MySQL Binary Log
![streamsets](https://www.kooola.com/upload/2019/01/vofqrqcj3sg1tq01samaal4qrf.png)
设置 mysql 的连接参数(Hostname, Port 以及 Server ID),这里的 Server ID 与 mysql 配置文件(一般是 /etc/my.cnf)中的 server-id 保持一致
![streamsets](https://www.kooola.com/upload/2019/01/s1p9citdcsjquq4588o4b3tng8.png)
设置 mysql 的用户名、密码
![streamsets](https://www.kooola.com/upload/2019/01/qjia4uqne0g4tpn0ag34eoneee.png)
其他设置:我们在 Include Tables 栏设置了两张表(表与表之间用逗号隔开),意思是监控这两张表的数据变化,其他表不关心。
![streamsets](https://www.kooola.com/upload/2019/01/u8aft3u1mqinqr4igkqg2mvhnu.png)
Stream Selector
创建一个 Stream Selector,并将刚刚创建的 MySQL Binary Log 指向这个 Stream Selector。 设置过滤条件, 比如说 ${record:value("/Table")=='cartype'} 就是过滤 cartype 表。
可以看到 Stream Selector 有两个出口(1 和 2),后面我们将会看到: 1 输出到 Hbase, 2 数据到 Trash
![streamsets](https://www.kooola.com/upload/2019/01/rbe0v4bcruhqkqa36h5v7t080i.png)
Hbase & Trash
分别创建 Hbase 和 Trash,连接到 Stream Selector 上
![streamsets](https://www.kooola.com/upload/2019/01/51tg2fk6skjnkohj53irtqt7jj.png)
配置 Hbase
![streamsets](https://www.kooola.com/upload/2019/01/umnj578l5ui5fqfhbarlv8b48o.png)
Trash 无需进行配置
验证 & 启动
验证
点击右上角的“眼镜”,验证整个流程是否有问题。
这里报错:"java.lang.RuntimeException:Unable to get driver instance for jdbcUrl"。这个报错的原因是缺少 mysql 连接的 jar 包。解决起来也很简单,下载一个 jar 包然后放到 streamsets 指定的目录下。我这边的完整目录是:/opt/streamsets/streamsets-datacollector-3.7.1/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib/mysql-connector-java-5.1.26-bin.jar, mysql-connector-java-5.1.26-bin.jar 就是我下载的 jar 包。
还有一点就是事先要将 Hbase 中相对应的表创建好,不然验证会提示错误。
![streamsets](https://www.kooola.com/upload/2019/01/120tmtbbpghm0oq292ci71c9q5.png)
接着在页面上重启 streamsets 即可。
![streamsets](https://www.kooola.com/upload/2019/01/3se1im5mj2he0q6burjhqk8fsn.png)
重新验证,发现成功了。
![streamsets](https://www.kooola.com/upload/2019/01/1pti0rgcf2ivkpmvhoul71u4q1.png)
点击右上角播放标签,启动流程,这样整个流程就已经完成了(数据已经在进行实时同步),查看各个 Stage 既可以看到有多少数据流入,多少数据流出。也可以直接进入 hbase 数据库中查看是否有数据生成。
![streamsets](https://www.kooola.com/upload/2019/01/o6oviaksrej9qr3a0itt36h3g1.png)
以上就是如何使用 Streamsets 实时同步 mysql 数据到 hbase 中的整个操作流程。大家肯定发现了,整个流程没有编写任何的代码,相对于 binlog + Canal + 消息队列 + JAR 的方案是不是高效一些呢。当然任何方案都会有优缺点,Streamsets 这种方案的更多实际体验还需要更多的观察。