Elasticsearch 5.x 源码分析(8)用plugin
最近项目上的需要,要在某些场景下拦截ES的Request 和Response,进而把ES的整个plugin 的机制原理分析了一遍。但是并不等于说会写plugin就可以为所欲为,实操下来还是发现不少问题,这篇文章主要还是围绕一个目的:如何通过写plugin来想办法拦截ES的Request 和Response 的思路来写。
这篇文章不是详细介绍如何写一个plugin的,如果有这个需求我建议阅读下面任何一篇文章就可以了
Creating a Plugin for Elasticsearch 5.0 Using Maven
这里简单几句话说说Plugin,在Elasticsearch 5.x 针对不同的需求,如果想对某个模块做扩展性开发和修改,可以通过实现这个模块开放出来的Plugin 接口,然后写好完整的逻辑并打包上传,重启ES 就可以加载你写的Plugin,可实现的Plugin 都在org.elasticsearch.plugin
包下
从这个图上看,首先猜测,如果我们要写Plugin来拦截Request 和Response,那么无非就是实现ActionPlugin 和SearchPlugin,那下面一步步来。
用ActionPlugin 实现
首先回顾一下ES整个端到端的调用,这里都是以Rest 请求为例,因为Transport Client调用方式就简单很多了,直接在 client.search()的时候塞一个回调函数就可以了。(下面每个类的含义不懂的可以回看之前的文章Elasticsearch 5.x 源码分析(6)Request 和Response 在ES 中的传输和解析
)
图画的差别介意,再结合一下流程图来看
ES调用流程图OK,先看ActionPlugin接口描述和结合上面两幅图对比,我们可以得到下面三个总结:
- 可以通过实现Plugin的RestHandlerWrapper (待会会解释这个是什么东西)来拦截RestAction的调用,也就是说Request 应该是可以拦截得到的
- 可以通过实现getActionFilters() 来拦截具体的Action的调用,那么在这一层应该可以拦截所有的内部调用(理解一下一次的请求可能会由非常多次的内部调用完成)
- 看看上图最后一条线,意思就是,别指望在这些wrapper或者filters里面可以抓取到Response,因为里面全部都是异步调用,所以,你能做的就是想办法在Wrapper 或者Filters 那里去添加一个onResponse() 回调来实现了
RestHandlerWrapper
那什么是RestHandlerWrapper 呢,在Elasticsearch 5.2之前,其实你是可以在Plugin 里注册一个或多个RestFilters 的,就像这样
在Plugin里注册一个RestFilter据ES开发团队的说法就是,这种方式太危险,它是可以控制调用链的(在里面获取到
restFilterChain
也就是可以得到别人的plugin的filter),因此从5.2 开始他们把这个机制改了,整个ES全局只允许有一个RestHandlerWrapper 类,并且里面只有一个方法,是获取不到RestFilterChain的。
想了解更多这个需求可以链进去
Plugins: Replace Rest filters with RestHandler wrapper #21905
RestFilters are a complex way of allowing plugins to add extra code
before rest actions are executed. This change removes rest filters, and
replaces with a wrapper which a single plugin may provide.
下面是我的debug 代码
@Override
public UnaryOperator<RestHandler> getRestHandlerWrapper(ThreadContext threadContext) {
return (RestHandler r) ->{
threadContext.putHeader("request_time", System.currentTimeMillis() + "");
threadContext.putHeader("response_time", System.currentTimeMillis() + "");
LOGGER.error(Thread.currentThread().getId() + "===getRestHandlerWrapper ====" + "request" + System.currentTimeMillis() + "====" + r.toString());
//LOGGER.error(r.getClass().getClassLoader().toString());
//LOGGER.error(PallasPlugin.class.getClassLoader().toString());
String className = r.getClass().getName();
if (className.contains(".RestSearchAction") || className.contains(".RestSearchTemplateAction")) {
return new MyRestHandler(r);
} else {
return r;
}
};
}
public static class MyRestHandler implements RestHandler {
private RestHandler r;
public MyRestHandler(RestHandler r) {
this.r = r;
}
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
//request.params().put("UUID", UUID.randomUUID().toString());
LOGGER.error(request.params());
LOGGER.error(request.getHeaders());
r.handleRequest(request, channel, client);
}
}
虽然调试信息很简单,但是这里已经可以取得
结论一
- threadContext put的那些Header 在Response是拿不到的
- request是不允许乱塞params 的,想插个自定义属性,直接报错
- handleRequest()是一个void方法,所以想非入侵地去给Response 添加回调,很难很难
- 从上面一点推导来讲就是,这里可以拦截到RestRequest,但是拦截不到RestResponse
- 细心的人留意到上面为什么我要
r.getClass()
没有而不是直接instance of
没,这里最最重要的是,Plugin是独立ClassLoader 来加载的,在Plugin里是读取不到其他Plugin/Modules 的类的,只能读core包里面的类
ActionFilters
再看看我的测试代码
public static class MyActionFilter implements ActionFilter {
@Inject
public MyActionFilter() {
}
@Override
public int order() {
return 0;
}
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void apply(Task task, String action, Request request, ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {
if(action.equals("indices:data/read/search") || action.equals("indices:data/read/search/template")) {
LOGGER.error(Thread.currentThread().getId() + "==" + action + "===" + request.toString() + ":" + listener.toString());
LOGGER.error("===replacing Action Lisener====");
LOGGER.error(task.getId() + "," + task.getParentTaskId().getId() + "," + task.getStatus() + "," + task.getType());
LOGGER.error(request.remoteAddress());
chain.proceed(task, action, request, new MyActionListener(listener,task));
} else {
LOGGER.error("===NOT replacing Action Lisener====");
chain.proceed(task, action, request, listener);
}
}
}
public static class MyActionListener implements ActionListener {
private ActionListener listener;
private Task task;
public MyActionListener(ActionListener listener, Task task) {
this.listener = listener;
this.task = task;
}
@Override
public void onResponse(Object o) {
LOGGER.error(Thread.currentThread().getId() + "==onResponse()===" + o.toString());
LOGGER.error(task.getStartTime());
LOGGER.error(System.currentTimeMillis());
listener.onResponse(o);
}
@Override
public void onFailure(Exception e) {
LOGGER.error(Thread.currentThread().getId() + "==onError()===");
listener.onFailure(e);
}
}
看到onRespone() 是否异常兴奋,是的,ActionFilters 就nice 很多,request 、Response都可以给你,通过这个测试得到了
结论二
- ActionFilters确实可以拦截所有的内部Action调用,不管你是用RestClient 过来还是TransportClient 过来。
- 这里拦截的是SearchRequest 和SearchResponse, 而不是RestRequest 和RestResponse
- 在chain.proceed() 包装一个listener,就可以在onResponse() 里做逻辑了,而不影响它本身逻辑
这里有一点尤其需要注意,刚刚说了,这里是拦截所有调用,也就是说,就算你是一个SearchTemplateAction调用,其实也可能进来了多次,因为在SearchTemplateAction内部还会发起多个并发的SearchAction请求,一个是给本机,一个是给其他nodes,而其他nodes也会发送多个请求过来,同样会被拦截,这样对于有目的地去监听某一个具体的request和Response 是不利的。
这里我想到两个办法可以解决这个问题:
- 因为这个调用链里ActionFilter的重复进入其实是同一个线程完成的,因此可以通过ThreadLocal来实现信息的保存,比如首先进入时保存个信息,那再次进入就可以识别了
- 更加暴力点的做法是直接再封装一个SearchRequest来记录信息
用SearchPlugin 实现
到这里我们知道用ActionPlugin的方式是勉强可以实现的,虽然道路比较曲折,那么SearchPlugin 是否也可以实现呢?从这个Plugin中我第一眼就瞄中了List<BiConsumer<SearchRequest, SearchResponse>> getSearchResponseListeners()
这个方法,结果一试,无果,百思不得其解,再次点进去源码想看看它如何调用,结果一点,真想骂人!
这个接口是完全没有地方用到,所以实现了也没用,那么
总结就是SearchPlugin是实现不了拦截Request或者Response的。
好了,我的调研就基本到这里了,最后给多篇扩展阅读
Search Plugin to intercept search response
How to intercept search request (in a pluign) without creating separate RestAction