storm

storm学习第二天QA(flume-kafa-storm-re

2017-06-27  本文已影响141人  小王同学加油

主要内容:解决部署运行遇到问题

Q1 为什么storm 部署的任务没有日志 也就是说没有采集任何数据

image.png

A:

A:step1 【知识点补充】 emit transferred ack和fail是什么概念

Spout的可靠性保证
在Storm中,消息处理可靠性从Spout开始的。storm为了保证数据能正确的被处理, 对于spout产生的每一个tuple,storm都能够进行跟踪,这里面涉及到了ack/fail的处理, 如果一个tuple被处理成功,那么spout便会调用其ack方法,如果失败,则会调用fail方法。而topology中处理tuple的每一个bolt都会通过OutputCollector来告知storm,当前bolt处理是否成功
当一个tuple被创建, 不管是spout还是bolt创建的, 它会被赋予一个64位的id ,而acker就是利用这个id去跟踪所有的tuple的。 每个tuple知道它的祖宗的id(从spout发出来的那个tuple的id), 每当你新发射一个tuple, 它的祖宗id都会传给这个新的tuple。 所以当一个tuple被ack的时候,它会发一个消息给acker,告诉它这个tuple树发生了怎么样的变化

A:step2 判断flume有没有采集 kafka有没有数据

tail -f /usr/local/apache-flume-1.7.0-bin/logs/flume.log

14 Jul 2017 20:46:27,733 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:177) - Shutdown Metric for type: SOURCE, name: r1. src.open-connection.count == 0

竟然没有启动 重启flume程序

flume-ng agent --conf conf -f /usr/local/apache-flume-1.7.0-bin/conf/flume-conf.properties -n agent&

-查看kafa数据搜集情况

[root@VM-10-112-179-18 logs]# kafka-console-producer.sh --broker-list 10.112.179.18:9092 --topic gome

flume启动后根本没有采集数据
修改成文件形式
agent.sinks.s1.type = file_roll
agent.sinks.s1.sink.directory =/usr/local/apache-flume-1.7.0-bin/data
Specify the channel the sink should use
agent.sinks.s1.channel = c1

image.png
 kafka-run-class.sh kafka.tools.DumpLogSegments --files \
 /usr/local/kafka_2.12-0.10.2.1/logs/gome-0/00000000000000000000.log --print-data-log 

Dumping /usr/local/kafka_2.12-0.10.2.1/logs/gome-0/00000000000000000000.log
Starting offset: 0

再次查看flume 日志

org.apache.flume.EventDeliveryException: Failed to publish events
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
        ... 3 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

问题在于 这说明已经采取到 数据没有传输到kafka中去

对kafa进行监控

image.png

重新检查flume配置文件 kafa迁移其他主机 正常


image.png

估计是防火墙的原因

主机间通信:
关闭命令: service iptables stop
永久关闭防火墙:chkconfig iptables off
查看状态:service iptables status

或者topic 配置的不正确

发布storm程序出错

Caused by: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.log4j.Logger

原因分析:static修饰符作用
java序列化是不能序列化static变量的
解决办法:

出错代码:
public class Customer implements Serializable{
private Logger logger =   Logger.getLogger(Customer.class)
}
修正代码:
public class Customer implements Serializable{
private static final Logger logger =   Logger.getLogger(Customer.class)
 
}

storm storm 持久化引入json格式的数据 --缺少依赖

<dependency>   
       <groupId>net.sf.json-lib</groupId>   
       <artifactId>json-lib</artifactId>   
       <version>2.4</version>   
       <classifier>jdk15</classifier>   
    </dependency> 

1. java.lang.NoClassDefFoundError: net/sf/json/JSONObject at gome.storm.model.SrsLogServeInfo.toJsonString(SrsLogServeInfo.java:31) at gome.storm.bolt.PersistentSrsLog.getEdgeTtoSrsToClitInfo
    ----->json-lib-2.4-jdk15.jar
2.  java.lang.NoClassDefFoundError: org/apache/commons/lang/exception/NestableRuntimeException at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:
   ---->commons-lang-2.5.jar 
3.  java.lang.NoClassDefFoundError: net/sf/ezmorph/Morpher at 
    ----> ezmorph-1.0.6.jar
4. java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory at net.sf.json.AbstractJSON.<clinit>(AbstractJSON.java:53) 
   ---->commons-logging-1.1.1.jar
5. java.lang.NoClassDefFoundError: org/apache/commons/beanutils/DynaBean at     
      net.sf.json.JSONObject.fromObject(JSONObject.java:147) at 
      net.sf.json.JSONObject.fromObject(JSONObject.java:134) 
   ------>commons-beanutils 1.8.0

A2:

json-lib官方网站

作用能把对象 map 数组 xml等转换成json结构 并且解析

http://json-lib.sourceforge.net/

image.png

综上,想用一个最简单的JSON也得导入以下的6个包:
Json-lib requires (at least) the following dependencies in your classpath:
commons-lang 2.5
commons-beanutils 1.8.0
commons-collections 3.2.1
commons-logging 1.1.1
ezmorph 1.0.6
json-lib-2.4-jdk15.jar

Could not connect to Redis at 10.77.88.99:6379: Connection refused
估计是防火墙的原因

主机间通信:

关闭命令: service iptables stop
永久关闭防火墙:chkconfig iptables off
查看状态:service iptables status

结果不行 问题在redis服务器上查看redis.conf 说明

################################## NETWORK #####################################
# By default, if no "bind" configuration directive is specified, Redis listens
# for connections from all the network interfaces available on the server.
# It is possible to listen to just one or multiple selected interfaces using
# the "bind" configuration directive, followed by one or more IP addresses.
#
# Examples:
#
# bind 192.168.1.100 10.0.0.1
# bind 127.0.0.1 ::1
#
# ~~~ WARNING ~~~ If the computer running Redis is directly exposed to the
# internet, binding to all the interfaces is dangerous and will expose the
# instance to everybody on the internet. So by default we uncomment the
# following bind directive, that will force Redis to listen only into
# the IPv4 lookback interface address (this means Redis will be able to
# accept connections only from clients running into the same computer it
# is running).
#
# IF YOU ARE SURE YOU WANT YOUR INSTANCE TO LISTEN TO ALL THE INTERFACES
# JUST COMMENT THE FOLLOWING LINE.
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#bind 127.0.0.1 

翻译:

redis服务器只能接受本地连接的请求(same computer)
Redis will be able to
accept connections only from clients running into the same computer it is running

redis接受任何服务器的连接(all ) 默认配置
By default
if no "bind" configuration directive is specified, Redis listens

for connections from all the network interfaces available on the server.

bing 允许外网访问的ip(followed by one or more IP addresses)
It is possible to listen to just one or multiple selected interfaces using
the "bind" configuration directive, followed by one or more IP addresses.
Examples:
bind 192.168.1.100 10.0.0.1

对比启动

只允许本地访问.png
支持远程访问.png

Q2 重启之后从strom仍然从头开始读取kafka记录

https://github.com/apache/storm/tree/master/external/storm-kafka

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?

How KafkaSpout recovers in case of failures

参考
1 http://blog.csdn.net/yangyutong0506/article/details/46742601
2 https://github.com/dibbhatt/kafka-spark-consumer/issues/16
3 Storm消息可靠性与Ack机制
http://blog.jassassin.com/2014/10/22/storm/storm-ack/
4 Kafka 指南
http://wdxtub.com/2016/08/15/kafka-guide/
5 序列化
https://www.ibm.com/developerworks/cn/java/j-lo-serial/index.html

上一篇 下一篇

猜你喜欢

热点阅读