ElasticSearch之(四)——索引、文档、映射管理
https://www.cnblogs.com/crystaltu/articles/6992935.html
一、索引管理
- 创建索引
PUT
http://127.0.0.1:9200/blog
{
"settings" : {
"index" : {
"number_of_shards" : 3, //分片数,创建后不可修改
"number_of_replicas" : 1 //副本数
}
}
}
返回
{
"acknowledged": true,//索引创建成功
"shards_acknowledged": true,//所需数量的副本+分片创建成功
"index": "blog"
}
- 索引别名
PUT twitter
{
"aliases" : {
"alias_1" : {},
"alias_2" : {
"filter" : {
"term" : {"user" : "kimchy" }
},
"routing" : "kimchy"
}
}
}
- 更新副本
Elasticsearch 支持修改一个己存在索引的副本数
PUT blog/_settings
{
”number of replicas”: 2
}
- 读写权限
blocks.read_only:true 设置当前索引只允许读不允许写或者更新。
blocks. read:true 禁止对当前索引进行读操作。
blocks.write:true 禁止对当前索引进行写操作
PUT blog/_settings
{
"blocks. write":true
}
- 查看索引
GET blog/_settings
{
"blogs": {
"settings": {
"index": {
"creation_date": "1541150638499",
"number_of_shards": "2",
"number_of_replicas": "1",
"uuid": "D7KEzlRFQdOapGY64iXxOw",
"version": {
"created": "6040299"
},
"provided_name": "blogs"
}
}
}
}
- 删除索引
DELETE http://127.0.0.1:9200/blogs
{
"acknowledged": true
}
- 索引的打开和关闭
Elasticsearch中的索引可以进行打开和关闭操作, 一个关闭了的索引几乎不占用系统资源。
POST /blog/_close
POST /blog/_open
- 复制索引
_reindexAPI可以把文档从一个索引(源索引)复制到另一个索引(目标索引),目标索引不会复制源索引中的配置信息,reindex操作之前需要设置目标索引的分片数、副本数等信息。(也可以使用type和query来做限制,只复制原索引的一部分)
POST reindex
{
"source": {
"index": "blog",
"type":"article",
"query":{
"term":{"title":"git"}
}
},
"dest": {
"index": "blog news"
}
}
- 收缩索引
使用 shrink index AP 提供的缩小索引分片数机制,把一个索引变成一个更少分片的索引,收缩后的分片数必须是原始分片数的因子,比如有8个分片的索引可以收缩为4、2、1,有15个分片的索引可以收缩为5、3、l,如果分片数为素数(7、ll等),那么只能收缩为1个分片。
在收缩索引前,索引必须标记为已读。
步骤:
先把所有主分片都转移到一台主机上;
在这台主机上创建一个新索引,分片数较小,其他设置和原索引一致;
把原索引的所有分片,复制(或硬链接)到新索引的目录下;
对新索引进行打开操作恢复分片数据;
(可选)重新把新索引的分片均衡到其他节点上。
PUT /my_source_index/_settings
{
"settings": {
<!-- 指定进行收缩的节点的名称 -->
"index.routing.allocation.require._name": "shrink_node_name",
<!-- 阻止写,只读 -->
"index.blocks.write": true
}
}
进行收缩:
POST my_source_index/_shrink/my_target_index
{
"settings": {
"index.number_of_replicas": 1,
"index.number_of_shards": 1,
"index.codec": "best_compression"
}
}
- 拆分索引
当索引的分片容量过大时,可以通过拆分操作将索引拆分为一个倍数分片数的新索引。能拆分为几倍由创建索引时指定的index.number_of_routing_shards 路由分片数决定。这个路由分片数决定了根据一致性hash路由文档到分片的散列空间。
如index.number_of_routing_shards = 30 ,指定的分片数是5,则可按如下倍数方式进行拆分:
5 → 10 → 30 (split by 2, then by 3)
5 → 15 → 30 (split by 3, then by 2)
5 → 30 (split by 6)
注:只有在创建时指定了index.number_of_routing_shards 的索引才可以进行拆分,ES7开始将不再有这个限制。
PUT my_source_index
{
"settings": {
"index.number_of_shards" : 1,
<!-- 创建时需要指定路由分片数 -->
"index.number_of_routing_shards" : 2
}
}
设置索引只读
PUT /my_source_index/_settings
{
"settings": {
"index.blocks.write": true
}
}
做拆分
POST my_source_index/_split/my_target_index
{
"settings": {
<!--新索引的分片数需符合拆分规则-->
"index.number_of_shards": 2
}
}
监控拆分过程:
GET _cat/recovery?v
GET _cluster/health
二、文档管理
- 新建文档
指定文档id,新增或者修改
PUT blogs/article/1
{
"id":1,
"title":"Git简介",
"posttime":"2017 - 05-01",
"content ":"Git 是一款免费、开源的分布式版本控制系统"
}
返回
{
"_index": "blogs",
"_type": "article",
"_id": "1", //文档id
"_version": 1,
"result": "created",
"_shards": {
"total": 2, //所在分片有2个副本
"successful": 1, //1个副本上成功写入
"failed": 0 //失败副本数
},
"_seq_no": 0,
"_primary_term": 1
}
不指定文档id,新增并返回id
POST blogs/article
{
"id":1,
"title":"Git简介",
"posttime":"2017 - 05-01",
"content ":"Git 是一款免费、开源的分布式版本控制系统"
}
返回
{
"_index": "blogs",
"_type": "article",
"_id": "yT9942YBJT1L8mzoKCO2",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}
- 获取文档
GET http://127.0.0.1:9200/blogs/article/1
返回
{
"_index": "blogs",
"_type": "article",
"_id": "1",
"_version": 1,
"found": true,
"_source": {
"id": 1,
"title": "Git简介",
"posttime": "2017 - 05-01",
"content ": "Git 是一款免费、开源的分布式版本控制系统"
}
}
- 批量获取
POST http://127.0.0.1:9200/blogs/_mget
{
"ids":["1","2"]
}
返回
{
"docs": [
{
"_index": "blogs",
"_type": "article",
"_id": "1",
"_version": 1,
"found": true,
"_source": {
"id": 1,
"title": "Git简介",
"posttime": "2017 - 05-01",
"content ": "Git 是一款免费、开源的分布式版本控制系统"
}
},
{
"_index": "blogs",
"_type": "article",
"_id": "2",
"_version": 1,
"found": true,
"_source": {
"id": 1,
"title": "svn简介",
"posttime": "2017 - 05-01",
"content ": "svn 是一款免费、开源的分布式版本控制系统"
}
}
]
}
- 更新文档
PUT http://127.0.0.1:9200/test/type1/1
{
"counter":1,
"tags":["red"]
}
返回
{
"_index": "test",
"_type": "type1",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}
- 更新请求
POST http://127.0.0.1:9200/test/type1/1/_update
{
"script":{
"inline":"ctx._source.counter += params.count",
"lang":"painless",
"params":{
"count":4
}
}
"upsert" : { "counter" : 1 }
}
返回
{
"_index": "test",
"_type": "type1",
"_id": "1",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 1,
"_primary_term": 1
}
命令中inline是执行的脚本,ctx是脚本语言中的一个执行对象,painless是Elasticsearch内置的一种脚本语言,params是参数集合。上述命令的自然语言描述如下:使用painless脚本更新文档,通过ctx获取_source再修改counter字段,counter字段等于原值加上count参数的值。ctx对象除了可以访问_source之外,还可以访问_index_type、_id、_version、_routing、_parent等字段。
upsert字段:如果不存在文档,则新增文档,并新增一个字段counter。
增加文档中字段
POST 'localhost:9200/test/type1/1/_update' -d '{
"script" : "ctx._source.name_of_new_field = \"value_of_new_field\""
}'
删除文档
POST 'localhost:9200/test/type1/1/_update' -d '{
"script" : "ctx._source.remove(\"name_of_field\")"
}'
删除指定文档
{
"script":{
"inline":"if(ctx._source.tags.contains(params.tag)){ctx.op=\"delete\"}else{ctx.op=\"none\"}",
"lang":"painless",
"params":{"tag":"red"}
}
- 查询并更新
POST blog/_update_by_query
{
"script": {
"inline": "ctx._source.category=params.category",
"lang": "painless",
"params": {
"category": "git"
}
},
"query": {
"term": {
"title": "git"
}
}
}
- 删除文档
DELETE http://127.0.0.1:9200/blog/article/1
返回
{
"_index": "blog",
"_type": "article",
"_id": "1",
"_version": 1,
"result": "not_found",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 5
}
删除时也可以指定路由,DELETE http://127.0.0.1:9200/blog/article/1?routing=user123
- 查询并删除
POST http://127.0.0.1:9200/blog/_delete_by_query
{
"query":{
"term":{
"titie":"hibernate"
}
}
}
- 批量操作
Bulk API 允许使用单一请求来实现多个文档的 create 、 index、update 或 delete 。
curl -XPOST 'localhost:9200/customer/external/_bulk?pretty' -d '
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
action and meta data 行指定了将要在哪个文档中执行什么操作,其中 action 必须是 index 、create 、 update 或者 delete, metadata 需要指明需要被操作文档的_index 、_type 以及_id 。
Elasticsearch 响应包含一个 items 数组,它罗列了每一个请求的结果,结果的顺序与请求的顺序相同:
{
"took": 30,
"errors": false,
"items": [
{
"index": {
"_index": "test",
"_type": "_doc",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 201,
"_seq_no" : 0,
"_primary_term": 1
}
},
{
"delete": {
"_index": "test",
"_type": "_doc",
"_id": "2",
"_version": 1,
"result": "not_found",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 404,
"_seq_no" : 1,
"_primary_term" : 2
}
},
{
"create": {
"_index": "test",
"_type": "_doc",
"_id": "3",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 201,
"_seq_no" : 2,
"_primary_term" : 3
}
},
{
"update": {
"_index": "test",
"_type": "_doc",
"_id": "1",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 200,
"_seq_no" : 3,
"_primary_term" : 4
}
}
]
}
使用 Bulle 操作需要注意一次提交请求文件的大小,整个批量请求需要被加载到接受请求节点的内存里, 所以请求越大 , 给其他请求可用的内存就越小。有一个最佳的 Bulk 请求大小,超过这个大小,性能不再提升而且可能降低 。 一般一个好的批次最好保持在 5~15MB 之间 。
三、版本控制
当我们使用 Elasticsearch 的 API 进行文档更新的时候整个过程如下:首先读取源文档,对原文档进行更新操作,更新操作执行完成以后再重新索引整个文档 。 不论执行多少次更新,最后保存在 Elasticsearch 中的是最后一次更新后的文档。但是如果有两个线程同时修改一个文档,这时候就会发生冲突。
图片.png
Elasticsearch 使用的就是乐观锁机制(假定不会发生并发访问冲突,对数据资源不会锁定,只有在数据提交操作时检查是否违反数据完整性。乐观锁适用于读操作比较多的应用类型,可省去锁开销,可以提高吞吐量。)
Elasticsearch 是一个分布式系统,当文档被创建、更新、删除,新版本的文档必须要复制到集群中的其他节点。 Elasticsearch 也是异步并发的,这意味着复制请求会被并行发送,也意味着请求不是按顺序到达的, Elasticsearch 需要一种方式确保旧版本的文档不会覆盖较新版本的文档。
我们在前面对文档进行索引时提到,文档每被修改→次,文档版本号会自增一次。
Elasticsearch 使用_version 字段确保所有的更新都有序进行。 如果一个低版本的文档在一个高版本的文档之后到达,那么旧版本的文档会被忽略。 Elasticsearch 的文档版本控制机制主要有内部版本控制和外部版本控制,内部版本控制机制要求每次操作请求,只有当版本号相等时才能操作成功, 外部版本控制要求外部文档版本比内部文档版本高时才能更新成功。下面通过具体实例演示一遍 。
创建文档
PUT /website/blog/1/_create
{
"title": "My first blog entry",
"text": "Just trying this out..."
}
查询结果,_version为1
GET /website/blog/1
{
"_index" : "website",
"_type" : "blog",
"_id" : "1",
"_version" : 1,
"found" : true,
"_source" : {
"title": "My first blog entry",
"text": "Just trying this out..."
}
}
version=1 更新文档
PUT /website/blog/1?version=1
{
"title": "My first blog entry",
"text": "Starting to get the hang of this..."
}
返回version自增为2
{
"_index": "website",
"_type": "blog",
"_id": "1",
"_version": 2
"created": false
}
如果再以version=1更新,则报异常version_conflict_engine_exception
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[blog][1]: version conflict, current [2], provided [1]",
"index": "website",
"shard": "3"
}
],
"type": "version_conflict_engine_exception",
"reason": "[blog][1]: version conflict, current [2], provided [1]",
"index": "website",
"shard": "3"
},
"status": 409
}
外部版本号
PUT /website/blog/2?version=5&version_type=external
{
"title": "My first external blog entry",
"text": "Starting to get the hang of this..."
}
返回
{
"_index": "website",
"_type": "blog",
"_id": "2",
"_version": 5,
"created": true
}
四、路由机制
当索引 一个文档时文档会被存储到 master 节点上的一个主分片上。那么 Elasticsearch 是如何知道文档属于哪个分片的呢?再有当你创建一个新文档,Elasticsearch 是如何知道应该存储在分片 1还是分片 2 上?要想回答这些问题,就需要了解Elasticsearch 的路由机制。 Elasticsearch 的路由机制即是通过哈希算法,将具有相同哈希值的文档放置到同一个主分片中,分片位置计算方法:
shard= hash(routing)% number_of_primary_shards
routing 值可以是一个任意字符串, Elasticsearch 默认将文档的 id 值作为 routing 值,通过哈希函数根据 routing 字符串生成一个数字,然后除以主切片的数量得到一个余数( remainder )。这个数字就是特定文档所在的分片。这种算法基本上会保持所有数据在所有分片上的一个平均分布,而不会造成数据分配不均衡的情况。
也可以自定义 routing 值。默认的路由模式可以保证数据平均分布,文档分配算法对我们来说是透明的,很多时候性能也不是问题。自定义 routing 值在深入理解数据特征之后, 能够带来很多使用上的方便和性能上的提升。
自定义routing 值的好处:
默认情况下, Elasticsearch 使用文档的 id 将文档平均分布于所有的分片上,这导致了Elasticsearch 不能确定文档的位置,所以它必须将这个请求广播到所有的分片上去执行。主分片的数量在索引创建的时候是固定的,并且永远不能改变。因为如果分片的数量改变了,所有先前的路由值就会变成非法,文档相当于丢失了。使用自定义的路由模式,可以便查询更具目的性。你不必盲目地去广播查询请求,而是要告诉Elasticsearch 你的数据在哪个分片上。
PUT /website/blog/l?routing=user123
{
"title":"My first blog entry",
"text":"Just trying this out ..."
}
GET /myblog/search?routing=user123
如果想要查询 user123发布了哪些博客,可 以通过 routing值进行过滤,这样可以避免 Elasticsearch 向所有分片都发送查询请求,大大减少系统的资源。
需要注意的是,可以为文档指定多个路由值,路由值之间使用逗号隔开。
使用自定义 routing 值也会造成一些潜在的问题,比如 user123 本身的文档就非常多,有数十万个,而其他大多数的用户只有几个文档,这样的话就会导致 user123 所在的分片较大,出现数据偏移的情况,特别是多个这样的用户处于同一分片的时候现象会更明显。具体的使用还是要结合实际的应用场景来选择。
五、映射详解
1、映射介绍
映射也就是 Mapping,用来定义一个文档以及其所包含的字段如何被存储和索引,可以在映射中事先定义字段的数据类型、分词器等属性。类似关系数据库定义表结构和字段类型。
映射可分为动态映射和静态映射。在关系型数据库中写入数据之前首先要建表,在建表语句中声明宇段的属性,在 Elasticsearch 中则不必如此, Elasticsearch 最重要的功能之一就是让你尽可能快地开始探索数据,文档写入 Elasticsearch 中,它会根据字段的类型自动识别,这种机制称为动态映射,而静态映射则是写入数据之前对宇段的属性进行手工设置。
PUT http://127.0.0.1:9200/books
http://127.0.0.1:9200/books/_mapping
返回
{
"books": {
"mappings": {}
}
}
添加文档之后
PUT books/it/1
{
"id":1,
"publish_date":"2017-06-01",
"name":"master Elasticsearch"
}
http://127.0.0.1:9200/books/_mapping
返回
{
"books": {
"mappings": {
"it": {
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"publish_date": {
"type": "date"
}
}
}
}
}
}
说明es自动识别了文档中字段的类型。
使用动态 Mapping 要结合实际业务需求来综合考虑 ,如果将 Elasticsearch 当作主要的数据存储使用,井且希望出现未知宇段时抛出异常来提醒你注意这一问题,那么开启动态 Mapping并不适用。在 Mapping 中可以通过 dynamic 设置来控制是否自动新增宇段。
PUT http://127.0.0.1:9200/books
{
"mappings": {
"it": {
"dynamic":"strict", //dynamic有true默认自动添加字段,false不添加,strict严格模式 发现新字段抛异常
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"publish_date": {
"type": "date"
}
}
}
}
}
如果
PUT books/it/1
{
"id":1,
"publish_date":"2017-06-01",
"name":"master Elasticsearch",
"author":"psk"
}
则会抛出strict_dynamic_mapping_exception 异常
2、元字段
元字段是映射中描述文档本身的字段,从大的分类上来看,主要有文档属性的元字段、源文档的元字段、索引的元字段、路由的元字段和自定义元字段。
元字段.png
GET _index1,_index2/_search
{
"query": {
"terms": {
"_index": [
"_indexl",
"_index2"
]
}
},
"aggs": {
"indices": {
"terms": {
"field": "_index",
"size": 10
}
}
},
"sort": [
{
"_index": {
"order": "asc"
}
}
]
}