架构运维监控运维的那点事

部署efk+kafka高并发日志收集系统

2019-07-21  本文已影响151人  无字天书


一架构&详解

架构图:

本次实验主要以采集nginx日志为例,采用filebeat采集日志,传输到kafka,然后到logstash处理后存储到es中,kibana再从es中读取数据做数据可视化展示。

详情:

elk212 ==> 192.168.68.212

elk213 ==> 192.168.68.213

elk214 ==> 192.168.68.214

web50 ==> 192.168.68.50

二工具&原料

1、系统环境与软件详情

elasticsearch-7.2.0

kibana-7.2.0

logstash-7.2.0

filebeat-7.2.0

kafka_2.12-2.2.1

jdk-8u211-linux-x64

系统详情:[elk212,elk213,elk214]

[root@elk212 iso]# cat /etc/redhat-release

CentOS Linux release 7.4.1708 (Core)

[root@elk212 iso]# uname -r

3.10.0-693.el7.x86_64

[root@elk212 ~]# sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config

[root@elk212 ~]# setenforce 0

[root@elk212 ~]# systemctl stop firewalld

[root@elk212 ~]# systemctl disable firewalld

[root@elk212 iso]# yum -y install iptables-services

[root@elk212 iso]# iptables -F

[root@elk212 iso]# service iptables save

[root@elk212 ~]# echo '192.168.68.212 elk212' >> /etc/hosts

[root@elk212 iso]# vim /etc/security/limits.conf

* soft nofile 100000

* hard nofile 100000

* soft nproc 100000

* hard nproc 100000

[root@elk212 iso]# ulimit -SHn 100000

[root@elk212 es]# echo 'vm.max_map_count=262144' >> /etc/sysctl.conf

[root@elk212 es]# sysctl -p

vm.max_map_count = 262144

三方法&步骤

1、服务端

1)安装java[elk212,elk213,elk214]

[root@elk212 iso]# tar -xvf jdk-8u211-linux-x64.tar.gz

[root@elk212 iso]# mv jdk1.8.0_211 /usr/local/java

[root@elk212 iso]# vim /etc/profile

export JAVA_HOME=/usr/local/java

export PATH=$JAVA_HOME/bin:$PATH

[root@elk212 iso]# source /etc/profile

[root@elk212 iso]# java -version

java version "1.8.0_211"

Java(TM) SE Runtime Environment (build 1.8.0_211-b12)

Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)

2)安装kafka[elk212,elk213,elk214]

****安装配置****

[root@elk212 iso]# tar -xvf kafka_2.12-2.2.1.tgz

[root@elk212 iso]# mv kafka_2.12-2.2.1 /usr/local/kafka

[root@elk212 iso]# cd /usr/local/kafka/config

[root@elk212 config]# grep -Ev "^$|^#" server.properties

broker.id=1    #elk213:2  elk214:3

delete.topic.enable=true

listeners=PLAINTEXT://192.168.68.212:9092  #本机ip

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/data/kafka-logs

num.partitions=12

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=192.168.68.212:2181,192.168.68.213:2181,192.168.68.214:2181

zookeeper.connection.timeout.ms=10000

group.initial.rebalance.delay.ms=0

[root@elk212 config]# grep -Ev "^$|^#" consumer.properties

bootstrap.servers=192.168.68.212:9092,192.168.68.213:9092,192.168.68.214:9092    

zookeeper.connection.timeout.ms=60000

group.id=test-consumer-group

[root@elk212 config]# grep -Ev "^$|^#" producer.properties

bootstrap.servers=192.168.68.212:9092,192.168.68.213:9092,192.168.68.214:9092

compression.type=none

[root@elk212 config]# mkdir /data/kafka-logs -p

****检查配置****

#分别启动三台zookeeper,查看有无报错(无报错不要关闭,等待kafka来注册)

[root@elk212 config]# /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties

#在另一个终端再分别启动三台kafka,查看有无报错[提示,先启动三台zookeeper]

[root@elk212 ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties

[root@elk212 ~]# ss -tunl | grep -E ':9092|2181'

tcp    LISTEN    0      50        ::ffff:192.168.68.212:9092                :::*                 

tcp    LISTEN    0      50      :::2181                :::*

**编写简单管理kafka脚本**

每一台都需要编写

[root@elk212 ~]# vim /usr/local/kafka/kafka.sh

#!/bin/bash
# Simple management script for a co-located ZooKeeper + Kafka instance.
# Usage: kafka.sh {start|stop}
#   start: launch ZooKeeper first (the broker must register with it),
#          wait for it to settle, then launch the Kafka broker.
#   stop:  stop the broker first, then ZooKeeper (reverse of startup order).
# Output of the daemons is appended to ./nohup.out (nohup default).

if [ $# -eq 1 ];then
    case "$1" in
    start)
        echo -e '\033[32mkafka启动中....\033[0m'
        nohup /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
        # Give ZooKeeper time to come up before the broker tries to connect.
        sleep 8
        nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
        sleep 1
        echo -e '\033[32mkafka启动成功....\033[0m';;
    stop)
        echo -e '\033[32mkafka停止中....\033[0m'
        nohup /usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties &
        # Let the broker deregister before taking ZooKeeper down.
        sleep 5
        nohup /usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/config/zookeeper.properties &
        sleep 1
        echo -e '\033[32mkafka停止成功\033[0m';;
    *)
        # Bug fix: original usage string read "start|stop1" (typo).
        echo -e '\033[31mstart|stop\033[0m'
    esac
else
    echo -e '\033[31mstart|stop\033[0m'
fi

[root@elk212 ~]# chmod +x /usr/local/kafka/kafka.sh

[root@elk212 ~]# /usr/local/kafka/kafka.sh start

kafka启动中....

nohup: 把输出追加到"nohup.out"

nohup: 把输出追加到"nohup.out"

kafka启动成功....

[root@elk212 ~]# /usr/local/kafka/kafka.sh stop

kafka停止中....

nohup: 把输出追加到"nohup.out"

nohup: 把输出追加到"nohup.out"

kafka停止成功

****测试一下kafka****

[root@elk212 ~]# cd /usr/local/kafka/

#创建topic

[root@elk212 kafka]# bin/kafka-topics.sh --create --zookeeper 192.168.68.212:2181 --replication-factor 1 --partitions 1 --topic test1

Created topic test1

#查看创建的topic

[root@elk212 kafka]# ./bin/kafka-topics.sh --list --zookeeper 192.168.68.212:2181

test1

#模拟客户端发送消息

[root@elk212 kafka]# ./bin/kafka-console-producer.sh --broker-list 192.168.68.212:9092 --topic test1

>hello jluocc

>bye

#模拟客户端接收信息(如果能正常接收到信息说明kafka部署正常)

[root@elk212 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.68.212:9092 --topic test1 --from-beginning

hello jluocc

bye

3)安装Elasticsearch[elk212,elk213,elk214]

[root@elk212 iso]# tar -xvf elasticsearch-7.2.0-linux-x86_64.tar.gz

[root@elk212 iso]# mv elasticsearch-7.2.0 /usr/local/es

[root@elk212 iso]# cd /usr/local/es/config

[root@elk212 config]# vim elasticsearch.yml

[root@elk212 config]# cat elasticsearch.yml | grep -Ev "^$|^#"

cluster.name: elasticsearch

node.name: node-1   #elk213:node-2  elk214:node-3

path.data: /data

path.logs: /data/logs

network.host: 192.168.68.212

http.port: 9200

discovery.seed_hosts: ["192.168.68.212","192.168.68.213","192.168.68.214"]

cluster.initial_master_nodes: ["node-1", "node-2" , "node-3" ]

[root@elk212 config]# useradd  log

[root@elk212 config]# mkdir /data/logs -p

[root@elk212 config]# chown -R log:log /data/

[root@elk212 config]# chown -R log:log /usr/local/es/

[root@elk212 config]# su - log -c "/usr/local/es/bin/elasticsearch"

[root@elk212 ~]# curl 192.168.68.212:9200

{

  "name" : "node-1",

  "cluster_name" : "elasticsearch",

  "cluster_uuid" : "yxC9x1hQQ9mja35PWSqG0A",

  "version" : {

    "number" : "7.2.0",

    "build_flavor" : "default",

    "build_type" : "tar",

    "build_hash" : "508c38a",

    "build_date" : "2019-06-20T15:54:18.811730Z",

    "build_snapshot" : false,

    "lucene_version" : "8.0.0",

    "minimum_wire_compatibility_version" : "6.8.0",

    "minimum_index_compatibility_version" : "6.0.0-beta1"

  },

  "tagline" : "You Know, for Search"

}

****编写简单启动脚本****

[root@elk212 es]# vim manageES.sh

#!/bin/bash
# Management script for an Elasticsearch instance run as user "log".
# Usage: manageES.sh {start|restart|status|stop}

# PID of the running ES JVM (empty when not running).
# The 'Xmx' filter matches the JVM heap flag on the es process command line
# and also excludes the grep processes themselves from the ps listing.
# NOTE: may contain several PIDs if more than one JVM matches.
id=$(ps -elf | grep -i elasticsearch | grep 'Xmx' | awk '{print $4}')

# Launch ES in the background as user "log" and wait for it to come up.
# Bug fix: original wrote `... & > /dev/null`, which backgrounds the command
# and then redirects an *empty* command, so output leaked into nohup.out.
start_es() {
    nohup su - log -c "/usr/local/es/bin/elasticsearch" >/dev/null 2>&1 &
    sleep 5
}

if [ $# -eq 1 ];then
    case "$1" in
    start)
        if [ -z "$id" ];then
            echo -e '\033[32mes启动中....\033[0m'
            start_es
            echo -e '\033[32mes已经启动成功\033[0m'
        else
            echo -e "\033[31mes已经启动!,pid:${id}\033[0m"
        fi
        ;;
    restart)
        echo -e '\033[32mes重启中....\033[0m'
        if [ -n "$id" ];then
            # $id intentionally unquoted: it may hold several PIDs.
            kill -9 $id &> /dev/null
        fi
        start_es
        echo -e '\033[32mes已经重启成功\033[0m'
        ;;
    status)
        if [ -z "$id" ];then
            echo -e '\033[32mes没有启动!\033[0m'
        else
            echo -e "\033[32mes正在运行,pid:${id}\033[0m"
        fi
        ;;
    stop)
        if [ -z "$id" ];then
            echo -e '\033[31mes没有启动!\033[0m'
        else
            echo -e '\033[32mes停止中....\033[0m'
            # $id intentionally unquoted: it may hold several PIDs.
            kill -9 $id &> /dev/null
            sleep 2
            echo -e '\033[32mes已经停止成功\033[0m'
        fi
        ;;
    *)
        echo -e '\033[31mstart|restart|status|stop\033[0m'
    esac
else
    echo -e '\033[31mstart|restart|status|stop\033[0m'
fi

[root@elk212 es]# chmod +x manageES.sh

[root@elk212 es]# ./manageES.sh  -h

start|restart|status|stop

[root@elk212 es]# ./manageES.sh start

es启动中....

nohup: 把输出追加到"nohup.out"

es已经启动成功

[root@elk212 es]# ./manageES.sh status

es正在运行,pid:5596

[root@elk212 es]# ss -tunl | grep -E '9200|9300'

tcp    LISTEN    0      128      ::ffff:192.168.68.212:9200                :::*                 

tcp    LISTEN    0      128      ::ffff:192.168.68.212:9300                :::*

4)安装Kibana

[root@elk212 iso]# tar -xvf kibana-7.2.0-linux-x86_64.tar.gz

[root@elk212 iso]# mv kibana-7.2.0-linux-x86_64 /usr/local/kibana

[root@elk212 iso]# chown -R log:log /usr/local/kibana/

[root@elk212 iso]# cd /usr/local/kibana/

[root@elk212 kibana]# vim config/kibana.yml

[root@elk212 kibana]# cat config/kibana.yml | grep -Ev "^$|^#"

server.port: 5601

server.host: "0.0.0.0"

elasticsearch.hosts: ["http://192.168.68.212:9200"]

kibana.index: ".kibana"

pid.file: /usr/local/kibana/kibana.pid

[root@elk212 kibana]# su - log -c "/usr/local/kibana/bin/kibana"

[root@elk212 iso]# ss -tunl | grep 5601

tcp    LISTEN    0      128    192.168.68.212:5601                  *:*

****编写启动脚本****

[root@elk212 kibana]# vim kibana.sh

#!/bin/bash
# Management script for a Kibana instance run as user "log".
# Usage: kibana.sh {start|restart|status|stop}

# PID of the running Kibana node process (empty when not running).
# 'max-http-header-size' appears on kibana's node command line and also
# filters out the grep processes themselves.
# NOTE: may contain several PIDs if more than one process matches.
id=$(ps -elf | grep -i kibana | grep 'max-http-header-size' | awk '{print $4}')

# Launch Kibana in the background as user "log" and wait for it to come up.
# Bug fix: original wrote `... & > /dev/null`, which backgrounds the command
# and then redirects an *empty* command, so output leaked into nohup.out.
start_kibana() {
    nohup su - log -c "/usr/local/kibana/bin/kibana" >/dev/null 2>&1 &
    sleep 5
}

if [ $# -eq 1 ];then
    case "$1" in
    start)
        if [ -z "$id" ];then
            echo -e '\033[32mkibana启动中....\033[0m'
            start_kibana
            echo -e '\033[32mkibana已经启动成功\033[0m'
        else
            echo -e "\033[31mkibana已经启动!,pid:${id}\033[0m"
        fi
        ;;
    restart)
        echo -e '\033[32mkibana重启中....\033[0m'
        if [ -n "$id" ];then
            # $id intentionally unquoted: it may hold several PIDs.
            kill -9 $id &> /dev/null
        fi
        start_kibana
        echo -e '\033[32mkibana已经重启成功\033[0m'
        ;;
    status)
        if [ -z "$id" ];then
            echo -e '\033[32mkibana没有启动!\033[0m'
        else
            echo -e "\033[32mkibana正在运行,pid:${id}\033[0m"
        fi
        ;;
    stop)
        if [ -z "$id" ];then
            echo -e '\033[31mkibana没有启动!\033[0m'
        else
            echo -e '\033[32mkibana停止中....\033[0m'
            # $id intentionally unquoted: it may hold several PIDs.
            kill -9 $id &> /dev/null
            sleep 2
            echo -e '\033[32mkibana已经停止成功\033[0m'
        fi
        ;;
    *)
        echo -e '\033[31mstart|restart|status|stop\033[0m'
    esac
else
    echo -e '\033[31mstart|restart|status|stop\033[0m'
fi

[root@elk212 kibana]# chmod +x kibana.sh

[root@elk212 kibana]# ./kibana.sh status

kibana没有启动!

[root@elk212 kibana]# ./kibana.sh start

kibana启动中....

nohup: 把输出追加到"nohup.out"

kibana已经启动成功

[root@elk212 kibana]# ./kibana.sh status

kibana正在运行,pid:6714

5)安装Logstash[elk212,elk213,elk214]

[root@elk212 iso]# tar -xvf logstash-7.2.0.tar.gz

[root@elk212 iso]# mv logstash-7.2.0 /usr/local/logstash

[root@elk212 logstash]# chown -R log:log /usr/local/logstash/

[root@elk212 iso]# cd /usr/local/logstash/

[root@elk212 logstash]# cp config/logstash-sample.conf config/logstash.conf

[root@elk212 logstash]# vim config/logstash.conf

input {

  kafka {

    bootstrap_servers => ["192.168.68.212:9092","192.168.68.213:9092","192.168.68.214:9092"]

    topics => ["credit"]

    group_id => "test-consumer-group"

    codec => "json"

    consumer_threads => 1

    decorate_events => true

  }

}

output {

  elasticsearch {

    hosts => ["192.168.68.212:9200","192.168.68.213:9200","192.168.68.214:9200"]

    index => "logs-%{+YYYY.MM.dd}"

    workers => 1

  }

}

这里需要注意的就四个地方 bootstrap_servers 也就是kafka集群的地址,在filebeat端要求单个地址加引号,这里是集群地址放一起加引号。

group_id 这里必须保证唯一,是你这个logstash集群消费kafka集群的身份标识。

topics  filebeat和logstash使用的topic一致。

codec => json  由于beat传输数据给kafka集群的时候,会附加很多tag,默认情况下,logstash就会将这串tag也认为是message的一部分。这样不利于后期的数据处理。所有需要添加codec处理。得到原本的message数据。

#启动logstash,如果没有error , 就启动完成了

[root@elk212 ~]# /usr/local/logstash/bin/logstash -f /usr/local/logstash/config/logstash.conf

[root@elk212 ~]# ss -tunl | grep :9600

tcp    LISTEN    0      50    ::ffff:127.0.0.1:9600                :::*

****编写管理logstash脚本****

[root@elk212 ~]# cd /usr/local/logstash/

[root@elk212 logstash]# vim logstash.sh

#!/bin/bash
# Management script for a Logstash instance run as user "log".
# Usage: logstash.sh {start|restart|status|stop}

# PID of the running Logstash JVM (empty when not running).
# The 'Xms' filter matches the JVM heap flag on the logstash process
# command line and also excludes the grep processes themselves.
# NOTE: may contain several PIDs if more than one JVM matches.
id=$(ps -elf | grep -i logstash | grep 'Xms' | awk '{print $4}')

# Launch Logstash in the background as user "log" and wait for it to come up.
# Bug fix: original wrote `... & > /dev/null`, which backgrounds the command
# and then redirects an *empty* command, so output leaked into nohup.out.
start_logstash() {
    nohup su - log -c "/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/logstash.conf" >/dev/null 2>&1 &
    sleep 5
}

if [ $# -eq 1 ];then
    case "$1" in
    start)
        if [ -z "$id" ];then
            echo -e '\033[32mlogstash启动中....\033[0m'
            start_logstash
            echo -e '\033[32mlogstash已经启动成功\033[0m'
        else
            echo -e "\033[31mlogstash已经启动!,pid:${id}\033[0m"
        fi
        ;;
    restart)
        echo -e '\033[32mlogstash重启中....\033[0m'
        if [ -n "$id" ];then
            # $id intentionally unquoted: it may hold several PIDs.
            kill -9 $id &> /dev/null
        fi
        start_logstash
        echo -e '\033[32mlogstash已经重启成功\033[0m'
        ;;
    status)
        if [ -z "$id" ];then
            echo -e '\033[32mlogstash没有启动!\033[0m'
        else
            echo -e "\033[32mlogstash正在运行,pid:${id}\033[0m"
        fi
        ;;
    stop)
        if [ -z "$id" ];then
            echo -e '\033[31mlogstash没有启动!\033[0m'
        else
            echo -e '\033[32mlogstash停止中....\033[0m'
            # $id intentionally unquoted: it may hold several PIDs.
            kill -9 $id &> /dev/null
            sleep 2
            echo -e '\033[32mlogstash已经停止成功\033[0m'
        fi
        ;;
    *)
        echo -e '\033[31mstart|restart|status|stop\033[0m'
    esac
else
    echo -e '\033[31mstart|restart|status|stop\033[0m'
fi

[root@elk212 logstash]# chmod +x logstash.sh

[root@elk212 logstash]# ./logstash.sh

start|restart|status|stop

[root@elk212 logstash]# ./logstash.sh stop

logstash停止中....

logstash已经停止成功

[root@elk212 logstash]# ./logstash.sh status

logstash没有启动!

[root@elk212 logstash]# ./logstash.sh start

logstash启动中....

nohup: 把输出追加到"nohup.out"

logstash已经启动成功

[root@elk212 logstash]# ./logstash.sh status

logstash正在运行,pid:7586

2、客户端

[root@web50 efk]# tar -xvf filebeat-7.2.0-linux-x86_64.tar.gz

[root@web50 efk]# mv filebeat-7.2.0-linux-x86_64 /usr/local/filebeat

[root@web50 filebeat]# useradd log

[root@web50 filebeat]# chown -R log:log /usr/local/filebeat/

[root@web50 efk]# cd /usr/local/filebeat/

[root@web50 filebeat]# vim filebeat.yml

[root@web50 filebeat]# grep -vE '^$|#' filebeat.yml

filebeat.inputs:

- type: log

  enabled: true

  paths:

    - /var/log/*.log

  multiline.pattern: ^\d+

  multiline.negate: true

  multiline.match: after

filebeat.config.modules:

  path: ${path.config}/modules.d/*.yml

  reload.enabled: false

setup.template.settings:

  index.number_of_shards: 1

setup.kibana:

output.kafka:

  hosts: ['192.168.68.212:9092','192.168.68.213:9092' ,'192.168.68.214:9092']

  topic: 'credit'

processors:

  - add_host_metadata: ~

  - add_cloud_metadata: ~

[root@web50 filebeat]# ./filebeat modules enable nginx

[root@web50 filebeat]# vim modules.d/nginx.yml

[root@web50 filebeat]# cat modules.d/nginx.yml | grep -Ev '^$|#'

- module: nginx

  access:

    enabled: true

    var.paths: ["/usr/local/nginx/logs/access.log"]

  error:

    enabled: true

    var.paths: ["/usr/local/nginx/logs/error.log"]

#启动

[root@web50 filebeat]# /usr/local/filebeat/filebeat -e -c /usr/local/filebeat/filebeat.yml

****编写管理脚本****

[root@web50 ~]# cd /usr/local/filebeat/

[root@web50 filebeat]# vim mbeat.sh

#!/bin/bash
# Bug fix: original shebang was "#/bin/bash" (missing '!'), so the script
# was interpreted by whatever shell invoked it instead of bash.
# Management script for a Filebeat instance run as user "log".
# Usage: mbeat.sh {start|restart|status|stop}

# PID of the running filebeat process (empty when not running).
# `grep -v grep` drops the grep processes themselves; `tail -1` keeps a
# single PID (the filebeat child rather than the su wrapper).
id=$(ps -elf | grep -i filebeat | grep -v grep | awk '{print $4}' | tail -1)

# Launch filebeat in the background as user "log" and wait for it to come up.
# Bug fix: original wrote `... & > /dev/null`, which backgrounds the command
# and then redirects an *empty* command, so output leaked into nohup.out.
start_filebeat() {
    nohup su - log -c "/usr/local/filebeat/filebeat -e -c /usr/local/filebeat/filebeat.yml" >/dev/null 2>&1 &
    sleep 5
}

if [ $# -eq 1 ];then
    case "$1" in
    start)
        if [ -z "$id" ];then
            # Bug fix: original message said "logstash启动中" in this
            # filebeat script (copy/paste error, visible in the transcript).
            echo -e '\033[32mfilebeat启动中....\033[0m'
            start_filebeat
            echo -e '\033[32mfilebeat已经启动成功\033[0m'
        else
            echo -e "\033[31mfilebeat已经启动!,pid:${id}\033[0m"
        fi
        ;;
    restart)
        echo -e '\033[32mfilebeat重启中....\033[0m'
        if [ -n "$id" ];then
            kill -9 $id &> /dev/null
        fi
        start_filebeat
        echo -e '\033[32mfilebeat已经重启成功\033[0m'
        ;;
    status)
        if [ -z "$id" ];then
            echo -e '\033[32mfilebeat没有启动!\033[0m'
        else
            echo -e "\033[32mfilebeat正在运行,pid:${id}\033[0m"
        fi
        ;;
    stop)
        if [ -z "$id" ];then
            echo -e '\033[31mfilebeat没有启动!\033[0m'
        else
            echo -e '\033[32mfilebeat停止中....\033[0m'
            kill -9 $id &> /dev/null
            sleep 2
            echo -e '\033[32mfilebeat已经停止成功\033[0m'
        fi
        ;;
    *)
        echo -e '\033[31mstart|restart|status|stop\033[0m'
    esac
else
    echo -e '\033[31mstart|restart|status|stop\033[0m'
fi

[root@web50 filebeat]# chmod +x mbeat.sh

[root@web50 filebeat]# ./mbeat.sh start

logstash启动中....

nohup: 把输出追加到"nohup.out"

filebeat已经启动成功

[root@web50 filebeat]# ./mbeat.sh status

filebeat正在运行,pid:3216

3、kibana配置

四总结&提示

本次实验主要是简单分享下elk+filebeat+kafka构建日志分析平台,由于自己电脑配置问题,只能部署在三台电脑上,生产环境中,应该把es集群分离出来单独部署,具体情况具体分析,kibana可以多部署几台,用nginx调度,组建高可用kibana集群,配置访问权限,增加安全性等等。

常见错误01:

解决办法:调大zookeeper连接时间
zookeeper.connection.timeout.ms=60000

看官福利:


结束语:

更多精彩内容持续更新中,关注我,有你更精彩。

上一篇下一篇

猜你喜欢

热点阅读