spring data elasticsearch 使用及源码分
最近一直在用elasticsearch做一些数据的存储和查询 ,开始为了方便就直接使用了spring data elasticsearch ,期间也经历了很多坑,比如findByName(String name)
这种接口默认只返回10条数据,自己也对源码做个跟踪,也研究了一下为什么elasticsearch要这样做。接下来我们通过分析spring data es的两个接口,来看一下是如果实现对es进行操作的。
1.findByName接口分析
- spring data es repository 接口
public interface UserRepository extends ElasticsearchRepository<User,String> {
/**
* 默认只返回10条
*/
List<User> findByName(String name);
List<User> deleteByName(String name);
}
- spring 启动分析构造UserRepository代理接口
启动的时候ElasticsearchRepositoryFactoryBean
的方法会进行对应代理类的生成,并注入到spring容器
@Override
public void afterPropertiesSet() {
super.afterPropertiesSet();
Assert.notNull(operations, "ElasticsearchOperations must be configured!");
}
这里的super指是RepositoryFactoryBeanSupport
,spring data jpa启动整体代理接口注入到spring容器中也是通过这个类做的,其中spring data jpa 对应的类是JpaRepositoryFactoryBean
。
#其中RepositoryFactoryBeanSupport类中的下面这段代码比较核心
public void afterPropertiesSet() {
#getRepositoryMetadata是获取这个接口的元数据,就是这个接口的一些基本的属
this.repositoryMetadata = this.factory.getRepositoryMetadata(repositoryInterface);
#这个方法是生成这个接口的实现类
this.repository = Lazy.of(() -> this.factory.getRepository(repositoryInterface, repositoryFragmentsToUse));
#如果不是懒加载立即初始化
if (!lazyInit) {
this.repository.get();
}
}
接着我们分析上面的核心代码getRepository
这个方法
public <T> T getRepository(Class<T> repositoryInterface, RepositoryFragments fragments) {
Assert.notNull(repositoryInterface, "Repository interface must not be null!");
Assert.notNull(fragments, "RepositoryFragments must not be null!");
RepositoryMetadata metadata = getRepositoryMetadata(repositoryInterface);
RepositoryComposition composition = getRepositoryComposition(metadata, fragments);
RepositoryInformation information = getRepositoryInformation(metadata, composition);
validate(information, composition);
//这里是构造一个对象,然后填充一些这个接口对应的entity信息,比如索引名,类型名等
Object target = getTargetRepository(information);
// Create proxy 接下来通过代理工厂创建代理
ProxyFactory result = new ProxyFactory();
result.setTarget(target);
result.setInterfaces(repositoryInterface, Repository.class, TransactionalProxy.class);
result.addAdvice(SurroundingTransactionDetectorMethodInterceptor.INSTANCE);
result.addAdvisor(ExposeInvocationInterceptor.ADVISOR);
postProcessors.forEach(processor -> processor.postProcess(result, information));
result.addAdvice(new DefaultMethodInvokingMethodInterceptor());
//这个DefaultMethodInvokingMethodInterceptor
//会为所有我们在repository接口中自定义的方法加上切面
result.addAdvice(new QueryExecutorMethodInterceptor(information));
composition = composition.append(RepositoryFragment.implemented(target));
result.addAdvice(new ImplementationMethodExecutionInterceptor(composition));
//生产代理类返回注入到spring容器
return (T) result.getProxy(classLoader);
}
接下来我们看看QueryExecutorMethodInterceptor
类,主要是resolveQuery
这个方法
this.queries = lookupStrategy.map(it -> {
SpelAwareProxyProjectionFactory factory = new SpelAwareProxyProjectionFactory();
factory.setBeanClassLoader(classLoader);
factory.setBeanFactory(beanFactory);
return repositoryInformation.getQueryMethods().stream()//
.map(method -> Pair.of(method, it.resolveQuery(method, repositoryInformation, factory, namedQueries)))//
.peek(pair -> invokeListeners(pair.getSecond()))//
.collect(Pair.toMap());
}).orElse(Collections.emptyMap());
resolveQuery
中会为每一个方法创建一个ElasticsearchPartQuery
,其中ElasticsearchPartQuery
是自定义操作实现的核心类,我们看一下代码
//构建类的时候会创建一个PartTree主要用来描述这个方法是什么方法,有没有分页等到
private final PartTree tree;
private final MappingContext<?, ElasticsearchPersistentProperty> mappingContext;
public ElasticsearchPartQuery(ElasticsearchQueryMethod method, ElasticsearchOperations elasticsearchOperations) {
super(method, elasticsearchOperations);
this.tree = new PartTree(method.getName(), method.getEntityInformation().getJavaType());
this.mappingContext = elasticsearchOperations.getElasticsearchConverter().getMappingContext();
}
//这个是在方法运行时调用时通过切面到这里然后转换成elasticsearch的查询接口
@Override
public Object execute(Object[] parameters) {
ParametersParameterAccessor accessor = new ParametersParameterAccessor(queryMethod.getParameters(), parameters);
CriteriaQuery query = createQuery(accessor);
if(tree.isDelete()) {
Object result = countOrGetDocumentsForDelete(query, accessor);
elasticsearchOperations.delete(query, queryMethod.getEntityInformation().getJavaType());
return result;
} else if (queryMethod.isPageQuery()) {
query.setPageable(accessor.getPageable());
return elasticsearchOperations.queryForPage(query, queryMethod.getEntityInformation().getJavaType());
} else if (queryMethod.isStreamQuery()) {
Class<?> entityType = queryMethod.getEntityInformation().getJavaType();
if (query.getPageable().isUnpaged()) {
int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
query.setPageable(PageRequest.of(0, Math.max(1, itemCount)));
}
return StreamUtils.createStreamFromIterator((CloseableIterator<Object>) elasticsearchOperations.stream(query, entityType));
} else if (queryMethod.isCollectionQuery()) {
if (accessor.getPageable() == null) {
int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
query.setPageable(PageRequest.of(0, Math.max(1, itemCount)));
} else {
query.setPageable(accessor.getPageable());
}
return elasticsearchOperations.queryForList(query, queryMethod.getEntityInformation().getJavaType());
} else if (tree.isCountProjection()) {
return elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
}
return elasticsearchOperations.queryForObject(query, queryMethod.getEntityInformation().getJavaType());
}
可以看到上面那个execute方法中主要有个ElasticsearchOperations类,负责调用es的方法,因为我们的List<User> findByName(String name)
接口返回的是一个List,如果我们没有设置分页接口,这里会填充Unpaged.INSTANCE
也就是表示客户端没有设置分页,es服务端会有默认填充。
然后逻辑走到了
elasticsearchOperations.queryForList
这个方法,这个方法调用的是queryForPage
方法。这样整个流程我们就走通了,我们也看到了为什么我们写的es repository接口默认返回的是10条数据,所以如果想用这个接口去做大批量数据查询的话是会出现问题的,需要自己实现原生的接口,用scroll方式去拉取数据。其中deleteByName方法用按同样方式分析。
2.userRepository.save(User user)接口分析
public <S extends T> S save(S entity) {
Assert.notNull(entity, "Cannot save 'null' entity.");
elasticsearchOperations.index(createIndexQuery(entity));
elasticsearchOperations.refresh(entityInformation.getIndexName());
return entity;
}
/**
* Index an object. Will do save or update
*
* @param query
* @return returns the document id
*/
String index(IndexQuery query);
@Override
public String index(IndexQuery query) {
String documentId = prepareIndex(query).execute().actionGet().getId();
// We should call this because we are not going through a mapper.
if (query.getObject() != null) {
setPersistentEntityId(query.getObject(), documentId);
}
return documentId;
}
/**
* refresh the index
*
* @param indexName
*
*/
void refresh(String indexName);
@Override
public void refresh(String indexName) {
Assert.notNull(indexName, "No index defined for refresh()");
client.admin().indices().refresh(refreshRequest(indexName)).actionGet();
}
可以看到在save entity的时候做了两件事情,首先是index,也就是这条数据会存到es,但是此时我们查询是不能查到的,因为查询到es数据需要建立对应的倒排索引,index只是把数据放到es中的buffer中,还没有建立倒排索引所以index后是不能立即搜索到es中对应的数据的,然后又调用了refresh方法,这个方法会触发es去清空buffer 并写入文件系统缓存,也就是会为我们的索引文件建立倒排索引,此时就可以搜索到了。所以save这个接口性能是很差的,比起mysql单挑插入,我这边测试了,性能下降一半,大概是300ms,服务器上面可能性能好一点,速度快一点;es插入最好调用批量的bulk接口
public <S extends T> List<S> save(List<S> entities) {
Assert.notNull(entities, "Cannot insert 'null' as a List.");
Assert.notEmpty(entities, "Cannot insert empty List.");
List<IndexQuery> queries = new ArrayList<>();
for (S s : entities) {
queries.add(createIndexQuery(s));
}
elasticsearchOperations.bulkIndex(queries);
elasticsearchOperations.refresh(entityInformation.getIndexName());
return entities;
}`
可以看到这里批量插入最后只调1次refresh,refresh操作会清空es的buffer,并且写入到文件系统缓存,这个操作开销还是比较大的。