elasticsearchElasticsearch分布式搜索引擎说说Elasticsearch那点事

Elasticsearch 5.x 源码分析(8)用plugin

2017-08-04  本文已影响654人  华安火车迷

最近项目上的需要,要在某些场景下拦截ES的Request 和Response,进而把ES的整个plugin 的机制原理分析了一遍。但是并不等于说会写plugin就可以为所欲为,实操下来还是发现不少问题,这篇文章主要还是围绕一个目的:如何通过写plugin来想办法拦截ES的Request 和Response 的思路来写。


这篇文章不是详细介绍如何写一个plugin的,如果有这个需求我建议阅读下面任何一篇文章就可以了

Creating a Plugin for Elasticsearch 5.0 Using Maven

Adding a New REST Endpoint to Elasticsearch

elasticsearch源码分析之plugin的开发

这里简单几句话说说Plugin,在Elasticsearch 5.x 针对不同的需求,如果想对某个模块做扩展性开发和修改,可以通过实现这个模块开放出来的Plugin 接口,然后写好完整的逻辑并打包上传,重启ES 就可以加载你写的Plugin,可实现的Plugin 都在org.elasticsearch.plugin包下

Elasticsearch 5 可以实现的plugin的接口

从这个图上看,首先猜测,如果我们要写Plugin来拦截Request 和Response,那么无非就是实现ActionPlugin 和SearchPlugin,那下面一步步来。


用ActionPlugin 实现

首先回顾一下ES整个端到端的调用,这里都是以Rest 请求为例,因为Transport Client调用方式就简单很多了,直接在 client.search()的时候塞一个回调函数就可以了。(下面每个类的含义不懂的可以回看之前的文章Elasticsearch 5.x 源码分析(6)Request 和Response 在ES 中的传输和解析

ES查询端到端示意图

图画的差别介意,再结合一下流程图来看

ES调用流程图

OK,先看ActionPlugin接口描述和结合上面两幅图对比,我们可以得到下面三个总结:

ActionPlugin

RestHandlerWrapper

那什么是RestHandlerWrapper 呢,在Elasticsearch 5.2之前,其实你是可以在Plugin 里注册一个或多个RestFilters 的,就像这样

在Plugin里注册一个RestFilter
据ES开发团队的说法就是,这种方式太危险,它是可以控制调用链的(在里面获取到restFilterChain 也就是可以得到别人的plugin的filter),因此从5.2 开始他们把这个机制改了,整个ES全局只允许有一个RestHandlerWrapper 类,并且里面只有一个方法,是获取不到RestFilterChain的。

想了解更多这个需求可以链进去
Plugins: Replace Rest filters with RestHandler wrapper #21905
RestFilters are a complex way of allowing plugins to add extra code
before rest actions are executed. This change removes rest filters, and
replaces with a wrapper which a single plugin may provide.

下面是我的debug 代码

@Override
    public UnaryOperator<RestHandler> getRestHandlerWrapper(ThreadContext threadContext) {
        return (RestHandler r) ->{
            threadContext.putHeader("request_time", System.currentTimeMillis() + "");
            threadContext.putHeader("response_time", System.currentTimeMillis() + "");
            LOGGER.error(Thread.currentThread().getId() + "===getRestHandlerWrapper ====" + "request" + System.currentTimeMillis() + "====" + r.toString());
            //LOGGER.error(r.getClass().getClassLoader().toString());
            //LOGGER.error(PallasPlugin.class.getClassLoader().toString());
            String className = r.getClass().getName();
            if (className.contains(".RestSearchAction") || className.contains(".RestSearchTemplateAction")) {
                return new MyRestHandler(r);
            } else {
                return r;
            }
        };
    }

public static class MyRestHandler implements RestHandler {

        private RestHandler r;

        public MyRestHandler(RestHandler r) {
            this.r = r;
        }

        @Override
        public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
            //request.params().put("UUID", UUID.randomUUID().toString());
            LOGGER.error(request.params());
            LOGGER.error(request.getHeaders());
            r.handleRequest(request, channel, client);
        }
    }

虽然调试信息很简单,但是这里已经可以取得

结论一


ActionFilters

再看看我的测试代码

public static class MyActionFilter implements ActionFilter {

        @Inject
        public MyActionFilter() {
        }


        @Override
        public int order() {
            return 0;
        }

        @Override
        public <Request extends ActionRequest, Response extends ActionResponse> void apply(Task task, String action, Request request, ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {

            if(action.equals("indices:data/read/search") || action.equals("indices:data/read/search/template")) {

                LOGGER.error(Thread.currentThread().getId() + "==" + action + "===" + request.toString() + ":" + listener.toString());
                LOGGER.error("===replacing Action Lisener====");
                LOGGER.error(task.getId() + "," + task.getParentTaskId().getId() + "," + task.getStatus() + "," + task.getType());
                LOGGER.error(request.remoteAddress());

                chain.proceed(task, action, request, new MyActionListener(listener,task));
            } else {
                LOGGER.error("===NOT replacing Action Lisener====");
                chain.proceed(task, action, request, listener);
            }

        }
    }

    public static class MyActionListener implements ActionListener {

        private ActionListener listener;

        private Task task;

        public MyActionListener(ActionListener listener, Task task) {
            this.listener = listener;
            this.task = task;
        }

        @Override
        public void onResponse(Object o) {
            LOGGER.error(Thread.currentThread().getId() + "==onResponse()===" + o.toString());
            LOGGER.error(task.getStartTime());
            LOGGER.error(System.currentTimeMillis());
            listener.onResponse(o);
        }

        @Override
        public void onFailure(Exception e) {
            LOGGER.error(Thread.currentThread().getId() + "==onError()===");
            listener.onFailure(e);
        }
    }

看到onRespone() 是否异常兴奋,是的,ActionFilters 就nice 很多,request 、Response都可以给你,通过这个测试得到了

结论二

这里有一点尤其需要注意,刚刚说了,这里是拦截所有调用,也就是说,就算你是一个SearchTemplateAction调用,其实也可能进来了多次,因为在SearchTemplateAction内部还会发起多个并发的SearchAction请求,一个是给本机,一个是给其他nodes,而其他nodes也会发送多个请求过来,同样会被拦截,这样对于有目的地去监听某一个具体的request和Response 是不利的。
这里我想到两个办法可以解决这个问题:

  1. 因为这个调用链里ActionFilter的重复进入其实是同一个线程完成的,因此可以通过ThreadLocal来实现信息的保存,比如首先进入时保存个信息,那再次进入就可以识别了
  2. 更加暴力点的做法是直接再封装一个SearchRequest来记录信息

用SearchPlugin 实现

到这里我们知道用ActionPlugin的方式是勉强可以实现的,虽然道路比较曲折,那么SearchPlugin 是否也可以实现呢?从这个Plugin中我第一眼就瞄中了List<BiConsumer<SearchRequest, SearchResponse>> getSearchResponseListeners()这个方法,结果一试,无果,百思不得其解,再次点进去源码想看看它如何调用,结果一点,真想骂人!

SearchPlugin接口

这个接口是完全没有地方用到,所以实现了也没用,那么

总结就是SearchPlugin是实现不了拦截Request或者Response的。

好了,我的调研就基本到这里了,最后给多篇扩展阅读

Search Plugin to intercept search response

How to intercept search request (in a pluign) without creating separate RestAction

上一篇下一篇

猜你喜欢

热点阅读