ElasticTalk

聊聊 Elasticsearch 中的任务管理机制

2022-12-10  本文已影响0人  rockybean

Elasticsearch 对外提供了一个 _tasks 接口,用于获取当前各个节点正在执行的任务,这里要避免和 pending_tasks 搞混,后者是用于获取在 master leader 节点排队等待修改 cluster state 的处理任务。

WHY

Task 管理功能不是一开始就有的,是从 5.0 开始推出并不断完善的,如下是相关的 issue 链接。

摘取上面的部分描述如下:

We have identified several potential features of elasticsearch that can spawn long running tasks and therefore require a common management mechanism for this tasks.

This issue will introduce task management API that will provide a mechanism for communicating with and controlling currently running tasks.

The task management API will be based on the top of existing TransportAction framework, which will allow any transport action to become a task.

The tasks will maintain parent/child relationship between tasks running on the coordinating nodes and subtasks that are spawn by the coordinating node on other nodes.

上面这段话把引入 Task 的原因、作用都讲解的很清楚了,简单总结如下:

看到这里,相信大家已经对于 Task 管理的来历以及要实现的功能有了大致的理解,最后再补充一句。

Task 管理功能也极大提升了 Elasticsearch 系统的可观测性,对于各个节点当前在执行的任务有了统一观测的手段,不用再猜了。

接下来我们就看看什么是 Task。

WHAT

Task 通常包含如下信息:

Task 的相关接口主要是下面两个:

下面是通过 GET _tasks?detailed=true 获取的一个样例数据:

{
  "-Tws1PJEQ4WW_GSsRLTSLg:35061634": {
    "node": "-Tws1PJEQ4WW_GSsRLTSLg",
    "id": 35061634,
    "type": "transport",
    "action": "indices:data/write/bulk[s]",
    "status": {
      "phase": "waiting_on_primary"
    },
    "description": "requests[1], index[.monitoring-es-7-2022.12.11][0]",
    "start_time_in_millis": 1670740946462,
    "running_time_in_nanos": 359046,
    "cancellable": false,
    "parent_task_id": "_Z3jXvvvQnqR6pye8zwZtg:69312208",
    "headers": {}
  }
}

其他字段大家可以自行解释,这里不再展开讲解。

通过这一小节,我们已经知道了 Task 的组成,那么接下来我们看看 Task 管理是如何实现的。

HOW

TaskManager 是核心管理类,它提供两个核心方法供外部调用。

TaskManager 内部使用一个 Map<Long,Task> 的结构维护当前 Node 的所有 task,即 task node idtask 的映射管理。

Task 创建的时机主要是两个地方:

client rest 请求的处理流程大概如下:

image
  1. client 发起 http 请求(e.g. GET _tasks),被路由到相关的 RestAction 中,比如 RestListTasksAction
  2. RestAction 对请求做验证和处理后,会转换成对应的 transport action request 发送出去,转交本地对应的 TransportAction 处理,比如 TransportListTasksAction
  3. TransportAction 在执行任务之前会通过 TaskManager 注册一个 task
  4. TransportAction 处理相关的请求
  5. 在第 4 步处理结束后通过 TaskManger 注销之前注册的 task
  6. 一路返回给 client 相关结果

TransportAction 中的代码如下,注 1 为 register,注 2 为 unregister。

image

Node 与 Node 之间请求的处理流程大致如下:

image
  1. Node1 通过 TransportService 的 sendReqeust 方法向 Node2 发送请求,这其中使用的 actionName 是相关 TransportAction 中定义的 transportNodeAction,比如 TransportListTasksAction 发送的 action name 是 cluster:monitor/tasks/lists[n]
  2. Node2 接收到请求,在 InboudHandler 中,通过 action name 从 ReqeustHandlers 中获取对应的 request handler,然后进行处理
  3. 在 RequestHandler 的 processMessageReceived 方法中,会通过 TaskManager 注册一个 task
  4. RequestHandler 处理相关的请求
  5. 在第 4 步处理结束后通过 TaskManger 注销之前注册的 task
  6. 一路返回给 Node1 相关结果

RequestHandlerRegistry 中的代码如下,注 1 为 register,注 2 为 unregister。

image

关于 register 和 unregister 的实现逻辑,这里就不展开讲了,感兴趣的同学可以自行去查看相关代码。

其他

持久化结果:.tasks 索引

细心的同学可能会发现在 elasticsearch 中有一个系统索引 .tasks,如果去查询这个索引的内容,会得到类似如下的文档内容。

GET .tasks/_search
{
        "_index" : ".tasks",
        "_type" : "task",
        "_id" : "9m1T5Qx6RnaRXER7Z:1715505780",
        "_score" : 2.8134105,
        "_source" : {
          "completed" : true,
          "task" : {
            "node" : "9m1T5Qx6RnaRXER7Z",
            "id" : 1715505780,
            "type" : "transport",
            "action" : "indices:data/write/reindex",
            "status" : {
              "total" : 34556,
              "updated" : 0,
              "created" : 34556,
              "deleted" : 0,
              "batches" : 35,
              "version_conflicts" : 0,
              "noops" : 0,
              "retries" : {
                "bulk" : 0,
                "search" : 0
              },
              "throttled_millis" : 0,
              "requests_per_second" : -1.0,
              "throttled_until_millis" : 0
            },
            "description" : "reindex from [.indexA] to [.indexA_reindex][_doc]",
            "start_time_in_millis" : 1657627161222,
            "running_time_in_nanos" : 7259281625,
            "cancellable" : true,
            "headers" : { }
          },
          "response" : {
            "took" : 7239,
            "timed_out" : false,
            "total" : 34556,
            "updated" : 0,
            "created" : 34556,
            "deleted" : 0,
            "batches" : 35,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries" : {
              "bulk" : 0,
              "search" : 0
            },
            "throttled" : "0s",
            "throttled_millis" : 0,
            "requests_per_second" : -1.0,
            "throttled_until" : "0s",
            "throttled_until_millis" : 0,
            "failures" : [ ]
          }
        }
      }

这个返回的文档记录了一个 reindex task 的详情和结果。

需要注意的是,并非所有的 task 都可以持久化结果到 .tasks 索引中,这只支持某些 long running task ,如下:

在发起相关请求时,只要加上一个参数 wait_for_completion=true,请求会返回一个 task id,然后该 task 的结果会被记录到 .tasks 索引中。如果不加该参数,则不会记录。

另外 .tasks 索引是按需创建的,只有在需要记录结果时才会创建该索引,如果你的 cluster 里面没有,也没有什么问题。

取消任务 Cancel Task

部分 Task 在执行过程中可以被取消(Cancel),相关接口为 POST _tasks/[task_id/_cancel 。但不是所有 Task 都可以被取消,只有 Cancellable 为 true 的才可以。

可以取消的任务主要是一些 long running 的task,比如 reindex、update by query、delete by query、search 等,它们的 task 都继承了 CancellableTask

另外 ES 还引入了自动 cancel search 任务的机制,如下是相关 issue:

当 ES 发现 client 主动断开连接时,会主动 cancel 当前正在执行的 search 任务,以便减轻集群负载。

Persistent Task

Persistent Task 是一类比较特殊的任务,一般的 Task 在 Node 停止或者 Crash 后就结束了,即便 Node 重启也无法继续之前在执行的 Task,但是 Persistent Task 通过将自身持久化到 Cluster State 中,即便相关 Node 停止,它依然可以被重新分配到其他 Node 上继续运行。

这部分使用的不多,主要是 x-pack 增加的一些如 ml、rollup、transform 等功能在用,了解下即可。

总结

本文主要讲解了 Elasticsearch 中 Task 的来历、组成和实现,希望能对大家有所帮助,以后可以正确的使用 Task Management API 来解决使用中的问题。

引用

上一篇 下一篇

猜你喜欢

热点阅读