Nginx => Flume => Kafka Pipeline Summary
nginx=>flume=>kafka
- Write the Flume log-collection config file
Nginx log:
access.log ====> flume
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/access.log
a1.sources.r1.shell = /bin/sh -c
a1.channels.c1.type = memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# This is the Kafka broker's listen address (host:port)
a1.sinks.k1.brokerList = spark001:9092
# Note: this Kafka topic must match the one created in Step 3 below
a1.sinks.k1.topic = test
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks = 1
# a1.sinks.k1.type = logger    # swap in the logger sink for local debugging
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
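The exec source above simply runs `tail -F` against the log file and turns each new line into a Flume event. A tiny sketch of that behavior (using `tail -n` on a throwaway temp file, since `-F` follows forever and never returns):

```shell
# Stand-in for /root/logs/access.log (temp path, demo only)
printf 'GET /index 200\nGET /about 200\nPOST /login 302\n' > /tmp/demo_access.log

# The real source runs `tail -F`, which blocks and follows new writes;
# `tail -n 2` just grabs the last two lines so the demo terminates.
tail -n 2 /tmp/demo_access.log
```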
- Deploying Kafka
Step 1: Start ZooKeeper  # this spawns the QuorumPeerMain process
First point the data dir somewhere persistent (it defaults to /tmp, which gets cleaned up):
[hadoop@hadoop000 conf]$ vim zoo.cfg
dataDir=/home/hadoop/app/tmp/zk
:wq to save and exit, then start it:
zkServer.sh start
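For reference, a minimal zoo.cfg consistent with the edit above could look like this (only `dataDir` comes from these notes; the other two lines are stock ZooKeeper defaults):

```properties
# Heartbeat unit in milliseconds (ZooKeeper default)
tickTime=2000
# Moved off /tmp so the data survives cleanup/reboots
dataDir=/home/hadoop/app/tmp/zk
# Client connection port (default)
clientPort=2181
```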
Step 2: Start the Kafka server  # this spawns the Kafka process
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
Before starting, edit server.properties under the config directory of the extracted Kafka. Keys to watch:
broker.id=0         # unique ID for this broker (single-broker setup here)
listeners           # port the broker listens on
host.name           # hostname of the current machine
log.dirs            # directory where Kafka stores its message data
zookeeper.connect   # ZooKeeper address
Save and exit when done.
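Put together, a minimal single-broker server.properties consistent with these notes might look like this (the concrete values are assumptions inferred from the commands in this doc, not copied from a real config):

```properties
# Unique ID for this broker (single-broker setup)
broker.id=0
# Listen port; matches the 9092 in Flume's brokerList
listeners=PLAINTEXT://:9092
# This machine's hostname (assumed spark001)
host.name=spark001
# Where Kafka stores message data (assumed path)
log.dirs=/home/hadoop/app/tmp/kafka-logs
# ZooKeeper address
zookeeper.connect=spark001:2181
```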
Step 3: Create a topic  # must match the topic set in the Flume sink above
kafka-topics.sh --create --zookeeper spark001:2181 --replication-factor 1 --partitions 1 --topic test
PS: list existing topics with kafka-topics.sh --list --zookeeper spark001:2181
Step 4: Start the Flume agent from earlier (it acts as the producer)
flume-ng agent --name a1 --conf . --conf-file ./lamda_imooc.conf -Dflume.root.logger=INFO,console
Step 4 (alternative): use the console as a producer
kafka-console-producer.sh --broker-list spark001:9092 --topic test
Step 5: Start a consumer
kafka-console-consumer.sh --zookeeper spark001:2181 --topic test
Finally, make sure the file Flume is tailing keeps receiving fresh log lines.
File location: /root/logs/access.log
The script below replays all.log into access.log, 10 lines every 5 seconds:
#!/bin/bash
# Replay /root/logs/all.log into access.log in 10-line batches,
# one batch every 5 seconds, so Flume's tail -F keeps seeing new data.
i=1
while [ $i -le 6170967 ]        # total line count of all.log
do
    j=`expr $i + 9`             # last line of this 10-line batch
    sed -n "${i},${j}p" /root/logs/all.log >> /root/logs/access.log
    i=`expr $i + 10`
    sleep 5
done
# Save this as a shell script and run it.
Once it is running, the consumer console will scroll with incoming log lines.
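The core of the replay script is the `sed -n 'i,jp'` slice, which copies one fixed-size window per loop pass. A scaled-down run (throwaway temp files, batches of 2, no sleep) shows every line making it across:

```shell
# Stand-ins for all.log and access.log (temp paths, demo only)
seq 1 6 > /tmp/all_demo.log
: > /tmp/access_demo.log

i=1
while [ $i -le 6 ]; do
    j=$(expr $i + 1)                               # last line of this 2-line batch
    sed -n "${i},${j}p" /tmp/all_demo.log >> /tmp/access_demo.log
    i=$(expr $i + 2)
done

wc -l < /tmp/access_demo.log                       # all 6 lines were copied
```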
Supplement: cut a 100-line sample out of a big log file, then verify the line count:
head -n 100 big_log_file.log > 100_access.log
wc -l 100_access.log
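The two supplement commands can be checked end to end on generated data (temp paths below are demo stand-ins for the real log files):

```shell
# Build a fake 500-line log, slice off the first 100 lines, count them
seq 1 500 > /tmp/big_demo.log
head -n 100 /tmp/big_demo.log > /tmp/sample_demo.log
wc -l < /tmp/sample_demo.log
```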
Submit the Spark Streaming job that consumes the Kafka topic:
spark-submit --master local[5] \
--jars $(echo /root/apps/hbase-1.2.0-cdh5.7.0/lib/*.jar | tr ' ' ',') \
--class com.csylh.spark.project.spark.ImoocStatStreamingApp \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/root/jars/streaming-1.0-SNAPSHOT.jar \
spark001:2181 test test 1
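The `--jars $(echo ... | tr ' ' ',')` expression above works because `spark-submit --jars` expects a comma-separated list while the shell glob expands to space-separated paths; `tr` bridges the two. A sketch with made-up jar names (it assumes no spaces inside the paths themselves):

```shell
# Glob output is space-separated; tr rewrites it into the comma-separated
# form that spark-submit's --jars flag expects.
jars=$(echo a.jar b.jar c.jar | tr ' ' ',')   # stand-in jar names
echo "$jars"
```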