Elasticsearch专栏

Elasticsearch Ingest Pipeline数据预

2021-01-26  本文已影响0人  蒙嘉

1、Pipeline定义

在Elasticsearch 5.0版本以后引入了ingest node,用于在文档被索引之前进行预处理。
Pipeline 定义了一系列按顺序执行的 processors, 一个 pipeline 由 description 和 processors两部分组成:

PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "...",
  "processors" : [ ... ]
}

2、Pipeline获取文档信息

2.1 pipeline中的 processors 可以获取到传入的文档的原始数据以及元数据,并进行读写操作。获取单个字段可以直接使用字段名:

{
  "set": {
    "field": "my_field",
    "value": 582.1
  }
}

或者使用_source前缀:

{
  "set": {
    "field": "_source.my_field",
    "value": 582.1
  }
}

2.2 pipeline同样可以访问文档中的元信息,_index、_type、_id、_routing等。

{
  "set": {
    "field": "_id",
    "value": "1"
  }
}

2.3 pipeline中还可以获取Ingest 的元信息,ingest的元数据信息是只存在与文档预处理期间,当文档被pipeline处理完成,ingest元信息也就消息了。

{
  "set": {
    "field": "received",
    "value": "{{_ingest.timestamp}}"
  }
}

此处使用了{{}},这种方式同样可以作为模板获取文档内的字段,比如下面field_c的内容是field_a与field_b连接。

{
  "set": {
    "field": "field_c",
    "value": "{{field_a}} {{field_b}}"
  }
}

在pipeline中还支持使用动态字段,如下将service字段的值当做字段名,将code的值当做字段值:

{
  "set": {
    "field": "{{service}}",
    "value": "{{code}}"
  }
}

3、使用Simulate Pipeline API

在定义了pipeline之后,可以使用Simulate Pipeline API 对文档进行预处理执行测试。

POST /_ingest/pipeline/my-pipeline-id/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}
POST /_ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}

4、常用processors类型详解

具体详解请看官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/ingest-processors.html
此处主要说明几个关键常用。
4.1 Append Processor
该Processor追加一个或者多个值到现存的数组字段中,如果字段本身是标量则将其转换成数组再添加,如果字段不存在则创建包含该值的数字。

{
  "append": {
    "field": "tags",
    "value": ["production", "{{app}}", "{{owner}}"]
  }
}

4.2 Convert Processor
该Processor可以转换当前正在处理的字段数据类型,比如将一个String转换成Integer,如果是数组,则数组中所有成员都会被转换。目前只支持:integer, long, float, double, string, boolean, and auto。

PUT _ingest/pipeline/my-pipeline-id
{
  "description": "converts the content of the price field to an double",
  "processors" : [
    {
      "convert" : {
        "field" : "price",
        "type": "double"
      }
    }
  ]
}

4.3 Date Index Name Processor
该 Processor 可以指定文档中的日期字段,将文档分配到符合指定时间格式的索引中,前提是按照官方提供的说明来进行使用。

PUT _ingest/pipeline/monthlyindex
{
  "description": "monthly date-time index naming",
  "processors" : [
    {
      "date_index_name" : {
        "field" : "date1",
        "index_name_prefix" : "my-index-",
        "date_rounding" : "M"
      }
    }
  ]
}

PUT /my-index/_doc/1?pipeline=monthlyindex
{
  "date1" : "2020-04-25T12:02:01.789Z"
}

{
  "_index" : "my-index-2020-04-01",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 55,
  "_primary_term" : 1
}

上面的不会放入myindex索引中,而是myindex-2016-04-01 索引中,该功能与template配合使用可以按日期进行索引。

4.4 Script Processor
该 Processor 是 Ingest 中功能最强大的Processor,利用Elasticsearch提供的脚本能力。

{
  "script": {
    "lang": "painless",
    "source": "ctx.field_a_plus_b_times_c = (ctx.field_a + ctx.field_b) * params.param_c",
    "params": {
      "param_c": 10
    }
  }
}

4.5 Set Processor
该 Processor 用于指定字段的值,如果该字段存在时,则修改字段的值,如果该字段不存在,则新增字段并设置该字段的值。

{
  "description" : "sets the value of count to 1"
  "set": {
    "field": "count",
    "value": 1
  }
}

该处理器也可以用其他字段的值动态赋值:

PUT _ingest/pipeline/set_os
{
  "description": "sets the value of host.os.name from the field os",
  "processors": [
    {
      "set": {
        "field": "host.os.name",
        "value": "{{os}}"
      }
    }
  ]
}

POST _ingest/pipeline/set_os/_simulate
{
  "docs": [
    {
      "_source": {
        "os": "Ubuntu"
      }
    }
  ]
}
上一篇下一篇

猜你喜欢

热点阅读