日志采集Agent
前言
日志采集这一领域,好像没多少技术含量。但是如果往下挖技术细节,可以涉及到Linux文件系统的工作原理(文件回收规则);进程管理(如何保证进程不会被杀死,杀死后如何恢复等问题);如何保证日志不漏,然后到日志不重复;从轮询收集,到inotify事件收集;如何处理日志接收方背压的问题等等。
业界方案
采集agent的方案有以下两种代表:
- 以Flume为代表
- 以Elastic公司为代表的logstash ,beat系列。其中本文重点研究了并且参考Filebeat。
参考资料
https://yq.aliyun.com/articles/204554?spm=5176.10695662.1996646101.searchclickresult.6ed6ff98OMv6De
http://www.man7.org/linux/man-pages/man7/inotify.7.html
https://www.elastic.co/guide/en/beats/filebeat/current/index.html
核心问题
- 轮转时,文件引用次数为零(压缩完成后,发生服务器断电)。导致日志丢失 (通过硬连接hold住文件解决)
- 如何判断为一条完整日志 (通过\N等)
- 采集进程被杀如何恢复工作环境
进阶问题:
- 配置管理问题:如何下放配置,如何热更新配置等问题。
- 资源限制问题,如何限制资源占用上限。
完整代码地址:
https://github.com/Whojohn/log_demo
基本概念
- 文件系统
参考:
https://www.cnblogs.com/JimMoriarty/p/3594335.html
https://www.linuxprobe.com/linux-system-structure.html
https://www.ibm.com/developerworks/cn/linux/l-cn-hardandsymb-links/
标识文件系统中文件的inode。文件操作相关(打开,硬连接)的引用计数d_count,i_links_count(硬连接计数)等。 - 轮转
一句话来说就是打包,压缩的过程。对日志收集的影响无非就是压缩完成太快,Agent上传速度太慢,机器还断电了。导致文件的引用计数为零,文件系统gc掉没有完成上传的日志的问题。类似以下过程:
tail log
mv log log.1
tar -zxvf log.1
tail log(文件已经被gc掉,日志丢失)
Demo
Version0.1(模仿Tail 实现日志采集,拥有断点续传功能。)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
class Agent(object):
def __init__(self):
pass
def time_count(fun): # fun = world
# @functools.wraps(fun) 的作用就是保留原函数信息如__name__, __doc__, __module__
@functools.wraps(fun)
def wrapper(*args, **kwargs):
"""
this is wrapper function
:param args:
:param kwargs:
:return:
"""
start_time = time.time()
temp = fun(*args, **kwargs) # world(a=1, b=2)
end_time = time.time()
print("%s函数运行时间为%s" % (fun.__name__, end_time - start_time))
return temp
return wrapper
@time_count
def collect(self, file_name):
f = open(file_name, "r")
length = 0
#Recover from the lastest readed.
try:
loc = open("tail-seek", "r")
pre_location = loc.readline()[:-1]
if pre_location != "":
f.seek(int(pre_location))
length += int(pre_location)
loc.close()
except:
pass
loc = open("tail-seek", "w", buffering=0)
# Avoid the overload the agent cpu and disk.
# __loop = 0
while 1:
temp = f.readline()
print temp[:-1]
# Avoid the overload the agent cpu and disk.
# if __loop%1000 == 0:
# time.sleep(0.003)
# Push the data to the server.
if temp != "":
print temp
length += len(temp)
loc.seek(0)
loc.write(str(length) + '\n')
if temp == "":
f.close()
loc.close()
break
if __name__ == "__main__":
s = Agent()
s.collect("./show")
坑:
-
参考:https://stackoverflow.com/questions/620367/how-to-jump-to-a-particular-line-in-a-huge-text-file
windows中当前行字符总长度,不等于当前行文件偏移位置(linux暂时没发现这个问题)。
解决方案:
1 使用f.tell()确定当前文件偏移位。 -
已知进程恢复,可能会导致agent重复上传(上传完成后,准备同步写保存文件偏移位时,同步写还未被写入文件。)。如果先同步写,再上传又会导致日志丢失。
解决方案:
1 可以通过双方模拟tcp的ack机制进行,第一版不打算实现这个功能。即通过服务端进行一个版本号(其实就是一个顺序号)进行处理。(完美方案,保证双方都能不重,不漏,但是有一定消耗。)
2 Agent端逻辑不变,服务端自行对文件版本号进行对比,落后就丢弃。优雅一点服务端告知Agent已经收到版本号(行号),然后指定Agent按照最新版本号上传,类似方案1。 -
记录文件偏移位,同步写的消耗问题。简单测试了一下,没有同步写大概能快10%左右的样子。
优化方案:
1 Agent同步写的逻辑改为1000行写一次offset地址。这样恢复的时候重复问题会变得严重,可以通过服务器丢弃落后的日志解决(也可以引入版本号解决)。
2018/11/26更新
Version1.0(引入网络部分)
特性:
- 硬连接保证日志收集完成才释放文件。
- 引入网络部分,简单的server接收者,作为Demo。
代码见https://github.com/Whojohn/log_demo(version1.0部分)
问题:
- 依旧为单进程,但是已经写好各种基础模块。
- Server端作为展示,没有把数据落盘,数据落盘时,机器断电,可能存在丢数据的风险。(依旧是版本号解决问题)。
- 收集方式依旧为轮询方式。
- 没有背压感知功能(可以通过参考Mysql 刷脏的做法,通过引入版本号,对比发送版本号和确认版本号,当发现确认版本号落后发送版本号10%时,只接收,不发送,直至服务器版本号追赶至5%。)。
服务端数据落盘设计
方案:
- 同一主题的日志放置在同一内存中(deque)。
- 为了能够利用顺序写,尽可能压榨性能,不同主题的日志合并写入到同一文件中。
如何写:
1 轮询所有不同主题的内存,满足一定条件(一定条数,一定时间等多种条件下)写入。并且写入对应的文件映射关系(如:主题,1~100000条,开始\结束文件offset)。
如何读:
1 找到对应的文件映射表,找到对于的条目如31081在1~100000之间,读取offset,然后循环找到对应的条目。
如何恢复:
1 检查映射表。
2 利用写入的offset等信息重新从Agent拉取数据。
版本号初步设计
方案一:
- 机器唯一标识(ip)+log文件唯一标识+offset作为版本号, Server端需要重传时候,需要发对应的版本号以及特殊的重传标识到对应的Agent上即可。Agent提取出对应的offset,继续上传。
坑: - offset可能数字可能会很大。
2018/11/27更新
Version1.1 (优化采集性能,网络)
特性:
- Server数据落盘雏形(没有实现版本号)。
- Agent通过多条日志打包成一条TCP报文,Server端通过弃用rfile.readline(Python循环太慢,短报文时间完全浪费在循环上。)。大幅度提高Agent采集性能,Server数据落盘性能。
性能如下:
- 环境:Aliyun轻量服务器,1Core2G,40G SSD。
- 基准性能:
通过dd测出磁盘性能连续读写性能大概在130MB/s之间。 - 日志类型:
- 短日志。如(787897797987897)
- 正常日志。如Nginx日志。
优化前:
1 1亿条,880Mb短日志。大概需要50秒。
2 一千万条,3.7G日志。大概需要120秒。
优化记录:
方案:
- 优化Socket buff大小。
- 修改Tcp buff为32k。
2.尝试引入压缩机制。
- 优化文件读取方式。
- 每一次读取更多的字节数,减低日志收集细粒度,以8k的细粒度进行读取文件。Python函数具体实现中readlines(size),指定缓存最小就是8k,他会接近于8k以保证数据完整性。(读的粒度越大,文件check_point同步写次数越少,变相提高整体性能。)
- 后期可以参照readlines,利用read做一个类似的功能(以4k的方式读入,4k刚好能够吻合ssd的一次读写)。
- 调整Queue的大小等细节,在保证性能的前提下,减少内存使用,以防止OOM。
优化后:
1 1亿条,880Mb短日志。大概需要14秒(受益于文件读取优化)。
2 1千万条,3.7G日志。大概需要55秒。
2018/11/29更新
考虑到单核存在进程调度的消耗。
在腾讯云再次测试:
条件:
Agent amd 2c4g 50g ssd
Server amd 2c4g 50g ssd
通过dd测出磁盘性能连续读写性能大概在130MB/s之间。
结果:
- 1亿条,880Mb短日志。大概需要15秒(受益于文件读取优化)。
- 1千万条,3.7G日志。大概需要45秒。