数据平台实践①——Flume+Kafka+SparkStream
蜻蜓点水
Flume——数据采集
如果说,爬虫是采集外部数据的常用手段的话,那么,Flume就是采集内部数据的常用手段之一(logstash也是这方面的佼佼者)。
下面介绍一下Flume的基本构造。
- Agent:包含Source、Channel和Sink的主体,它是这3个组件的载体,是组成Flume的数据节点。
- Event:Flume 数据传输的基本单元。
- Source: 用来接收Event,并将Event批量传给Channel。
- Channel:Source和Sink之间的Event缓冲通道,它有个type属性,一般为memory,可以提高传输速度。
- Sink:负责将数据沉淀到最终存储区,或沉淀给下一个source,形成数据流。
在大致了解了以上要素之后,通过上图,我们就可以有一个大概的认识。一句话讲,Source接收数据,并转成Event单元,然后导入Channel缓冲通道,最后,经由Sink进行数据沉淀。当然这里的沉淀,有多种选择,除了上图中的HDFS外,还包括HBase、File,或者作为另一个Source的源。在一系列过程,一条有序的数据流就诞生了。
Kafka——数据的发布/订阅
Kafka,作为基于发布/订阅的消息系统,以其分布式性而受到大家的喜爱。
下面介绍一下Kafka的基本构造。
- Broker(代理): Kafka集群可由一个或多个服务器组成,其中的每个服务节点称作这个集群的一个Broker。
- Topic(主题): 一个Topic对应一类消息,Topic用作为消息划分类别。
- Partition(分区): 一个Topic一般含有多个分区。
- Producer(生产者):消息生产者,负责生产Topic消息。
-
Consumer(消费者): 消息消费者,负责消费Topic消息。
Kafka
Zookeeper——服务器间协调
这里需要提一下Zookeeper,对于Kafka这样的分布式服务,大多需要多台服务器相互协调工作,且保持一致性。任意一台服务器出现问题,如果不及时处理,都有可能导致整个服务的崩溃,其后果是不堪设想的。ZooKeeper的分布式设计,可用于领导人选举、群组协同工作和配置服务等,保证了服务的一致性和可用性。
ZookeeperSpark Streaming——Spark核心API
Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。它可以通过Kafka、HDFS、Flume等多种渠道获取数据,转换数据后利用Spark Engine进行数据处理。现在,包括Python、Java等多种高级语言都对Spark进行支持。本文使用pyspark进行编程。
Spark Streaming实践出真知
要做什么
nginx日志分析,简单统计了下PV和UV,并做了H5图表实时展示。使用的是我开发的基于ace-admin和react的管理端LogAdmin对数据进行展示。这里提供github,感兴趣的朋友可以看一下。
LogAdmin下面是我的主要步骤。
①Flume实时读入nginx日志,并将数据导入Kafka中。
②pyspark从Kafka读入数据,做实时处理,并将处理后的数据导出到redis队列中。
③编写脚本从redis中取出数据,存入mysql。
④H5展示。
【版本:Logstash1.7.0,Kafka 2.11(该版本中已集成了Zookeeper),Spark(2.0.2)】
①Flume实时读入nginx日志,并将数据导入Kafka中。
这一步中,只需配置flume.conf,并依次启动flume、zookeeper、kafka即可
flume.conf(配置中命名已较为明确,hdfs部分被注释了)
# agent-my80
# Finally, now that we've defined all of our components, tell
# agent-my80 which ones we want to activate.
#agent-my80.channels = ch1
#agent-my80.sources = avro-source1
#agent-my80.sinks = hdfs-sink1
agent-my80.channels = ch2
agent-my80.sources = exec-source1
agent-my80.sinks = kafka-sink1
# Define a memory channel called ch1 on agent-my80
#agent-my80.channels.ch1.type = memory
agent-my80.channels.ch2.type = memory
# Define an Avro source called avro-source1 on agent-my80 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
#agent-my80.sources.avro-source1.channels = ch1
#agent-my80.sources.avro-source1.type = avro
#agent-my80.sources.avro-source1.bind = 0.0.0.0
#agent-my80.sources.avro-source1.port = 44444
#agent-my80.sources.avro-source1.basenameHeader = true
agent-my80.sources.exec-source1.channels = ch2
agent-my80.sources.exec-source1.type = exec
agent-my80.sources.exec-source1.command = tail -f /home/www/logs/access.log
# # Define a logger sink that simply logs all events it receives
# # and connect it to the other end of the same channel.
#agent-my80.sinks.hdfs-sink1.channel = ch1
#agent-my80.sinks.hdfs-sink1.type = hdfs
#agent-my80.sinks.hdfs-sink1.hdfs.path = hdfs://my80:9000/flume-test
#agent-my80.sinks.hdfs-sink1.hdfs.filePrefix = event-
#agent-my80.sinks.hdfs-sink1.hdfs.filePrefix = %{basename}
#agent-my80.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
#agent-my80.sinks.hdfs-sink1.hdfs.round = true
#agent-my80.sinks.hdfs-sink1.hdfs.roundValue = 10
#agent-my80.sinks.hdfs-sink1.hdfs.fileType = DataStream
agent-my80.sinks.kafka-sink1.channel = ch2
agent-my80.sinks.kafka-sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent-my80.sinks.kafka-sink1.topic = my80-log
agent-my80.sinks.kafka-sink1.brokerList = localhost:9092
agent-my80.sinks.kafka-sink1.batchSize = 20
flume启动命令
flume-ng agent --conf /usr/local/apache-flume-1.7.0-bin/conf --conf-file /usr/local/apache-flume-1.7.0-bin/conf/flume.conf --name agent-my80 -Dflume.root.logger=INFO,console
zookeeper启动命令
/usr/local/kafka_2.11-0.10.1.0/bin/zookeeper-server-start.sh /usr/local/kafka_2.11-0.10.1.0/config/zookeeper.properties
kafka启动命令
/usr/local/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh /usr/local/kafka_2.11-0.10.1.0/config/server.properties
注意:有些朋友,是用自己的个人服务器做相关实践,那么会遇到内存不足的问题,这时候一般通过,修改Java堆大小来解决。比如我是修改的kafka的kafka-server-start.sh和zookeeper-server-start.sh来解决这个问题。
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
②pyspark从Kafka读入数据,做实时处理,并将处理后的数据导出到redis队列中。
这部分为方便站点解析,我对nginx日志格式做了修改。
该步骤主要是做正则解析+MapReduce+数据导入redis,并分别将请求内容和请求ip放入redis的list和set,这样主要是方便我统计每天的PV和UV。
还要注意一点,nginx日志中包括静态文件,显然这个不能算UV和PV,所以要过滤。
calculate.py
#coding=utf8
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import re
import redis
import datetime
# 解析日志
def parse(logstring):
#使用正则解析日志
# regex = '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).*ip=\/(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*tbl=([a-zA-Z0-9_]+)'
regex = 'ip:(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*?time:\[(.*?)\].*?request:\"(.*?)\".*?status_code:(\d{1,3}).*?agent:\"(.*?)\"'
pattern = re.compile(regex)
m1 = pattern.search(str(logstring))
if m1 is not None:
m = m1.groups()
# print(m
if len(m)!=5 or not m[2]:
m= None
else:
hd_list=[u".js",u".css",u".jpg",u".png",u".jpeg",u".gif",u".bmp",u".woff"];
if doStrContainAnyWords(m[2],hd_list):
m= None
else:
m = None
return m
def doStrContainAnyWords(str,words=[]):
for word in words:
if word in str:
return True;
return False;
class RedisClient:
pool = None
def __init__(self):
self.getRedisPool()
def getRedisPool(self):
redisIp='localhost'
redisPort=6379
redisDB=0
self.pool = redis.ConnectionPool(host=redisIp, port=redisPort, db=redisDB)
return self.pool
def addToHashSet(self, key, value):
if self.pool is None:
self.pool = self.getRedisPool()
r = redis.Redis(connection_pool=self.pool)
hashSetName="my80-log-iphash-"+datetime.datetime.now().strftime("%Y-%m-%d");
flag=False;
if r.exists(hashSetName) is False:
flag=True
if r.hexists(hashSetName,str(key)):
r.hincrby(hashSetName, str(key), value)
else:
r.hset(hashSetName, str(key), value)
if flag is True:
r.expire(hashSetName,3600*24+300);
def addToList(self,value):
if self.pool is None:
self.pool = self.getRedisPool()
r = redis.Redis(connection_pool=self.pool)
r.lpush('my80-log-list', value)
if __name__ == '__main__':
zkQuorum = 'localhost:2181'
topic = 'my80-log'
sc = SparkContext("local[2]", appName="kafka_pyspark_redis")
ssc = StreamingContext(sc, 10)
kvs = KafkaUtils.createStream(ssc, zkQuorum, "kafka-streaming-redis", {topic: 1})
#kafka读取返回的数据为tuple,长度为2,tuple[1]为实际的数据,tuple[0]的编码为Unicode
res = kvs.map(lambda x: x[1]).map(lambda x:parse(x)).filter(lambda x:True if x is not None else False)
items = res.map(lambda item:{"ip":item[0],"time":item[1],"request":item[2],"status_code":item[3],"agent":item[4] } )
# items = res.map(lambda item:{"ip":item[0],"time":item[1] } )
# ipcount = res.map(lambda item:(item[0],1)).reduceByKey(lambda a, b: a+b).map(lambda x:{ x[0]:str(x[1]) } )
ipcount = res.map(lambda item:(item[0],1)).reduceByKey(lambda a, b: a+b)
r = RedisClient()
def handleItem(time,rdd):
if rdd.isEmpty() is False:
for element in rdd.collect():
r.addToList(element)
items.foreachRDD(handleItem)
def ipHandle(time,rdd):
if rdd.isEmpty() is False:
# rddstr = "{"+','.join(rdd.collect())+"}"
for element in rdd.collect():
r.addToHashSet(element[0], element[1] )
ipcount.foreachRDD(ipHandle)
ssc.start()
ssc.awaitTermination()
安装好spark-2.0.2-bin-hadoop2.7,脚本测试ok,最后就需要通过spark streaming提交任务(即提交calculate.py)。任务正常执行的话,数据就会从Kafka导出,经处理后,导入redis。
/usr/local/spark-2.0.2-bin-hadoop2.7/bin/spark-submit --jars /usr/local/spark-2.0.2-bin-hadoop2.7/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar /home/dcb/python/pyspark/calculate.py
③编写脚本从redis中取出数据,存入mysql。
这一步相信大家没问题。
④H5图表展示
小结
Flume+Kafka+Spark,是一个相对比较流行且可行的实时计算组合,可定制性较高,如果项目需求比较复杂,建议深入了解后进行定制开发。