DB优化分布式&高可用Java技术升华

Elasticsearch详解-续

2019-05-22  本文已影响414人  Chandler_珏瑜

5.3 性能调优

 Elasticsearch的默认配置,是综合了数据可靠性、写入速度、搜索实时性等因素。实际使用时,我们需要根据公司要求,进行偏向性的优化。

5.3.1 写优化

 假设我们的应用场景要求是,每秒300万的写入速度,每条500字节左右。
 正对这种对于搜索性能要求不高,但是对写入要求较高的场景,我们需要尽可能的选择恰当写优化策略。综合来说,可以考虑以下几种方面来提升写索引的性能:

1. 批量提交

 Elasticsearch提供了bulk API支持批量操作,当我们有大量的写任务时,可以使用bulk来进行批量写入。每次提交的数据量为多少时,能达到最优的性能,主要受到文件大小、网络情况、数据类型、集群状态等因素影响。
 通用的策略如下:

bulk默认设置批量提交的数据量不能超过100M。数据条数一般是根据文档的大小和服务器性能而定的,但是单次批处理的数据大小应从5MB~15MB逐渐增加,当性能没有提升时,把这个数据量作为最大值。

 我们可以跟着,感受一下bulk接口,如下所示:

$ vi request
$ cat request
{ "index" : { "_index" : "chandler","_type": "test", "_id" : "1" } }
{ "name" : "钱丁君","age": "18" }
$ curl -s -H "Content-Type: application/json" -XPOST localhost:9200/_bulk --data-binary @request; echo
{"took":214,"errors":false,"items":[{"index":{"_index":"chandler","_type":"test","_id":"1","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}}]}
$ curl -XGET localhost:9200/chandler/test/1?pretty
{
  "_index" : "chandler",
  "_type" : "test",
  "_id" : "1",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "name" : "钱丁君",
    "age" : "18"
  }
}

 bulk不支持get操作,因为没什么用处。

2. 优化存储设备

 Elasticsearch是一种密集使用磁盘的应用,在段合并的时候会频繁操作磁盘,所以磁盘要求较高,当磁盘速度提升之后,集群的整体性能会大幅度提高。
 磁盘的选择,提供以下几点建议:

  1. 使用固态硬盘(Solid State Disk)替代机械硬盘。SSD与机械磁盘相比,具有高效的读写速度和稳定性。
  2. 使用RAID 0。RAID0条带化存储,可以提升磁盘读写效率。
  3. 在Elasticsearch的服务器上挂载多块硬盘。使用多块硬盘同时进行读写操作提升效率,在配置文件elasticsearch中设置多个存储路径,如下所示:
path.data:/path/to/data1,/path/to/data2。
  1. 避免使用NFS(Network File System)等远程存储设备,网络 的延迟对性能的影响是很大的。
3. 合理使用合并

 Lucene以段的形式存储数据。当有新的数据写入索引时,Lucene就会自动创建一个新的段。随着数据量的变化,段的数量会越来越多,消耗的多文件句柄数及CPU就越多,查询效率就会下降。
 由于Lucene段合并的计算量庞大,会消耗大量的I/O,所以Elasticsearch默认采用较保守的策略,让后台定期进行段合并,如下所述:

PUT /_cluster/settings
{
    "persistent" : {
        "indices.store.throttle.max_bytes_per_sec" : "100mb"
    }
}
4. 减少refresh的次数

 Lucene在新增数据时,采用了延迟写入的策略,默认情况下索引的refresh_interval为1秒。Lucene将待写入的数据先写到内存中,超过1秒(默认)时就会触发一次refresh,然后refresh会把内存中的的数据刷新到操作系统的文件缓存系统中。如果我们对搜索的实效性要求不高,可以将refresh周期延长,例如30秒。这样还可以有效地减少段刷新次数,但这同时意味着需要消耗更多的Heap内存。如下所示:

index.refresh_interval:30s 
5. 加大flush设置

 flush的主要目的是把文件缓存系统中的段持久化到硬盘,当Translog的数据量达到512MB或者30分钟时,会触发一次Flush。 index.translog.flush_threshold_size 参数的默认值是512MB,我们进行修改。
 增加参数值意味着文件缓存系统中可能需要存储更多的数据,所以我们需要为操作系统的文件缓存系统留下足够的空间。

6. 减少副本的数量

 Elasticsearch为了保证集群的可用性,提供了replicas(副本)支持,然而每个副本也会执行分析、索引及可能的合并过程,所以replicas的数量会严重影响写索引的效率。当写索引时,需要把写入的数据都同步到副本节点,副本节点越多,写索引的效率就越慢。
 如果我们需要大批量进行写入操作,可以先禁止replica复制,设置index.number_of_replicas: 0 关闭副本。在写入完成后,replica修改回正常的状态。

5.3.2 读优化

1. 避免大结果集和深翻

 在5.2.5节中介绍了集群中的查询流程,如果想要查询从from开始的size条数据,需要每个分片查询打分排名在前面的from+size条数据。协同节点将收集到的n✖️(from+size)条数据聚合,再进行一次排序,然后从from+size开始返回size条数据。
 当from、size或者n中有一个值很大的时候,需要参加排序的数量也会增长,这样的查询会消耗很多CPU资源,从而导致效率的降低。
 为了提升查询效率,Elasticsearch提供了scroll和scroll-scan这两种查询模式。

1)scroll

scroll是为检索大量的结果而设计的。例如,我们需要查询1~100页的数据,每页100条数据。
 如果使用search查询:每次都需要在每个分片上查询得分最高的from+100条数据,然后协同节点把收集到的n✖️(from+100)条数据聚合起来再进行一次排序。每次返回from+1开始的100条数据,并且要重复执行100次。
 如果使用scroll查询:在各个分片上查询10000条数据,协同节点聚合n✖️10000条数据进行合并、排序,并将排名前10000的结果快照起来。这样做的好处是减少了查询和排序的次数。
 Scroll初始查询的命令是:

$ vim scroll
$ cat scroll
{
    "query": {
        "match": {
            "name": "钱丁君"
        }
    },
    "size":20
}
$ curl -s -H "Content-Type: application/json; charset=UTF-8" -XGET localhost:9200/chandler/test/_search?scroll=2m --data-binary @scroll; echo
{"_scroll_id":"DnF1ZXJ5VGhlbkZldGNoBQAAAAAAAAAGFlB6Y3QtNk9oUmdpc09Tb21rX2NXQXcAAAAAAAAABxZQemN0LTZPaFJnaXNPU29ta19jV0F3AAAAAAAAAAgWUHpjdC02T2hSZ2lzT1NvbWtfY1dBdwAAAAAAAAAJFlB6Y3QtNk9oUmdpc09Tb21rX2NXQXcAAAAAAAAAChZQemN0LTZPaFJnaXNPU29ta19jV0F3","took":14,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":1,"max_score":0.8630463,"hits":[{"_index":"chandler","_type":"test","_id":"1","_score":0.8630463,"_source":{ "name" : "钱丁君","age": "18" }}]}}

 以上查询语句的含义是,在chandler索引的test type里查询字段name包含“钱丁君”的数据。scroll=2m表示下次请求的时间不能超过2分钟,size表示这次和后续的每次请求一次返回的数据条数。在这次查询的结果中除了返回了查询到的结果,还返回了一个scroll_id,可以把它作为下次请求的参数。
 再次请求的命令,如下所示:

scroll查询.png
 因为这次并没有到分片里查询数据,而是直接在生成的快照里面以游标的形式获取数据,所以这次查询并没有包含index和type,也没有查询条件。
2)scroll-scan

 Scroll是先做一次初始化搜索把所有符合搜索条件的结果缓存起来生成一个快照,然后持续地、批量地从快照里拉取数据直到没有数据剩下。而这时对索引数据的插入、删除、更新都不会影响遍历结果,因此scroll 并不适合用来做实时搜索。其思路和使用方式与scroll非常相似,但是scroll-scan关闭了scroll中最耗时的文本相似度计算和排序,使得性能更加高效。
 为了使用scroll-scan,需要执行一个初始化搜索请求,将search_type设置成scan,告诉Elasticsearch集群不需要文本相似计算和排序,只是按照数据在索引中顺序返回结果集:

$ vi scroll
$ cat scroll
{
    "query": {
        "match": {
            "name": "钱丁君"
        }
    },
    "size":20,
    "sort": [
      "_doc"
    ]
}
$ curl -H "Content-Type: application/json; charset=UTF-8" -XGET 'localhost:9200/chandler/test/_search?scroll=2m&pretty=true' --data-binary @scroll
{
  "_scroll_id" : "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAAABWFlB6Y3QtNk9oUmdpc09Tb21rX2NXQXcAAAAAAAAAVxZQemN0LTZPaFJnaXNPU29ta19jV0F3AAAAAAAAAFgWUHpjdC02T2hSZ2lzT1NvbWtfY1dBdwAAAAAAAABZFlB6Y3QtNk9oUmdpc09Tb21rX2NXQXcAAAAAAAAAWhZQemN0LTZPaFJnaXNPU29ta19jV0F3",
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : null,
    "hits" : [
      {
        "_index" : "chandler",
        "_type" : "test",
        "_id" : "1",
        "_score" : null,
        "_source" : {
          "name" : "钱丁君",
          "age" : "18"
        },
        "sort" : [
          0
        ]
      }
    ]
  }
}

注意:Elasticsearch 2.1.0版本之后移除了search_type=scan,使用"sort": [ "_doc"]进行代替。

 scroll和scroll-scan有一些差别,如下所示:

  1. scroll-scan不进行文本相似度计算,不排序,按照索引中的数据顺序返回
  2. scroll-scan不支持聚合操作。
  3. scroll-scan的参数size代表着每个分片上的请求的结果数量,每次返回n✖️size条数据。而scroll每次返回size条数据。
2. 选择合适的路由

 ES中所谓的路由和IP网络不同,是一个类似于Tag的东西。在创建文档的时候,可以通过字段为文档增加一个路由属性的Tag。在多分片的Elasticsearch集群中,对搜索的查询大致分为如下两种。

 (1)ES内在机制决定了拥有相同路由属性的文档,一定会被分配到同一个分片上,无论是主分片还是副本。查询时可以根据routing信息,直接定位到目标分片,避免查询所有的分片,再经过协调节点二次排序。如果5-24所示。


图 5-24

 (2)如果在查询条件中不包含routing,在查询时就遍历所有分片,整个查询主要分为Scatter、Sather两个过程。

shard = hash(routing)%number_of_primary_shards

 不过需要注意的是,根据城市id进行分片时,也会容易出现分片不均匀的情况。例如,大型城市的数据过多,而小城市的数据太少,导致分片之间的数据量差异很大。这时就可以进行必要的调整,比如把多个小城市的数据合并到一个分片上,把大城市的数据按区域进行拆分到不同分配。

3. SearchType

 在Scatter、Gather的过程中,节点间的数据传输和打分(SearchType),可以根据不同的场景选择。如下所示

  1. QUERY_THEN_FETCH:Elasticsearch默认的搜索方式。第一步,先向所有的分片发请求,各分片只返回文档的相似度得分和文档的id,然后协调节点按照各分片返回的分数进行重新排序和排名,再取出需要返回給客户端的size个文档id;第2步,在相关的分片中取出文档的详细信息并返回給用户。
  2. QUERY_AND_FETCH:协调节点向所有分片发送查询请求,各分片将文档的相似度得分和文档的详细信息一起返回。然后,协调节点进行重新排序,再取出需要返回給客户端的数据,将其返回給客户端。由于只需要在分片中查询一次,所以性能是最好的。
  3. DFS_QUERY_THEN_FETCH:与QUERY_THEN_FETCH类似,但它包含一个额外的阶段:在初始查询中执行全局的词频计算,以使得更精确地打分,从而让查询结果更相关。QUERY_THEN_FETCH使用的是分片内部的词频信息,而DFS_QUERY_THEN_FETCH使用访问公共的词频信息,所以相比QUERY_THEN_FETCH性能更低。
  4. DFS_QUERY_AND_FETCH:与QUERY_AND_FETCH类似,不过使用的是全局的词频。
4. 定期删除

 由于在Lucene中段具有不变性,每次进行删除操作后不会立即从硬盘中进行实际的删除,而是产生一个.del文件记录删除动作。随着删除操作的增长,.del文件会越来也多。当我们进行查询操作的时候,被删除的数据还会参与检索中,然后根据.del文件进行过滤。.del文件越多,查询过滤过程越长,进而影响查询的效率。当机器空闲时,我们可以通过如下命令删除文件,来提升查询的效率:

$ curl -XPOST localhost:9200/chandler/_forcemerge?only_expunge_deletes=true
{"_shards":{"total":10,"successful":5,"failed":0}}

定期对不再更新的索引做optimize (ES2.0以后更改为force merge api)。这Optimze的实质是对segment file强制做合并,可以节省大量的segment memory。

5.4.3 堆大小的设置

 Elasticsearch默认安装后设置的内存是1GB,对于任何一个现实业务来说,这个设置都太小了。如果是通过解压安装的Elasticsearch,则在Elasticsearch安装文件中包含一个jvm.option文件,添加如下命令来设置Elasticsearch的堆大小:

-Xms10g
-Xmx10g

 Xms表示堆的初始大小,Xmx表示可分配的最大内存,都是10GB。确保Xmx和Xms的大小是相同的,其目的是为了能够在java垃圾回收机制清理完堆区后不需要重新分隔计算堆区的大小而浪费资源,可以减轻伸缩堆大小带来的压力。
 也可以通过设置环境变量的方式设置堆的大小。服务进程在启动时候会读取这个变量,并相应的设置堆的大小。比如:

export ES_HEAP_SIEZE=10g

 也可以通过命令行参数的形式,在程序启动的时候把内存大小传递给Elasticsearch,如下所示:

./bin/elasticsearch -Xmx10g -Xms10g

这种设置方式是一次性的,在每次启动Elasticsearch时都需要添加。

 假设你有一个64G内存的机器,按照正常思维思考,你可能会认为把64G内存都给Elasticsearch比较好,但现实是这样吗, 越大越好?虽然内存对Elasticsearch来说是非常重要的,但是答案是否定的!因为Elasticsearch堆内存的分配需要满足以下两个原则:

 Java使用内存指针压缩(Compressed Oops)技术来解决这个问题。它的指针不再表示对象在内存中的精确位置,而是表示偏移量。这意味着32位的指针可以引用4GB个Byte,而不是4GB个bit。也就是说,当堆内存为32GB的物理内存时,也可以用32位的指针表示。
 不过,在越过那个神奇的边界----32GB时,指针就会变为普通对象的指针,每个对象的指针都变长了,就会浪费更多的内存,降低了CPU的性能,还要让GC应对更大的内存。事实上,当内存到达40~40GB时,有效的内存才相当于内存对象指针压缩技术时的32GB内存。所以即便你有足够的内存,也尽量不要超过32G,比如我们可以设置为31GB:

-Xms31g
-Xmx31g

  32GB是ES一个内存设置限制,那如果你的机器有很大的内存怎么办呢?现在的机器内存普遍增长,甚至可以看到有300-500GB内存的机器。这时我们需要根据业务场景,进行恰当内存的分配。

5.3.4 服务器配置的选择

swapping是性能的坟墓

 在选择Elasticsearch服务器时,要尽可能地选择与当前应用场景相匹配的服务器。如果服务器配置很低,则意味着需要更多的节点,节点数量的增加会导致集群管理的成本大幅度提高。如果服务器配置很高,,而在单机上运行多个节点时,也会增加逻辑的复杂度。
 在计算机中运行的程序均需在内存执行,若内存消耗殆尽将导致程序无法进行。为了解决这个问题,操作系统使用一种叫作虚拟内存的技术。当内存耗尽时,操作系统就会自动把内存中暂时不使用的数据交换到硬盘中,需要使用的时候再从硬盘交换到内存。
 如果内存交换到磁盘上需要10毫秒,从磁盘交换到内存需要20浩渺,那么多的操作时延累加起来,将导致几何级增长。不难看出swapping对于性能是多么可怕。所以为了使Elasticsearch有更好等性能,强烈建议关闭swap
 关闭swap的方式如下。
 (1)暂时禁用。如果我们想要在linux服务器上暂时关闭,可以执行如下命令,但在服务器重启后失效:

sudo swapoff -a

 (2)永久性关闭。我们可以修改/etc/sysctl.conf(不同等操作系统路径有可能不同),增加如下参数:

vm.swappiness = 1      //0-100,则表示越倾向于使用虚拟内存。

注意:swappiness设置为1比设置为0要好,因为在一些内核版本,swappness=0会引发OOM(内存溢出)。

  swappiness默认值为60,当设置为0时,在某些操作系统中有可能会触发系统级的OOM-killer,例如在Linux内核的内存不足时,为了防止系统的崩溃,会自动强制kill一个“bad”进程。
 (3)在Elasticsearch中设置。如果上面的方法都不能做到,你需要打开配置文件中的mlockall开关,它的作用就是运行JVM锁住内存,禁止OS交换出去。在elasticsearch.yml配置如下:

bootstrap.mlockall: true

5.3.5 硬盘的选择和设置

 所以,如果条件允许,则请尽可能地使用SSD,它的读写性能将远远超出任何旋转介质的硬盘(如机械硬盘、磁带等)。基于SSD的Elasticsarch集群节点对于查询和索引性能都有提升。
 另外无论是使用固态硬盘还是使用机械硬盘,我们都建议将磁盘的阵列模式设置为RAID 0,以此来提升磁盘的写性能。

5.4.6 接入方式

 Elastic search提供了Transport Client(传输客户端)和Node Client(节点客户端)的接入方式,这两种方式各有利弊,分别对应不同的应用场景。

1. Transport Client

 Transport Client作为一个集群和应用程序之间的通信层,和集群是安全解耦的。由于与集群解耦,所以在连接集群和销毁连接时更加高效,适合大量的客户端连接。

2. Node Client

 Node Client把应用程序当作一个集群中的Client节点(非Data和Master节点)。由于它是集群一个的内部节点,意味着它可以感知整个集群的状态、所有节点的分布情况、分片的分布状况等。
 由于Node Client是集群的一部分,所以在接入和退出集群时进行比较复杂操作,并且还会影响整个集群的状态,所以Node Client更适合少量客户端,能够提供更好的执行效率。

5.4.7 角色隔离和脑裂

1. 角色隔离

 Elasticsearch集群中的数据节点负责对数据进行增、删、改、查和聚合等操作,所以对CPU、内存和I/O的消耗很大。在搭建Elasticsearch集群时,我们应该对Elasticsearch集群中的节点进行角色划分和隔离。
 候选主节点:

node.master=true
node.data=false

 数据节点:

node.master=false
node.data=true

 最后形成如图5-26所示的逻辑划分。


图 5-26 elasticsearch集群
2. 避免脑裂

 网络异常可能会导致集群中节点划分出多个区域,区域发现没有master节点的时候,会选举出了自己区域内Maste节点r,导致一个集群被分裂为多个集群,使集群之间的数据无法同步,我们称这种现象为脑裂。为了防止脑裂,我们需要在Master节点的配置文件中添加如下参数:

discovery.zen.minimum_master_nodes=(master_eligible_nodes/2)+1        //默认值为1

 其中master_eligible_nodes为Master集群中的节点数。这样做可以避免脑裂的现象都出现,最大限度地提升集群的高可用性。只要不少于discovery.zen.minimum_master_nodes个候选节点存活,选举工作就可以顺利进行。

5.4 Elasticsearch实战

5.4.1 Elasticsearch的配置说明

 在Elasticsearch安装目录下的conf文件夹中包含了一个重要的配置文件:elasticsearch.yaml。
 Elasticsearch的配置信息有很多种,大部分配置都可以通过elasticsearch.yaml和接口的方式进行。下面我们列出一些比较重要的配置信息。

5.4.2 常用接口

 虽然现在有很多开源软件对Elasticsearch的接口进行了封装,使我们可以很方便、直观地监控集群的状况,但是在Elasticsearch 5以后,很多软件开始收费。了解常用的接口有助于我们在程序或者脚本中查看我们的集群情况,以下接口适用于Elasticsearch 6.5.2版本。

1. 索引类接口
PUT http://localhost:9200/indexname?pretty
content-type →application/json; charset=UTF-8
{
    "settings":{
        "number_of_shards" : 3,
        "number_of_replicas" : 1
    }
}
创建索引
DELETE http://localhost:9200/indexname
删除索引
 通过该接口就可以删除索引名称为indexname的索引。
 通过下面的接口可以删除多个索引。
DELETE http://localhost:9200/indexname1,indexname2

DELETE http://localhost:9200/indexname*

 通过下面的接口可以删除集群下的全部索引。

DELETE http://localhost:9200/_all
DELETE http://localhost:9200/*

 进行全部索引删除是很危险的,我们可以通过在配置文件中添加下面的配置信息,来关闭使用_all和使用通配符删除索引的接口,使用删除索引职能通过索引的全称进行。

action.destructive_requires_name: true
GET http://localhost:9200/indexname?pretty
查询索引
POST http://localhost:9200/indexname/_close
POST http://localhost:9200/indexname/_open
打开关闭索引
GET http://localhost:9200/indexname/typename/_mapping?pretty

 当一个索引中有多个type时,获得mapping时要加上typename。

2、document操作

 安装ES和Kibana之后,进入Kibana操作页面,然后进去的DevTools执行下面操作:

#添加一条document
PUT /test_index/test_type/1
{
    "test_content":"test test"
}

#查询
GET /test_index/test_type/1
#返回
{
  "_index" : "test_index",
  "_type" : "test_type",
  "_id" : "1",
  "_version" : 2,
  "found" : true,
  "_source" : {
    "test_content" : "test test"
  }
}
添加document
查询document

 put /index/type/id,说明如下:

5.4.3 接口应用

1、search接口

 search是我们最常用的API,ES给我提供了丰富的查询条件,比如模糊匹配match,字段判空exists,精准匹配term和terms,范围匹配range

GET /_search
{
  "query": { 
    "bool": { 
      "must": [     //must_not
        { "match": { "title":   "Search"        }}, 
        { "match": { "content": "Elasticsearch" }},
        {"exists":{"field":"字段名"}}   //判断字段是否为空
      ],
      "filter": [ 
        { "term":  { "status": "published" }},
        { "terms":  { "status": [0,1,2,3] }},//范围
        { "range": { "publish_date": { "gte": "2015-01-01" }}} //范围gte:大于等于;gt:大于;lte:小于等于;lt:小于
      ]
    }
  }
}

 查询索引为test_index,doc类型为test_type的数据。

GET /test_index/test_type/_search
ES查询一

 查询索引为test_index,doc类型为test_type,docment字段num10为4的数据

GET /test_index/test_type/_search?pretty=true
{
  "query": { 
    "bool": { 
      "filter": [ 
        { "term":  { "num10": 4 }}
      ]
    }
  }
}
ES精确查询.png

 更多查询条件的组合,大家可以自行测试。

2、修改mapping
PUT /my_index/_mapping/my_type
{
  "properties": {
       "new_field_name": {
           "type":     "string"         //字段类型,string、long、boolean、ip
       }
   }
}

 如上是修改mapping结构,然后利用脚本script给字段赋值:

POST my_index/_update_by_query
{
  "script": {
    "lang": "painless",
    "inline": "ctx._source.new_field_name= '02'"
  }
}
3、修改别名

 如下給index为test_index的数据绑定alias为test_alias

POST /_aliases
{
  "actions": [
    {
      "add": {      //add,remove
        "index": "test_index",
        "alias": "test_alias"
      }
    }
  ]
}
新增别名关联

 验证别名关联,根据别名来进行数据查询,如下:

GET /test_alias/test_type/3
根据别名操作es数据
4、定制返回内容

 _source元数据:就是说,我们在创建一个document的时候,使用的那个放在request body中的json串(所有的field),默认情况下,在get的时候,会原封不动的给我们返回回来。

定制返回的结果,指定_source中,返回哪些field。

#语法:
GET /test_index/test_type/1?_source=test_field2
#返回
{
  "_index" : "test_index",
  "_type" : "test_type",
  "_id" : "1",
  "_version" : 3,
  "found" : true,
  "_source" : {
    "test_field2" : "test field2"
  }
}

#也可返回多个field使用都好分割
GET /test_index/test_type/1?_source=test_field2,test_field1
_source测试
_source定制返回内容

5.4.4 JAVA封装

 组件elasticsearch.jar提供了丰富API,不过不利于我们理解和学习,现在我们自己来进行封装。
 组件API使用RestClient封装document查询接口:

/**
 * @param index
 * @param type
 * @param id
 * @param fields
 *            查询返回字段,可空
 * @return
 * @throws Exception
 * @Description:
 * @create date 2019年4月3日下午3:12:40
 */
public String document(String index, String type, String id, List<String> fields) throws Exception {
    Map<String, String> paramsMap = new HashMap<>();
    paramsMap.put("pretty", "true");
    if (null != fields && fields.size() != 0) {
        String fieldValue = "";
        for (String field : fields) {
            fieldValue += field + ",";
        }
        if (!"".equals(fieldValue)) {
            paramsMap.put("_source", fieldValue);
        }
    }
    return CommonUtils.toString(es.getRestClient()
            .performRequest("GET", "/" + index + "/" + type + "/" + id, paramsMap).getEntity().getContent());
}

 工程使用,封装:

public String searchDocument(String index, String type, String id, List<String> fields) {
    try {
        return doc.document(index, type, id, fields);
    } catch (Exception e) {
        log.error(e.getMessage());
        ExceptionLogger.log(e);
        throw new RuntimeException("ES查询失败");
    }
}

 测试用例,代码如下:

/**
 * ES交互验证-查询、更新等等操作
 *
 * @version
 * @author 钱丁君-chandler 2019年4月3日上午10:27:28
 * @since 1.8
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Bootstrap.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ESManagerTest {

    @Autowired
    private ESBasicManager esBasicManager;

    @Test
    public void query() {
        String result = esBasicManager.searchDocument(ESTagMetadata.INDEX_ALIAS, ESTagMetadata.DOC_TYPE,
                "188787665220752824", ImmutableList.of("signup_time", "tag_days_no_visit_after_1_order"));
        System.out.println("----------->" + result);
    }
}

 控台输出:

----------->{
  "_index" : "crm_tag_idx_20181218_708672",
  "_type" : "crm_tag_type",
  "_id" : "188787665220752824",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "signup_time" : "2017-12-24",
    "tag_days_no_visit_after_1_order" : "339"
  }
}

 我只是抛砖引玉,大家可以自行进行各种操作的封装,不管对于理解ES的使用,还是对代码质量提升都有很多帮助。

最后谢谢大家观赏

《Elasticsearch详解》

如果需要給我修改意见的发送邮箱:erghjmncq6643981@163.com
资料参考:《可伸缩服务架构》
转发博客,请注明,谢谢。

上一篇 下一篇

猜你喜欢

热点阅读