Implementing a Large-Scale Network Log Center
The earlier article "Building a Network Log Center with ELK in Practice" used just two machines for a simple ELK-based log center. That setup is not enough for a larger deployment, so the scheme below tunes each component to build a log center usable in a large network.
Architecture topology: network devices send syslog to a Logstash receiver (10.57.22.95), which produces the events into a three-node ZooKeeper/Kafka cluster (zk1-zk3); a Logstash instance on each Kafka node then consumes the topic and ships the logs to the Elasticsearch cluster (10.57.22.126, 10.57.22.128).
For setting up the ELK environment itself, see the earlier article "Building a Network Log Center with ELK in Practice".
zookeeper
Installation:
Modify /etc/hosts:
[root@10-57-22-234 bin]# cat /etc/hosts
10.57.22.167 zk1
10.57.22.218 zk2
10.57.22.234 zk3
Java
yum install java -y
wget http://apache.fayea.com/zookeeper/stable/zookeeper-3.4.10.tar.gz
tar -zxvf zookeeper-3.4.10.tar.gz
mv zookeeper-3.4.10 /usr/local/zookeeper
cd /usr/local/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
[root@10-57-22-234 bin]# cat /usr/local/zookeeper/conf/zoo.cfg |grep -v ^'#'
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zk
clientPort=2181
server.0=zk1:2888:3888
server.1=zk2:2888:3888
server.2=zk3:2888:3888
mkdir -p /data/zk
Creating the ID files is important: the myid files on the three nodes must contain 0, 1, and 2 respectively, matching the server.0/server.1/server.2 entries in zoo.cfg. If this step is skipped, startup fails with "Invalid config, exiting abnormally".
touch /data/zk/myid
echo 0 >/data/zk/myid
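The value differs per host; matching the server.N lines in zoo.cfg above, each node gets its own ID:
# on zk1
echo 0 > /data/zk/myid
# on zk2
echo 1 > /data/zk/myid
# on zk3
echo 2 > /data/zk/myid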
Start:
./zkServer.sh start
Check the status:
[root@10-57-22-234 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower
Troubleshooting startup errors:
1. Check whether the firewall is off.
2. The config file uses hostnames; check that they resolve (see /etc/hosts).
3. The dataDir directory must be created manually, and each node needs a myid file containing its own distinct value.
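Once all three nodes are up, the ensemble can also be sanity-checked with ZooKeeper's four-letter commands (assuming nc is available on the host):
echo ruok | nc zk1 2181    # a healthy server answers "imok"
echo stat | nc zk1 2181    # shows this node's mode (leader/follower) and client connections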
kafka
Download:
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.1/kafka_2.12-1.0.1.tgz
tar -zxvf kafka_2.12-1.0.1.tgz
mv kafka_2.12-1.0.1 /usr/local/kafka
Add environment variables:
vi /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=/usr/local/kafka/bin:$PATH
Reload the environment:
source /etc/profile
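To confirm the PATH change took effect, the Kafka scripts should now resolve from anywhere:
which kafka-server-start.sh    # expected: /usr/local/kafka/bin/kafka-server-start.sh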
Startup script: newer Kafka releases bundle their own ZooKeeper, but since ZooKeeper is already installed above, the ZooKeeper start/stop lines in the script are commented out.
chmod +x /etc/init.d/kafka
Start: /etc/init.d/kafka start
Stop: /etc/init.d/kafka stop
[root@10-57-22-218 config]# cat /etc/init.d/kafka
#!/bin/bash
kafka_home=/usr/local/kafka
case $1 in
start) # steps to start the service
# echo "zookeeper start"
# $kafka_home/bin/zookeeper-server-start.sh -daemon $kafka_home/config/zookeeper.properties
#sleep 1
echo "kafka start"
$kafka_home/bin/kafka-server-start.sh -daemon $kafka_home/config/server.properties
#sleep 1
;;
stop) # steps to stop the service
echo "kafka stop"
$kafka_home/bin/kafka-server-stop.sh
#sleep 1
# echo "zookeeper stop"
# $kafka_home/bin/zookeeper-server-stop.sh
#sleep 1
;;
restart) # restart: stop, then start
$0 stop
sleep 1
$0 start
;;
status) # report whether the broker process is running
ps -ef | grep -v grep | grep kafka.Kafka >/dev/null && echo "kafka is running" || echo "kafka is not running"
;;
*) echo "$0 {start|stop|restart|status}"
exit 4
;;
esac
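The script assumes each broker's config/server.properties already points at the external ZooKeeper ensemble. A minimal per-broker sketch (these are standard Kafka settings; the values follow the hosts used above, and log.dirs is just an example path):
broker.id=0                                    # 1 on zk2, 2 on zk3 -- must be unique per broker
listeners=PLAINTEXT://zk1:9092                 # each node advertises its own hostname
log.dirs=/data/kafka-logs                      # broker data directory; choose a persistent path
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181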
Testing:
1. Create a topic:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2. List topics:
kafka-topics.sh --list --zookeeper localhost:2181
Show topic details:
kafka-topics.sh --describe --zookeeper zk1:2181 --topic test
3. Open a terminal and send messages; the producer writes to Kafka:
kafka-console-producer.sh --broker-list localhost:9092 --topic test
4. Open another terminal and consume messages; the old consumer locates the brokers through ZooKeeper, which handles coordination:
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
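Note: the --zookeeper flag selects the old consumer. On Kafka 1.0 the new consumer, which talks to the brokers directly, is the recommended form:
kafka-console-consumer.sh --bootstrap-server zk1:9092 --topic test --from-beginning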
Error handling:
[2018-03-20 02:14:20,347] WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Cause: in a cluster, list all broker addresses when running the command, e.g.:
kafka-console-producer.sh --broker-list zk1:9092,zk2:9092,zk3:9092 --topic test
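If automatic topic creation is disabled on the brokers, the networklog topic used by the Logstash pipelines below must be created first. For example, spread across all three brokers (the partition and replica counts here are a suggestion, not from the original setup):
kafka-topics.sh --create --zookeeper zk1:2181 --replication-factor 3 --partitions 3 --topic networklog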
Logstash configuration on the three Kafka nodes. Note that newer Logstash versions no longer support the following (old-style) configuration:
input {
  kafka {
    zk_connect => "zk1:2181,zk2:2181,zk3:2181"
    topic_id => "networklog"
    codec => plain
    reset_beginning => false
    consumer_threads => 5
    decorate_events => true
  }
}
output {
  elasticsearch { hosts => ["10.57.22.126:9200","10.57.22.128:9200"] }
}
This produces the following errors:
[2018-03-20T02:38:00,052][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.2"}
[2018-03-20T02:38:00,144][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-03-20T02:38:00,487][ERROR][logstash.inputs.kafka ] Unknown setting 'zk_connect' for kafka
[2018-03-20T02:38:00,488][ERROR][logstash.inputs.kafka ] Unknown setting 'topic_id' for kafka
[2018-03-20T02:38:00,488][ERROR][logstash.inputs.kafka ] Unknown setting 'reset_beginning' for kafka
The configuration must be changed to the following. Note that the port becomes 9092: Logstash now connects to the Kafka brokers directly instead of to ZooKeeper.
input {
  kafka {
    bootstrap_servers => "zk1:9092,zk2:9092,zk3:9092"
    topics => ["networklog"]
  }
}
output {
  elasticsearch { hosts => ["10.57.22.126:9200","10.57.22.128:9200"] }
}
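One detail worth knowing: the logstash-input-kafka plugin defaults to group_id => "logstash", so the three instances join the same consumer group and each event is consumed only once, with partitions balanced across them. The config can be validated and then run as usual (the paths below assume a tarball install under /usr/local/logstash; adjust to your layout):
/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/kafka-es.conf --config.test_and_exit
/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/kafka-es.conf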
Logstash configuration on 10.57.22.95, the syslog receiver. Binding to port 514 requires Logstash to run as root, since ports below 1024 are privileged:
input {
  tcp {
    port => 514
    type => syslog
  }
  udp {
    port => 514
    type => syslog
  }
}
output {
  kafka {
    bootstrap_servers => "10.57.22.167:9092,10.57.22.234:9092,10.57.22.218:9092"
    topic_id => "networklog"
    compression_type => "snappy"
  }
}
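For an end-to-end check, a syslog message can be injected from any Linux host as a stand-in for a real network device (the -n/-P options belong to the util-linux logger, which sends over UDP by default), then read back from the topic:
logger -n 10.57.22.95 -P 514 "test syslog message"
kafka-console-consumer.sh --bootstrap-server zk1:9092 --topic networklog --from-beginning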