日志采集Agent

2018-11-24  本文已影响161人  john不哭

前言

日志采集这一领域,好像没多少技术含量。但是如果往下挖技术细节,可以涉及到Linux文件系统的工作原理(文件回收规则);进程管理(如何保证进程不会被杀死,杀死后如何恢复等问题);如何保证日志不漏,然后到日志不重复;从轮询收集,到inotify事件收集;如何处理日志接收方背压的问题等等。

业界方案

采集agent的方案有以下两种代表:

参考资料

https://yq.aliyun.com/articles/204554?spm=5176.10695662.1996646101.searchclickresult.6ed6ff98OMv6De
http://www.man7.org/linux/man-pages/man7/inotify.7.html
https://www.elastic.co/guide/en/beats/filebeat/current/index.html

核心问题

进阶问题:

基本概念

Demo

Version0.1(模仿Tail 实现日志采集,拥有断点续传功能。)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time


class Agent(object):
    def __init__(self):
        pass

    def time_count(fun):  # fun = world
        #  @functools.wraps(fun) 的作用就是保留原函数信息如__name__, __doc__, __module__
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            """
            this is wrapper function
            :param args:
            :param kwargs:
            :return:
            """
            start_time = time.time()
            temp = fun(*args, **kwargs)  # world(a=1, b=2)
            end_time = time.time()
            print("%s函数运行时间为%s" % (fun.__name__, end_time - start_time))
            return temp

        return wrapper

    @time_count
    def collect(self, file_name):
        f = open(file_name, "r")
        length = 0

        #Recover from the lastest readed.
        try:
            loc = open("tail-seek", "r")
            pre_location = loc.readline()[:-1]
            if pre_location != "":
                f.seek(int(pre_location))
                length += int(pre_location)
                loc.close()
        except:
            pass
        loc = open("tail-seek", "w", buffering=0)

        # Avoid the overload the agent cpu and disk.
        # __loop = 0
        while 1:
            temp = f.readline()
            print temp[:-1]

            # Avoid the overload the agent cpu and disk.
            # if __loop%1000 == 0:
            #   time.sleep(0.003)

            # Push the data to the server.
            if temp != "":
                 print temp
 
            length += len(temp)
            loc.seek(0)
            loc.write(str(length) + '\n')
            if temp == "":
                f.close()
                loc.close()
                break

if __name__ == "__main__":
    s = Agent()
    s.collect("./show")

坑:

2018/11/26更新

Version1.0(引入网络部分)

特性:

问题:

  1. 依旧为单进程,但是已经写好各种基础模块。
  2. Server端作为展示,没有把数据落盘,数据落盘时,机器断电,可能存在丢数据的风险。(依旧是版本号解决问题)。
  3. 收集方式依旧为轮询方式。
  4. 没有背压感知功能(可以通过参考Mysql 刷脏的做法,通过引入版本号,对比发送版本号和确认版本号,当发现确认版本号落后发送版本号10%时,只接收,不发送,直至服务器版本号追赶至5%。)。

服务端数据落盘设计

方案:

如何写:
1 轮询所有不同主题的内存,满足一定条件(一定条数,一定时间等多种条件下)写入。并且写入对应的文件映射关系(如:主题,1~100000条,开始\结束文件offset)。

如何读:
1 找到对应的文件映射表,找到对于的条目如31081在1~100000之间,读取offset,然后循环找到对应的条目。

如何恢复:
1 检查映射表。
2 利用写入的offset等信息重新从Agent拉取数据。

版本号初步设计

方案一:

2018/11/27更新

Version1.1 (优化采集性能,网络)

特性:

性能如下:

  1. 环境:Aliyun轻量服务器,1Core2G,40G SSD。
  2. 基准性能:
    通过dd测出磁盘性能连续读写性能大概在130MB/s之间。
  3. 日志类型:
    1. 短日志。如(787897797987897)
    2. 正常日志。如Nginx日志。
      优化前:
      1 1亿条,880Mb短日志。大概需要50秒。
      2 一千万条,3.7G日志。大概需要120秒。

优化记录:

方案:

  1. 优化Socket buff大小。
    • 修改Tcp buff为32k。

2.尝试引入压缩机制。

  1. 优化文件读取方式。
    • 每一次读取更多的字节数,减低日志收集细粒度,以8k的细粒度进行读取文件。Python函数具体实现中readlines(size),指定缓存最小就是8k,他会接近于8k以保证数据完整性。(读的粒度越大,文件check_point同步写次数越少,变相提高整体性能。)
    • 后期可以参照readlines,利用read做一个类似的功能(以4k的方式读入,4k刚好能够吻合ssd的一次读写)。
  2. 调整Queue的大小等细节,在保证性能的前提下,减少内存使用,以防止OOM。

优化后:

1 1亿条,880Mb短日志。大概需要14秒(受益于文件读取优化)。
2 1千万条,3.7G日志。大概需要55秒。

2018/11/29更新

考虑到单核存在进程调度的消耗。

在腾讯云再次测试:
条件:
Agent amd 2c4g 50g ssd
Server amd 2c4g 50g ssd
通过dd测出磁盘性能连续读写性能大概在130MB/s之间。

结果:

  1. 1亿条,880Mb短日志。大概需要15秒(受益于文件读取优化)。
  2. 1千万条,3.7G日志。大概需要45秒。

2018/12/3更新

大幅度减少Cpu占用,磁盘性能成为系统瓶颈,引入LZ4作为提高性能选择。(ps:LZ4算法需要依赖相应的python packet。)

彻底解决网络bug问题,由于Socket中 recv(len)不保证接收完整的len长度的data, 会出现数据异常问题。(通过判断每一次recv大小,直至达到len大小作为一次传输完整的数据。查了4天bug~~~)

由于毕业问题,暂停埋坑2周,任何问题请联系我18689235591@163.com,我叫John即可。

上一篇下一篇

猜你喜欢

热点阅读