Writing to HBase from Flume: A Field Record

2019-11-25  帅可儿妞

Environment:

HBase:

hbase(main):001:0> version
1.2.0-cdh5.15.1, rUnknown, Thu Aug  9 09:08:28 PDT 2018

Flume:

[root@txdev-flume bin]# flume-ng version
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523

Create the table

create 'flume-hbase-test-table','family1_test1'

Configuration and troubleshooting

  1. Write the configuration file
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent-hbase'

#source: source_file sink: sink_hbase channel: channel_file_hbase
agent-hbase.sources = source_file
agent-hbase.sinks = sink_hbase
agent-hbase.channels = channel_file_hbase

# specify the channel for source and sink
agent-hbase.sources.source_file.channels = channel_file_hbase
agent-hbase.sinks.sink_hbase.channel = channel_file_hbase

#describe the source
agent-hbase.sources.source_file.type = exec
agent-hbase.sources.source_file.command = tail -f /tmp/flume_hbase_temp.log
agent-hbase.sources.source_file.checkperiodic = 50

# timestamp handling
agent-hbase.sources.source_file.interceptors.itcpt_hive_partition.type=com.xylink.bigdata.flume.interceptor.HivePartitionTimestampInterceptor$Builder

#use a channel which buffers events in memory
agent-hbase.channels.channel_file_hbase.type = memory
agent-hbase.channels.channel_file_hbase.capacity = 100000
agent-hbase.channels.channel_file_hbase.transactionCapacity = 10000

#sinks type
agent-hbase.sinks.sink_hbase.type = org.apache.flume.sink.hbase.HBaseSink
agent-hbase.sinks.sink_hbase.table = flume-hbase-test-table
# columnFamily must name a family that exists in the table (created above as 'family1_test1')
agent-hbase.sinks.sink_hbase.columnFamily = family1_test1
agent-hbase.sinks.sink_hbase.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent-hbase.sinks.sink_hbase.serializer.regex = "name":"(.*)", "timestamp":(1[0-9]{12}), "content":"(.*)"
agent-hbase.sinks.sink_hbase.serializer.colNames = name,timestamp,content
  2. Write the launch script
#!/bin/sh
nohup /usr/libra/flume/bin/flume-ng agent -n agent-hbase -c /usr/libra/flume/conf-hbase -f /usr/libra/flume/conf-hbase/flume-kafka-hdfs-hbase.properties -Dflume.monitoring.type=http -Dflume.monitoring.port=5600 &> /tmp/flume_hbase.log &
  3. Add the base jars known to be required
hbase-client-1.2.0-cdh5.15.1.jar  
hbase-common-1.2.0-cdh5.15.1.jar  
hbase-protocol-1.2.0-cdh5.15.1.jar  
htrace-core-3.2.0-incubating.jar  
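netty-all-4.0.29.Final.jar # added later (see the netty error further down); listed here for easy reference
  4. Run the launch script
./start-flume-hbase.sh
  5. Error log (this is what appears when the HBase jars are not on the classpath)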
java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
    at org.apache.flume.sink.hbase.HBaseSink.<init>(HBaseSink.java:114)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:45)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:408)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 17 more
  6. After adding the jars, start the agent
25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:java.library.path=
25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:java.io.tmpdir=/tmp
25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:java.compiler=<NA>
25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:os.name=Linux
25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:os.arch=amd64
25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:os.version=2.6.32-642.6.2.el6.x86_64
25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:user.name=root
25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:user.home=/root
25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:user.dir=/usr/libra/flume/lib
25 Nov 2019 11:35:06,905 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.ZooKeeper.<init>:438)  - Initiating client connection, connectString=localhost:2181 sessionTimeout=90000 watcher=hconnection-0x7934e2510x0, quorum=localhost:2181, baseZNode=/hbase
25 Nov 2019 11:35:06,939 INFO  [lifecycleSupervisor-1-3-SendThread(VM_48_19_centos:2181)] (org.apache.zookeeper.ClientCnxn$SendThread.logStartConnect:975)  - Opening socket connection to server VM_48_19_centos/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
25 Nov 2019 11:35:06,980 WARN  [lifecycleSupervisor-1-3-SendThread(VM_48_19_centos:2181)] (org.apache.zookeeper.ClientCnxn$SendThread.run:1102)  - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
25 Nov 2019 11:35:07,095 DEBUG [lifecycleSupervisor-1-3] (org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.retryOrThrow:272)  - Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
25 Nov 2019 11:35:08,096 INFO  [lifecycleSupervisor-1-3-SendThread(VM_48_19_centos:2181)] (org.apache.zookeeper.ClientCnxn$SendThread.logStartConnect:975)  - Opening socket connection to server VM_48_19_centos/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
25 Nov 2019 11:35:08,096 WARN  [lifecycleSupervisor-1-3-SendThread(VM_48_19_centos:2181)] (org.apache.zookeeper.ClientCnxn$SendThread.run:1102)  - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect

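The cause: the ZooKeeper settings for HBase had not been configured, so the client tried localhost:2181. I downloaded hbase-site.xml from CDH, placed it in the conf directory, and restarted, which fixed the problem. Alternatively, the property can be set directly on the sink: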

agent-hbase.sinks.sink_hbase.zookeeperQuorum = x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181
  7. After adding the ZooKeeper host and port list, restart
25 Nov 2019 14:00:51,287 ERROR [lifecycleSupervisor-1-1] (org.apache.flume.sink.hbase.HBaseSink.start:153)  - Could not load table, flume-hbase-test-table from HBase
java.io.IOException: java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:240)
    at org.apache.hadoop.hbase.client.ConnectionManager.createConnection(ConnectionManager.java:433)
    at org.apache.hadoop.hbase.client.ConnectionManager.createConnection(ConnectionManager.java:426)
    at org.apache.hadoop.hbase.client.ConnectionManager.getConnectionInternal(ConnectionManager.java:304)
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:186)
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:152)
    at org.apache.flume.sink.hbase.HBaseSink$1.run(HBaseSink.java:144)
    at org.apache.flume.sink.hbase.HBaseSink$1.run(HBaseSink.java:141)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hbase.HBaseSink.start(HBaseSink.java:141)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
    ... 19 more
Caused by: java.lang.NoClassDefFoundError: io/netty/channel/EventLoopGroup
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2013)
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1978)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2098)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.<init>(ConnectionManager.java:667)
    ... 24 more
Caused by: java.lang.ClassNotFoundException: io.netty.channel.EventLoopGroup
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 31 more

Missing jar: netty-all-4.0.29.Final.jar

  8. Start again after adding the jar
25 Nov 2019 14:09:30,511 DEBUG [lifecycleSupervisor-1-1] (org.apache.hadoop.util.Shell.checkHadoopHome:320)  - Failed to detect a valid hadoop home directory
java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
    at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:302)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:327)
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
    at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:86)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:66)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:265)
    at org.apache.hadoop.hbase.security.UserProvider.<clinit>(UserProvider.java:56)
    at org.apache.hadoop.hbase.client.HConnectionKey.<init>(HConnectionKey.java:72)
    at org.apache.hadoop.hbase.client.ConnectionManager.getConnectionInternal(ConnectionManager.java:300)
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:186)
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:152)
    at org.apache.flume.sink.hbase.HBaseSink$1.run(HBaseSink.java:144)
    at org.apache.flume.sink.hbase.HBaseSink$1.run(HBaseSink.java:141)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hbase.HBaseSink.start(HBaseSink.java:141)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

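This message appears because our Flume is deployed standalone and is not managed by CDH, so HADOOP_HOME is not set on this host. It is logged at DEBUG level only.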
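
The two NoClassDefFoundError traces above both come down to a jar missing from Flume's classpath. A minimal sketch of a probe that reports such gaps before the agent is started could look like the following. The class name ClasspathProbe and its missing helper are made up for this illustration; the two probed class names are the ones taken from the stack traces.

```java
import java.util.ArrayList;
import java.util.List;

final class ClasspathProbe {
    // Returns the subset of classNames that cannot be loaded from the current classpath.
    static List<String> missing(String... classNames) {
        List<String> notFound = new ArrayList<>();
        for (String name : classNames) {
            try {
                // initialize=false: only locate the class, do not run its static initializers.
                Class.forName(name, false, ClasspathProbe.class.getClassLoader());
            } catch (ClassNotFoundException | LinkageError e) {
                notFound.add(name);
            }
        }
        return notFound;
    }

    public static void main(String[] args) {
        // The two classes named in the stack traces above.
        List<String> notFound = missing(
            "org.apache.hadoop.hbase.HBaseConfiguration",
            "io.netty.channel.EventLoopGroup");
        System.out.println(notFound.isEmpty() ? "all classes found" : "missing: " + notFound);
    }
}
```

Run with the same jars on the classpath that Flume uses (for example `java -cp "/usr/libra/flume/lib/*:." ClasspathProbe`, assuming that is the directory the jars were copied to), it lists whichever of the probed classes still cannot be loaded.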

Verification

I started Flume and saw no errors, then checked the data in HBase. The result:

hbase(main):001:0> scan 'flume-hbase-test-table'
ROW                                                          COLUMN+CELL
0 row(s) in 0.1800 seconds

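I assumed not enough data had been written yet to trigger a flush, so I kept feeding in data, but nothing ever arrived in HBase...

No errors and no data: I was at a loss. The only option left was to grit my teeth and read the source.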
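
One cheap check worth running at this point is whether the serializer.regex from the configuration matches a real log line at all. The sketch below assumes the serializer requires the whole event body to match the pattern and compiles it with DOTALL; that assumption should be verified against the Flume version in use. The sample line and the names SerializerRegexCheck and extract are hypothetical. This turned out not to be the cause in this case, but it rules out one silent failure mode.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SerializerRegexCheck {
    // Regex copied from serializer.regex in the agent configuration.
    static final String REGEX =
        "\"name\":\"(.*)\", \"timestamp\":(1[0-9]{12}), \"content\":\"(.*)\"";

    // Returns the captured groups if the WHOLE line matches, otherwise null.
    static String[] extract(String line) {
        Matcher m = Pattern.compile(REGEX, Pattern.DOTALL).matcher(line);
        if (!m.matches()) {
            return null;
        }
        String[] groups = new String[m.groupCount()];
        for (int i = 0; i < groups.length; i++) {
            groups[i] = m.group(i + 1);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Hypothetical sample; replace it with a real line from /tmp/flume_hbase_temp.log.
        String line = "\"name\":\"alice\", \"timestamp\":1574652906904, \"content\":\"hello\"";
        String[] groups = extract(line);
        System.out.println(groups == null ? "no match" : String.join(" | ", groups));
    }
}
```

Because the match is anchored to the whole line, any extra characters around the three fields (surrounding braces, a log prefix) make extract return null. The number of groups should also equal the number of entries in serializer.colNames.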

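Setting up remote debugging

I cloned the Flume source from GitHub, switched to the 1.7.0 branch, and configured remote debugging for Flume.
Edit <FLUME_HOME>/bin/flume-ng and find the following code: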

# set default params
FLUME_CLASSPATH=""
FLUME_JAVA_LIBRARY_PATH=""
JAVA_OPTS="-Xmx20m"
LD_LIBRARY_PATH=""

Change JAVA_OPTS to the following (pick your own port; 5005 was already in use on our machine):

JAVA_OPTS="-Xmx1024m -Xdebug -Xrunjdwp:transport=dt_socket,address=45678,server=y,suspend=y"

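Next, set up a remote debug configuration in the local IDEA; I won't go into detail here.
...
In the source, I located HBaseSink.java in the flume-ng-hbase-sink module, set a breakpoint, and stepped through it line by line.
...
In the end I found that every batch was rolled back when it was written to HBase. I was dumbfounded, because the Flume log showed no error at all. The error and its fix are covered in the write-up titled "java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL". The fix is simply to comment out every setWriteToWAL call and rebuild the package. The tests may be slow, so be patient. Then upload the new package and restart Flume.
...
After that, data finally showed up in HBase...
That was a rough ride....
What remained was sorting out the rowkey.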
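
A NoSuchMethodError like the one on Put.setWriteToWAL is raised when the JVM cannot find a method with the exact name, parameter types, and return type that the calling code was compiled against. A reflection check along the following lines can confirm such a mismatch without attaching a debugger. It is only a sketch: the names MethodDescriptorCheck and hasExactMethod are invented here, and the assumption that the sink expects a void-returning setWriteToWAL(boolean) should be confirmed against the full error message.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

final class MethodDescriptorCheck {
    // True only if className has a public method with this exact name, parameter list
    // AND return type. A method with the same name but another return type does not count.
    static boolean hasExactMethod(String className, String name,
                                  Class<?> returnType, Class<?>... params) {
        try {
            Class<?> cls = Class.forName(className, false,
                MethodDescriptorCheck.class.getClassLoader());
            for (Method m : cls.getMethods()) {
                if (m.getName().equals(name)
                        && m.getReturnType() == returnType
                        && Arrays.equals(m.getParameterTypes(), params)) {
                    return true;
                }
            }
            return false;
        } catch (ClassNotFoundException | LinkageError e) {
            // The class itself is not loadable, so the method cannot be present either.
            return false;
        }
    }

    public static void main(String[] args) {
        // ASSUMPTION: the caller was compiled against void setWriteToWAL(boolean).
        boolean ok = hasExactMethod("org.apache.hadoop.hbase.client.Put",
            "setWriteToWAL", void.class, boolean.class);
        System.out.println("void Put.setWriteToWAL(boolean) present: " + ok);
    }
}
```

Run it with the HBase client jars on the classpath. A result of false means the method is either absent or declared with a different signature in that HBase version, and in both cases a caller compiled against the old signature fails at runtime.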

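Custom rowkey

I have not done this part yet, but the official code leaves a hook for it: the getRowKey method in RegexHbaseEventSerializer is protected rather than private. We can therefore extend RegexHbaseEventSerializer and override getRowKey, and the serializer class named in the Flume configuration then becomes our custom subclass.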
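
The subclassing idea above could start from a small rowkey-building helper. Since the Flume classes are not available in a standalone snippet, the sketch below shows only such a helper, which an overridden getRowKey might delegate to. The key layout (name followed by a reversed timestamp) and the names RowKeySketch and buildRowKey are hypothetical, and how the subclass would get hold of the event's name and timestamp is left open here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

final class RowKeySketch {
    // Hypothetical layout: "<name>-<reversed timestamp, zero-padded to 19 digits>".
    // Reversing the timestamp makes newer events sort before older ones for the same name.
    // Assumes timestampMillis is non-negative.
    static byte[] buildRowKey(String name, long timestampMillis) {
        long reversed = Long.MAX_VALUE - timestampMillis;
        String key = String.format(Locale.ROOT, "%s-%019d", name, reversed);
        return key.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = buildRowKey("alice", 1574652906904L);
        System.out.println(new String(key, StandardCharsets.UTF_8));
    }
}
```

Leading with the name keeps all rows for one name adjacent in a scan. Whether that suits the real query pattern, or creates hot regions for very active names, would need to be weighed before adopting it.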

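Because the exploration took some time and pushed the project slightly behind schedule, we finally decided not to use this approach and to write to HBase directly with Spark instead. The risk felt too high, since it meant modifying the Flume source. Still, I have left an idea here for customizing the rowkey so that it is easier to pick up if we revisit this later; my memory is not the best, after all, haha. Bye.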
