部署efk+kafka高并发日志收集系统
一架构&详解
架构图:
本次实验主要以采集nginx日志为例,采用filebeat采集日志,传输到kafka,然后到logstash处理后存储到es中,kibana再从es中读取数据做数据可视化展示。
详情:
elk212 ==> 192.168.68.212
elk213 ==> 192.168.68.213
elk214 ==> 192.168.68.214
web50 ==> 192.168.68.50
二工具&原料
1、系统环境与软件详情
elasticsearch-7.2.0
kibana-7.2.0
logstash-7.2.0
filebeat-7.2.0
kafka_2.12-2.2.1
jdk-8u211-linux-x64
系统详情:[elk212,elk213,elk214]
[root@elk212 iso]# cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core)
[root@elk212 iso]# uname -r
3.10.0-693.el7.x86_64
[root@elk212 ~]# sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config
[root@elk212 ~]# setenforce 0
[root@elk212 ~]# systemctl stop firewalld
[root@elk212 ~]# systemctl disable firewalld
[root@elk212 iso]# yum -y install iptables-services
[root@elk212 iso]# iptables -F
[root@elk212 iso]# service iptables save
[root@elk212 ~]# echo '192.168.68.212 elk212' >> /etc/hosts
[root@elk212 iso]# vim /etc/security/limits.conf
* soft nofile 100000
* hard nofile 100000
* soft nproc 100000
* hard nproc 100000
[root@elk212 iso]# ulimit -SHn 100000
[root@elk212 es]# echo 'vm.max_map_count=262144' >> /etc/sysctl.conf
[root@elk212 es]# sysctl -p
vm.max_map_count = 262144
三方法&步骤
1、服务端
1)安装java[elk212,elk213,elk214]
[root@elk212 iso]# tar -xvf jdk-8u211-linux-x64.tar.gz
[root@elk212 iso]# mv jdk1.8.0_211 /usr/local/java
[root@elk212 iso]# vim /etc/profile
export JAVA_HOME=/usr/local/java
export PATH=$JAVA_HOME/bin:$PATH
[root@elk212 iso]# source /etc/profile
[root@elk212 iso]# java -version
java version "1.8.0_211"
Java(TM) SE Runtime Environment (build 1.8.0_211-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)
2)安装kafka[elk212,elk213,elk214]
****安装配置****
[root@elk212 iso]# tar -xvf kafka_2.12-2.2.1.tgz
[root@elk212 iso]# mv kafka_2.12-2.2.1 /usr/local/kafka
[root@elk212 iso]# cd /usr/local/kafka/config
[root@elk212 config]# grep -Ev "^$|^#" server.properties
broker.id=1 #elk213:2 elk214:3
delete.topic.enable=true
listeners=PLAINTEXT://192.168.68.212:9092 #本机ip
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=12
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.68.212:2181,192.168.68.213:2181,192.168.68.214:2181
zookeeper.connection.timeout.ms=10000
group.initial.rebalance.delay.ms=0
[root@elk212 config]# grep -Ev "^$|^#" consumer.properties
bootstrap.servers=192.168.68.212:9092,192.168.68.213:9092,192.168.68.214:9092
zookeeper.connection.timeout.ms=60000
group.id=test-consumer-group
[root@elk212 config]# grep -Ev "^$|^#" producer.properties
bootstrap.servers=192.168.68.212:9092,192.168.68.213:9092,192.168.68.214:9092
compression.type=none
[root@elk212 config]# mkdir /data/kafka-logs -p
****检查配置****
#分别启动三台zookeeper,查看有无报错(无报错不要关闭,等待kafka来注册)
[root@elk212 config]# /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
#在另一个终端再分别启动三台kafka,查看有无报错[提示,先启动三台zookeeper]
[root@elk212 ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
[root@elk212 ~]# ss -tunl | grep -E ':9092|2181'
tcp LISTEN 0 50 ::ffff:192.168.68.212:9092 :::*
tcp LISTEN 0 50 :::2181 :::*
**编写简单管理kafka脚本**
每一台都需要编写
[root@elk212 ~]# vim /usr/local/kafka/kafka.sh
#!/bin/bash
# Simple start/stop wrapper for the local zookeeper + kafka pair.
# Usage: kafka.sh start|stop
if [ $# -eq 1 ];then
  case $1 in
  start)
    echo -e '\033[32mkafka启动中....\033[0m'
    # zookeeper must be up before kafka can register with it,
    # hence the fixed order and the 8s grace period
    nohup /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
    sleep 8
    nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
    sleep 1
    echo -e '\033[32mkafka启动成功....\033[0m';;
  stop)
    echo -e '\033[32mkafka停止中....\033[0m'
    # stop in reverse order: kafka first, then zookeeper
    nohup /usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties &
    sleep 5
    nohup /usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/config/zookeeper.properties &
    sleep 1
    echo -e '\033[32mkafka停止成功\033[0m';;
  *)
    # fixed: usage message previously read 'start|stop1'
    echo -e '\033[31mstart|stop\033[0m'
  esac
else
  echo -e '\033[31mstart|stop\033[0m'
fi
[root@elk212 ~]# chmod +x /usr/local/kafka/kafka.sh
[root@elk212 ~]# /usr/local/kafka/kafka.sh start
kafka启动中....
nohup: 把输出追加到"nohup.out"
nohup: 把输出追加到"nohup.out"
kafka启动成功....
[root@elk212 ~]# /usr/local/kafka/kafka.sh stop
kafka停止中....
nohup: 把输出追加到"nohup.out"
nohup: 把输出追加到"nohup.out"
kafka停止成功
****测试一下kafka****
[root@elk212 ~]# cd /usr/local/kafka/
#创建topic
[root@elk212 kafka]# bin/kafka-topics.sh --create --zookeeper 192.168.68.212:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test
#查看创建的topic
[root@elk212 kafka]# ./bin/kafka-topics.sh --list --zookeeper 192.168.68.212:2181
test
#模拟客户端发送消息
[root@elk212 kafka]# ./bin/kafka-console-producer.sh --broker-list 192.168.68.212:9092 --topic test
>hello jluocc
>bye
#模拟客户端接收信息(如果能正常接收到信息说明kafka部署正常)
[root@elk212 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.68.212:9092 --topic test --from-beginning
hello jluocc
bye
3)安装Elasticsearch[elk212,elk213,elk214]
[root@elk212 iso]# tar -xvf elasticsearch-7.2.0-linux-x86_64.tar.gz
[root@elk212 iso]# mv elasticsearch-7.2.0 /usr/local/es
[root@elk212 iso]# cd /usr/local/es/config
[root@elk212 config]# vim elasticsearch.yml
[root@elk212 config]# cat elasticsearch.yml | grep -Ev "^$|^#"
cluster.name: elasticsearch
node.name: node-1 #elk213:node-2 elk214:node-3
path.data: /data
path.logs: /data/logs
network.host: 192.168.68.212
http.port: 9200
discovery.seed_hosts: ["192.168.68.212","192.168.68.213","192.168.68.214"]
cluster.initial_master_nodes: ["node-1", "node-2" , "node-3" ]
[root@elk212 config]# useradd log
[root@elk212 config]# mkdir /data/logs -p
[root@elk212 config]# chown -R log:log /data/
[root@elk212 config]# chown -R log:log /usr/local/es/
[root@elk212 config]# su - log -c "/usr/local/es/bin/elasticsearch"
[root@elk212 ~]# curl 192.168.68.212:9200
{
"name" : "node-1",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "yxC9x1hQQ9mja35PWSqG0A",
"version" : {
"number" : "7.2.0",
"build_flavor" : "default",
"build_type" : "tar",
"build_hash" : "508c38a",
"build_date" : "2019-06-20T15:54:18.811730Z",
"build_snapshot" : false,
"lucene_version" : "8.0.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
****编写简单启动脚本****
[root@elk212 es]# vim manageES.sh
#!/bin/bash
# Manage the local elasticsearch instance: start|restart|status|stop.
# ES refuses to run as root, so it is always launched via 'su - log'.

# PID of a running ES instance: the JVM command line contains '-Xmx';
# with 'ps -elf' the PID is column 4.
id=`ps -elf | grep -i elasticsearch | grep 'Xmx' | awk '{print $4}'`

# Launch ES in the background as user 'log'.
# Fix: the original 'nohup ... & > /dev/null' backgrounded the command and
# then ran a bare redirection as a separate empty command; the redirect
# must come before the '&' to actually silence the output.
start_es() {
  nohup su - log -c "/usr/local/es/bin/elasticsearch" >/dev/null 2>&1 &
  sleep 5
}

if [ $# -eq 1 ];then
  case $1 in
  start)
    if [ -z "$id" ];then
      echo -e '\033[32mes启动中....\033[0m'
      start_es
      echo -e '\033[32mes已经启动成功\033[0m'
    else
      echo -e "\033[31mes已经启动!,pid:${id}\033[0m"
    fi
    ;;
  restart)
    echo -e '\033[32mes重启中....\033[0m'
    if [ -n "$id" ];then
      # NOTE(review): kill -9 skips ES's graceful shutdown — consider
      # sending plain TERM first
      kill -9 $id &> /dev/null
    fi
    start_es
    echo -e '\033[32mes已经重启成功\033[0m'
    ;;
  status)
    if [ -z "$id" ];then
      echo -e '\033[32mes没有启动!\033[0m'
    else
      echo -e "\033[32mes正在运行,pid:${id}\033[0m"
    fi
    ;;
  stop)
    if [ -z "$id" ];then
      echo -e '\033[31mes没有启动!\033[0m'
    else
      echo -e '\033[32mes停止中....\033[0m'
      kill -9 $id &> /dev/null
      sleep 2
      echo -e '\033[32mes已经停止成功\033[0m'
    fi
    ;;
  *)
    echo -e '\033[31mstart|restart|status|stop\033[0m'
  esac
else
  echo -e '\033[31mstart|restart|status|stop\033[0m'
fi
[root@elk212 es]# chmod +x manageES.sh
[root@elk212 es]# ./manageES.sh -h
start|restart|status|stop
[root@elk212 es]# ./manageES.sh start
es启动中....
nohup: 把输出追加到"nohup.out"
es已经启动成功
[root@elk212 es]# ./manageES.sh status
es正在运行,pid:5596
[root@elk212 es]# ss -tunl | grep -E '9200|9300'
tcp LISTEN 0 128 ::ffff:192.168.68.212:9200 :::*
tcp LISTEN 0 128 ::ffff:192.168.68.212:9300 :::*
4)安装Kibana
[root@elk212 iso]# tar -xvf kibana-7.2.0-linux-x86_64.tar.gz
[root@elk212 iso]# mv kibana-7.2.0-linux-x86_64 /usr/local/kibana
[root@elk212 iso]# chown -R log:log /usr/local/kibana/
[root@elk212 iso]# cd /usr/local/kibana/
[root@elk212 kibana]# vim config/kibana.yml
[root@elk212 kibana]# cat config/kibana.yml | grep -Ev "^$|^#"
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://192.168.68.212:9200"]
kibana.index: ".kibana"
pid.file: /usr/local/kibana/kibana.pid
[root@elk212 kibana]# su - log -c "/usr/local/kibana/bin/kibana"
[root@elk212 iso]# ss -tunl | grep 5601
tcp LISTEN 0 128 192.168.68.212:5601 *:*
****编写启动脚本****
[root@elk212 kibana]# vim kibana.sh
#!/bin/bash
# Manage the local kibana instance: start|restart|status|stop.
# Kibana is launched as user 'log' (the install tree is owned by 'log').

# PID of a running kibana: its node command line contains
# 'max-http-header-size'; with 'ps -elf' the PID is column 4.
id=`ps -elf | grep -i kibana | grep 'max-http-header-size' | awk '{print $4}'`

# Launch kibana in the background as user 'log'.
# Fix: the original 'nohup ... & > /dev/null' backgrounded the command and
# then ran a bare redirection as a separate empty command; the redirect
# must come before the '&' to actually silence the output.
start_kibana() {
  nohup su - log -c "/usr/local/kibana/bin/kibana" >/dev/null 2>&1 &
  sleep 5
}

if [ $# -eq 1 ];then
  case $1 in
  start)
    if [ -z "$id" ];then
      echo -e '\033[32mkibana启动中....\033[0m'
      start_kibana
      echo -e '\033[32mkibana已经启动成功\033[0m'
    else
      echo -e "\033[31mkibana已经启动!,pid:${id}\033[0m"
    fi
    ;;
  restart)
    echo -e '\033[32mkibana重启中....\033[0m'
    if [ -n "$id" ];then
      # NOTE(review): kill -9 is a hard kill — consider plain TERM first
      kill -9 $id &> /dev/null
    fi
    start_kibana
    echo -e '\033[32mkibana已经重启成功\033[0m'
    ;;
  status)
    if [ -z "$id" ];then
      echo -e '\033[32mkibana没有启动!\033[0m'
    else
      echo -e "\033[32mkibana正在运行,pid:${id}\033[0m"
    fi
    ;;
  stop)
    if [ -z "$id" ];then
      echo -e '\033[31mkibana没有启动!\033[0m'
    else
      echo -e '\033[32mkibana停止中....\033[0m'
      kill -9 $id &> /dev/null
      sleep 2
      echo -e '\033[32mkibana已经停止成功\033[0m'
    fi
    ;;
  *)
    echo -e '\033[31mstart|restart|status|stop\033[0m'
  esac
else
  echo -e '\033[31mstart|restart|status|stop\033[0m'
fi
[root@elk212 kibana]# chmod +x kibana.sh
[root@elk212 kibana]# ./kibana.sh status
kibana没有启动!
[root@elk212 kibana]# ./kibana.sh start
kibana启动中....
nohup: 把输出追加到"nohup.out"
kibana已经启动成功
[root@elk212 kibana]# ./kibana.sh status
kibana正在运行,pid:6714
5)安装Logstash[elk212,elk213,elk214]
[root@elk212 iso]# tar -xvf logstash-7.2.0.tar.gz
[root@elk212 iso]# mv logstash-7.2.0 /usr/local/logstash
[root@elk212 logstash]# chown -R log:log /usr/local/logstash/
[root@elk212 iso]# cd /usr/local/logstash/
[root@elk212 logstash]# cp config/logstash-sample.conf config/logstash.conf
[root@elk212 logstash]# vim config/logstash.conf
input {
kafka {
bootstrap_servers => ["192.168.68.212:9092","192.168.68.213:9092","192.168.68.214:9092"]
topics => ["credit"]
group_id => "test-consumer-group"
codec => "json"
consumer_threads => 1
decorate_events => true
}
}
output {
elasticsearch {
hosts => ["192.168.68.212:9200","192.168.68.213:9200","192.168.68.214:9200"]
index => "logs-%{+YYYY.MM.dd}"
workers => 1
}
}
这里需要注意的就四个地方 bootstrap_servers 也就是kafka集群的地址,在filebeat端要求单个地址加引号,这里是集群地址放一起加引号。
group_id 这里必须保证唯一,是你这个logstash集群消费kafka集群的身份标识。
topics filebeat和logstash使用的topic一致。
codec => json 由于beat传输数据给kafka集群的时候,会附加很多tag,默认情况下,logstash就会将这串tag也认为是message的一部分。这样不利于后期的数据处理。所有需要添加codec处理。得到原本的message数据。
#启动logstash,如果没有error , 就启动完成了
[root@elk212 ~]# /usr/local/logstash/bin/logstash -f /usr/local/logstash/config/logstash.conf
[root@elk212 ~]# ss -tunl | grep :9600
tcp LISTEN 0 50 ::ffff:127.0.0.1:9600 :::*
****编写管理logstash脚本****
[root@elk212 ~]# cd /usr/local/logstash/
[root@elk212 logstash]# vim logstash.sh
#!/bin/bash
# Manage the local logstash instance: start|restart|status|stop.
# Logstash is launched as user 'log' with the pipeline config
# /usr/local/logstash/config/logstash.conf.

# PID of a running logstash: the JVM command line contains '-Xms';
# with 'ps -elf' the PID is column 4.
id=`ps -elf | grep -i logstash | grep 'Xms' | awk '{print $4}'`

# Launch logstash in the background as user 'log'.
# Fix: the original 'nohup ... & > /dev/null' backgrounded the command and
# then ran a bare redirection as a separate empty command; the redirect
# must come before the '&' to actually silence the output.
start_logstash() {
  nohup su - log -c "/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/logstash.conf" >/dev/null 2>&1 &
  sleep 5
}

if [ $# -eq 1 ];then
  case $1 in
  start)
    if [ -z "$id" ];then
      echo -e '\033[32mlogstash启动中....\033[0m'
      start_logstash
      echo -e '\033[32mlogstash已经启动成功\033[0m'
    else
      echo -e "\033[31mlogstash已经启动!,pid:${id}\033[0m"
    fi
    ;;
  restart)
    echo -e '\033[32mlogstash重启中....\033[0m'
    if [ -n "$id" ];then
      # NOTE(review): kill -9 is a hard kill — consider plain TERM first
      kill -9 $id &> /dev/null
    fi
    start_logstash
    echo -e '\033[32mlogstash已经重启成功\033[0m'
    ;;
  status)
    if [ -z "$id" ];then
      echo -e '\033[32mlogstash没有启动!\033[0m'
    else
      echo -e "\033[32mlogstash正在运行,pid:${id}\033[0m"
    fi
    ;;
  stop)
    if [ -z "$id" ];then
      echo -e '\033[31mlogstash没有启动!\033[0m'
    else
      echo -e '\033[32mlogstash停止中....\033[0m'
      kill -9 $id &> /dev/null
      sleep 2
      echo -e '\033[32mlogstash已经停止成功\033[0m'
    fi
    ;;
  *)
    echo -e '\033[31mstart|restart|status|stop\033[0m'
  esac
else
  echo -e '\033[31mstart|restart|status|stop\033[0m'
fi
[root@elk212 logstash]# chmod +x logstash.sh
[root@elk212 logstash]# ./logstash.sh
start|restart|status|stop
[root@elk212 logstash]# ./logstash.sh stop
logstash停止中....
logstash已经停止成功
[root@elk212 logstash]# ./logstash.sh status
logstash没有启动!
[root@elk212 logstash]# ./logstash.sh start
logstash启动中....
nohup: 把输出追加到"nohup.out"
logstash已经启动成功
[root@elk212 logstash]# ./logstash.sh status
logstash正在运行,pid:7586
2、客户端
[root@web50 efk]# tar -xvf filebeat-7.2.0-linux-x86_64.tar.gz
[root@web50 efk]# mv filebeat-7.2.0-linux-x86_64 /usr/local/filebeat
[root@web50 filebeat]# useradd log
[root@web50 filebeat]# chown -R log:log /usr/local/filebeat/
[root@web50 efk]# cd /usr/local/filebeat/
[root@web50 filebeat]# vim filebeat.yml
[root@web50 filebeat]# grep -vE '^$|#' filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/*.log
multiline.pattern: ^\d+
multiline.negate: true
multiline.match: after
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
setup.kibana:
output.kafka:
hosts: ['192.168.68.212:9092','192.168.68.213:9092' ,'192.168.68.214:9092']
topic: 'credit'
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
[root@web50 filebeat]# ./filebeat modules enable nginx
[root@web50 filebeat]# vim modules.d/nginx.yml
[root@web50 filebeat]# cat modules.d/nginx.yml | grep -Ev '^$|#'
- module: nginx
access:
enabled: true
var.paths: ["/usr/local/nginx/logs/access.log"]
error:
enabled: true
var.paths: ["/usr/local/nginx/logs/error.log"]
#启动
[root@web50 filebeat]# /usr/local/filebeat/filebeat -e -c /usr/local/filebeat/filebeat.yml
****编写管理脚本****
[root@web50 ~]# cd /usr/local/filebeat/
[root@web50 filebeat]# vim mbeat.sh
#!/bin/bash
# Manage the local filebeat instance: start|restart|status|stop.
# Fixes vs. original: shebang was '#/bin/bash' (missing '!'); the start
# and restart messages wrongly said 'logstash'; the 'nohup ... & > /dev/null'
# redirect came after '&' and therefore silenced nothing.

# PID of a running filebeat: last matching 'ps -elf' entry (excluding the
# grep itself); the PID is column 4.
id=`ps -elf | grep -i filebeat | grep -v grep | awk '{print $4}' | tail -1`

# Launch filebeat in the background as user 'log'.
start_filebeat() {
  nohup su - log -c "/usr/local/filebeat/filebeat -e -c /usr/local/filebeat/filebeat.yml" >/dev/null 2>&1 &
  sleep 5
}

if [ $# -eq 1 ];then
  case $1 in
  start)
    if [ -z "$id" ];then
      echo -e '\033[32mfilebeat启动中....\033[0m'
      start_filebeat
      echo -e '\033[32mfilebeat已经启动成功\033[0m'
    else
      echo -e "\033[31mfilebeat已经启动!,pid:${id}\033[0m"
    fi
    ;;
  restart)
    echo -e '\033[32mfilebeat重启中....\033[0m'
    if [ -n "$id" ];then
      # NOTE(review): kill -9 is a hard kill — consider plain TERM first
      kill -9 $id &> /dev/null
    fi
    start_filebeat
    echo -e '\033[32mfilebeat已经重启成功\033[0m'
    ;;
  status)
    if [ -z "$id" ];then
      echo -e '\033[32mfilebeat没有启动!\033[0m'
    else
      echo -e "\033[32mfilebeat正在运行,pid:${id}\033[0m"
    fi
    ;;
  stop)
    if [ -z "$id" ];then
      echo -e '\033[31mfilebeat没有启动!\033[0m'
    else
      echo -e '\033[32mfilebeat停止中....\033[0m'
      kill -9 $id &> /dev/null
      sleep 2
      echo -e '\033[32mfilebeat已经停止成功\033[0m'
    fi
    ;;
  *)
    echo -e '\033[31mstart|restart|status|stop\033[0m'
  esac
else
  echo -e '\033[31mstart|restart|status|stop\033[0m'
fi
[root@web50 filebeat]# chmod +x mbeat.sh
[root@web50 filebeat]# ./mbeat.sh start
logstash启动中....
nohup: 把输出追加到"nohup.out"
filebeat已经启动成功
[root@web50 filebeat]# ./mbeat.sh status
filebeat正在运行,pid:3216
3、kibana配置
四总结&提示
本次实验主要是简单分享下elk+filebeat+kafka构建日志分析平台,由于自己电脑配置问题,只能部署在三台电脑上,生产环境中,应该把es集群分离出来单独部署,具体情况具体分析,kibana可以多部署几台,用nginx调度,组建高可用的kibana集群,配置访问权限,增加安全性等等。
常见错误01:
解决办法:调大zookeeper连接时间
zookeeper.connection.timeout.ms=60000
看官福利:
结束语:
更多精彩内容持续更新中,关注我,有你更精彩。