Elasticsearch 数据写入过程
1、ES 客户端选择一个node节点发送写入请求,这个节点就是协调节点coordinator node,ES的任意节点都可以作为协调节点。协调节点处理过程如下:
1.1 处理injest pipeline
查看本次请求是否符合某个pipeline的模式匹配,符合则执行pipeline中的逻辑,进行预处理:格式变换、新增字段、设置某个字段值、字段默认值处理等等。如果当前节点没有injest角色,则需要将请求转发给有injest角色的节点。
1.2 创建索引
如果开启了自动创建索引,则索引不存在自动创建,否则报错。
1.3 routing获取
获取请求url或者mapping配置中的_routing信息,如果没有则使用_id,一般业务中不手动维护_id,使用ES自动生成的全局唯一ID。
1.4 构建BulkShardRequest
Bulk Request中包含多种(Index/Update/Delete)请求,这些请求分别需要到不同的shard上执行,因此协调节点,会将请求按照shard分开,同一个shard上的请求聚合到一起,构建BulkShardRequest。
1.5 路由请求到primary shard
通过_routing字段按照公式 shard_num = hash(\routing) % num_primary_shards 计算出文档要分配到的分片,并从集群元数据中找出对应primary shard,将请求转发给primary shard。
1.6 等待primary shard处理返回
image.png2、协调节点通过_routing字段进行路由,找到对应的primary shard,并将请求转发给primary shard。primary shard处理过程如下:
2.1 判断操作类型
遍历bulk请求中的子请求,根据类型走不同处理逻辑分支。
2.2 update操作转换
将update转换成index和delete操作,获取文档的当前内容,与update内容合并生成新文档,然后将update请求转换成index请求,此处文档设置一个version v1。
2.3 解析文档
解析文档的各个字段,添加_uid等ES相关的系统字段。
2.4 更新Mapping
对于新增字段会根据dynamic mapping或dynamic template生成对应的mapping,如果mapping中有dynamic mapping相关设置则按设置处理,如忽略或抛出异常。
2.5 获取sequence Id和Version
从SequcenceNumberService获取一个sequenceID和Version。SequcenID用于初始化LocalCheckPoint, verion是根据当前Versoin+1用于防止并发写导致数据不一致。
2.6 写入lucene
这一步开始会对文档uid加锁,然后判断uid对应的version v2和之前update转换时的versoin v1是否一致,不一致则返回第二步重新执行。 如果version一致,如果同id的doc已经存在,则调用lucene的updateDocument接口,如果是新文档则调用lucene的addDoucument. 这里有个问题,如何保证Delete-Then-Add的原子性,ES是通过在Delete之前会加上已refresh锁,禁止被refresh,只有等待Add完成后释放了Refresh Lock, 这样就保证了这个操作的原子性。
2.7 写入translog
写入Lucene的Segment后,会以key value的形式写Translog, Key是Id, Value是Doc的内容。当查询的时候,如果请求的是GetDocById则可以直接根据_id从translog中获取。满足nosql场景的实时性。
2.8 重构bulk request
因为primary shard已经将update操作转换为index操作或delete操作,因此要对之前的bulkrequest进行调整,只包含index或delete操作,不需要再进行update的处理操作。
2.9 flush translog
默认情况下,translog要在此处落盘完成,如果对可靠性要求不高,可以设置translog异步,那么translog的fsync将会异步执行,但是落盘前的数据有丢失风险。
2.10 发送请求给replicas
将构造好的bulkrequest并发发送给各replicas,等待replica返回,这里需要等待所有的replicas返回,响应请求给协调节点。如果某个shard执行失败,则primary会给master发请求remove该shard。这里会同时把sequenceID, primaryTerm, GlobalCheckPoint等传递给replica。
2.11 等待replica响应
当所有的replica返回请求时,更细primary shard的LocalCheckPoint。
3、primary shard完成写入后,将写入并发发送给各replica, raplica执行写入操作后返回给primary shard, primary shard再将请求返回给协调节点。副本分片的执行流程如下(与主分片基本一致):
3.1 判断操作类型
replica收到的写如请求只会有add和delete,因update在primary shard上已经转换为add或delete了。根据不同的操作类型执行对应的操作
3.2 Parse Doc
3.3 更新mapping
3.4 获取sequenceId和Version 直接使用primary shard发送过来的请求中的内容即可
3.5 写入lucene
3.6 写Translog