Elasticsearch 使用logstash 和elasti

2019-11-14  本文已影响0人  miss_c44c

使用工具 :

初步方案:

  1. 直接通过logstash 进行不同集群之间的数据迁移。

         优点 :处理方便,没有繁琐步骤。

         缺点 :同一版本集群,或集群版本相差不大

  2. 可以通过elasticdump 进行数据导出,再进行logtash 将导出数据写入目标集群。

         优点:跨版本导数据不易出现异常。频繁读写,不会影响线上集群服务 。

         缺点:分段操作较繁琐,占用本地磁盘

  3. 导到目标集群前,需先建好索引及索引配置,可以通过模板预建立

准备步骤:

logstash 下载

下载地址:https://www.elastic.co/cn/downloads/logstash

环境准备 :
环境 备注
JDK 8 或 11 需要配置环境jdk 为 jdk8 或 jdk11
ruby 如需要通过脚本做特属于处理需要安装,不做处理可以不安装
logstash 导数据 elasticsearch:
input {
 elasticsearch {
 hosts => "ip:port"
 index => "index_name"
 query => '{ "query": { "match_all":{ }} }'
 size => 500
 scroll => "5m"
 docinfo => true
 codec => json
 }
 }

filter {
 mutate {
 rename => { 
 # 将字段http_referer重命名为http_source
 "createTime" => "created_at"
 "likeNum" => "like_counter"
 }
# logstash 导出数据会产生一些特殊字段
 remove_tag => ["_grokparsefailure"]
 remove_field => ["@timestamp" , "@version"]
 }
# 导出数据可以添加一些过滤条件
 if[delete] == 1 {
 drop {}
 }
}
# 输出到目标集群
output {
 elasticsearch {
 hosts => ["target_ip:target_port"]
 action => "update" #upsert 有则更新,无则插入
 document_type => "_doc" # 索引type
 document_id => "%{[@metadata][_id]}" # 索引对应的id 字段
 index => "target_index_name"  # 目标索引名称
 doc_as_upsert => true
 }
}

elasticdump 下载

安装准备:

一 、

安装NodeJS

下载源码:wget http://nodejs.org/dist/v0.10.32/node-v0.10.32-linux-x64.tar.gz
解压:tar xvf node-v0.10.22-linux-x64.tar.gz

二 、

安装NPM
curl -L https://npmjs.org/install.sh | sh**

三、

安装elasticdump

npm install elasticdump -g

数据导出:

elasticdump 导出数据脚本:

elasticdump --input=ip:port/index_name --output=file_path --type=data --sourceOnly=true --ignore-errors=true

参数说明 :

--ignore-errors=true :忽略异常造成的中断
--sourceOnly=true : 只导出source 下的数据
其余参数参考官方文档:https://github.com/taskrabbit/elasticsearch-dump

logstash 通过文件导入数据:
input {
   file{
     path => ["file_path"]
     codec => "json"
     start_position => "beginning"
  }
}



filter {
        mutate {
          remove_tag => ["_grokparsefailure"]
          remove_field => ["@timestamp" , "@version", "host", "path"]
      }
     ## 通过ruby 进行特殊字段添加处理
      ruby {
         code  => '  time =  event.get("createTime") ;
         likeNum = event.get("likeNum");
         if !time.nil?
            event.set("create_time_range",Time.at(Integer(time)).strftime("%Y-%m"))
         end
         if !likeNum.nil?
            if Integer(likeNum)>300
                 event.set("like_range","1")
            end
         end
'
      }
}

output {
     elasticsearch {
          hosts => ["target_ip:target_port"]
          action => "update"
          document_type => "index_type"
          document_id  => "%{id}"
          index => "index_name"
          doc_as_upsert  => true
 }
}

备注:

logstash 多次使用后,需要清理游标,否则会导致数据写入不进去。

  1. 清理sincedb_path


    image.png

    sincedb_path :表示文件读取进度的记录,每行表示一个文件,每行有两个数字

  2. 清理logstash/data/ 目录下隐藏文件 .lock

上一篇 下一篇

猜你喜欢

热点阅读