Logstash_输入插件(未完成版)
标准输入
input {
stdin {
add_field => {"key" => "value"}
codec => "plain"
tags => ["add"]
type => "std" # 还可以在filebeat内定义 过滤和输出的时候都可以用
}
}
运行结果
用上面的新 stdin 设置重新运行一次最开始的 hello world 示例。我建议大家把整段配置都写入一个文本文件,然后运行命令:logstash -f stdin.conf。输入 "hello world" 并回车后,你会在终端看到如下输出:
{
"message" => "hello world",
"@version" => "1",
"@timestamp" => "2014-08-08T06:48:47.789Z",
"type" => "std",
"tags" => [
[0] "add"
],
"key" => "value",
"host" => "raochenlindeMacBook-Air.local"
}
解释
type 和 tags 是 logstash 事件中两个特殊的字段。通常来说我们会在输入区段中通过 type 来标记事件类型 —— 我们肯定是提前能知道这个事件属于什么类型的。
而 tags 则是在数据处理过程中,由具体的插件来添加或者删除的。
最常见的用法是像下面这样:
input {
stdin {
type => "web" # web类型 后面判断时候用
}
}
filter {
if [type] == "web" {
grok {
match => ["message", %{COMBINEDAPACHELOG}]
}
}
}
output {
if "_grokparsefailure" in [tags] {
nagios_nsca {
nagios_status => "1"
}
} else {
elasticsearch {
}
}
}
读取文件
利用logstash来处理日志文件
一个简单示例
input
file {
path => ["/var/log/*.log", "/var/log/message"] # 文件路径,可以写多个
type => "system" # 定义类型 过滤和输出有时候会用
start_position => "beginning"
}
file {
path => ["/*.log] # 文件路径,可以写多个
type => "log" # 定义类型 过滤和输出有时候会用
start_position => "beginning"
}
}
上面的案例中,
读取了多个文件
,并定义了不同的类型
,当一台服务器读取多种类型的日志文件的时候,可以这样使用
读取网络数据
未来你可能会用 Redis 服务器或者其他的消息队列系统来作为 logstash broker 的角色。不过 Logstash 其实也有自己的 TCP/UDP 插件,在临时任务的时候,也算能用,尤其是测试环境。
小贴士:虽然 LogStash::Inputs::TCP 用 Ruby 的 Socket 和 OpenSSL 库实现了高级的 SSL 功能,但 Logstash 本身只能在 SizedQueue 中缓存 20 个事件。这就是我们建议在生产环境中换用其他消息队列的原因。
简单示例
# 读取网络数据通过服务和端口号进行读取
input {
tcp {
port => 8888
mode => "server"
ssl_enable => false
}
}
常见场景
目前来看,LogStash::Inputs::TCP 最常见的用法就是配合 nc 命令导入旧数据。在启动 logstash 进程后,在另一个终端运行如下命令即可导入数据:
# nc 127.0.0.1 8888 < olddata
用这个命令将数据导出来,在用logstash从这个端口进行读取数据
生成测试数据
实际运行的时候这个插件是派不上用途的,但这个插件依然是非常重要的插件之一。因为每一个使用 ELK stack 的运维人员都应该清楚一个道理:数据是支持操作的唯一真理(否则你也用不着 ELK)。
所以在上线之前,你一定会需要在自己的实际环境中,测试 Logstash 和 Elasticsearch 的性能状况。这时候,这个用来生成测试数据的插件就有用了!
input {
generator { # 使用插件
count => 10000000 # 生成的文档数
message => '{"key1":"value1","key2":[1,2],"key3":{"subkey1":"subvalue1"}}'
codec => json
}
}
output {
stdout {
codec => dots # 将输出的事件变成.
}
}
Dots 也是一个另类的 codec 插件,他的作用是:把每个 event 都变成一个点(.)。这样,在输出的时候,就变成了一个一个的 . 在屏幕上。显然这也是一个为了测试而存在的插件。
下面就要介绍 pv
命令了。。这个命令的作用,就是作实时的标准输入、标准输出监控。我们这里就用它来监控标准输出:
安装pv命令:
yum -y install epel-release
yum -y install pv
$ ./bin/logstash -f generator_dots.conf | pv -abt > /dev/null
2.2MiB 0:03:00 [12.5kiB/s]
刚开始,速度为0,这是因为logstash依赖java环境,起来的比较慢,开始运行的时候,速度依然不快,慢慢增长到比较稳定的状态,这时候才是我们需要的数据
这里单位是 B/s,但是因为一个 event 就输出一个 .,也就是 1B。所以 12.5kiB/s 就相当于是 12.5k event/s。
读取系统日志数据
这里我们研究的是系统日志通过syslog将数据传给logstash,logstash只需要接收数据进行处理的情况
logstash的配置文件
input {
syslog {
port => "514"
}
}
读取redis数据
LogStash::Inputs::Redis 支持三种 data_type(实际上是redis_type),不同的数据类型会导致实际采用不同的 Redis 命令操作
:
- list => BLPOP
- channel => SUBSCRIBE
- pattern_channel => PSUBSCRIBE
配置文件
input {
redis {
data_type => "pattern_channel"
key => "logstash-*"
host => "192.168.0.2" # 主机
port => 6379 # 端口
threads => 5 # 线程数
}
}
使用方式
基本方法
首先确认你设置的 host 服务器上已经运行了 redis-server 服务,然后打开终端运行 logstash 进程等待输入数据,然后打开另一个终端,输入 redis-cli 命令(先安装好 redis 软件包),在交互式提示符后面输入PUBLISH logstash-demochan "hello world"
:
systemctl restart redis-sentinel.service # 开启server服务
# redis-cli
127.0.0.1:6379> PUBLISH logstash-demochan "hello world"
你会在第一个终端里看到 logstash 进程输出类似下面这样的内容:
{
"message" => "hello world",
"@version" => "1",
"@timestamp" => "2014-08-08T16:26:29.399Z"
}
输入 JSON 数据
如果你想通过 redis 的频道给 logstash 事件添加更多字段,直接向频道发布 JSON 字符串就可以了。 LogStash::Inputs::Redis 会直接把 JSON 转换成事件。
继续在第二个终端的交互式提示符下输入如下内容:
127.0.0.1:6379> PUBLISH logstash-chan '{"message":"hello world","@version":"1","@timestamp":"2014-08-08T16:34:21.865Z","host":"raochenlindeMacBook-Air.local","key1":"value1"}'
你会看到第一个终端里的 logstash 进程随即也返回新的内容,如下所示:
{
"message" => "hello world",
"@version" => "1",
"@timestamp" => "2014-08-09T00:34:21.865+08:00",
"host" => "raochenlindeMacBook-Air.local",
"key1" => "value1"
}
127.0.0.1:6379> PUBLISH logstash-demochan "hello world"
(integer) 5
输出结果中 有个数字5 ,这个数字代表的是redis向logstash中输入数据的次数,连着输入了5次