springbootSpringBoot极简教程 · Spring Boot Spring Boot

spring data elasticsearch 使用及源码分

2017-12-13  本文已影响161人  freelands
spring integrate es.png
最近一直在用elasticsearch做一些数据的存储和查询 ,开始为了方便就直接使用了spring data elasticsearch ,期间也经历了很多坑,比如findByName(String name)这种接口默认只返回10条数据,自己也对源码做个跟踪,也研究了一下为什么elasticsearch要这样做。接下来我们通过分析spring data es的两个接口,来看一下是如果实现对es进行操作的。
1.findByName接口分析
public interface UserRepository extends ElasticsearchRepository<User,String> {
    /**
     * 默认只返回10条
     */
    List<User> findByName(String name);


    List<User> deleteByName(String name);
}

    @Override
    public void afterPropertiesSet() {
        super.afterPropertiesSet();
        Assert.notNull(operations, "ElasticsearchOperations must be configured!");
    }

这里的super指是RepositoryFactoryBeanSupport,spring data jpa启动整体代理接口注入到spring容器中也是通过这个类做的,其中spring data jpa 对应的类是JpaRepositoryFactoryBean

#其中RepositoryFactoryBeanSupport类中的下面这段代码比较核心
public void afterPropertiesSet() {
        #getRepositoryMetadata是获取这个接口的元数据,就是这个接口的一些基本的属
        this.repositoryMetadata = this.factory.getRepositoryMetadata(repositoryInterface);
        #这个方法是生成这个接口的实现类
        this.repository = Lazy.of(() -> this.factory.getRepository(repositoryInterface, repositoryFragmentsToUse));
        #如果不是懒加载立即初始化
        if (!lazyInit) {
            this.repository.get();
        }
    }

接着我们分析上面的核心代码getRepository这个方法

public <T> T getRepository(Class<T> repositoryInterface, RepositoryFragments fragments) {

        Assert.notNull(repositoryInterface, "Repository interface must not be null!");
        Assert.notNull(fragments, "RepositoryFragments must not be null!");

        RepositoryMetadata metadata = getRepositoryMetadata(repositoryInterface);
        RepositoryComposition composition = getRepositoryComposition(metadata, fragments);
        RepositoryInformation information = getRepositoryInformation(metadata, composition);

        validate(information, composition);

        //这里是构造一个对象,然后填充一些这个接口对应的entity信息,比如索引名,类型名等
        Object target = getTargetRepository(information);

        // Create proxy 接下来通过代理工厂创建代理
        ProxyFactory result = new ProxyFactory();
        result.setTarget(target);
        result.setInterfaces(repositoryInterface, Repository.class, TransactionalProxy.class);

        result.addAdvice(SurroundingTransactionDetectorMethodInterceptor.INSTANCE);
        result.addAdvisor(ExposeInvocationInterceptor.ADVISOR);

        postProcessors.forEach(processor -> processor.postProcess(result, information));
    
        result.addAdvice(new DefaultMethodInvokingMethodInterceptor());
        //这个DefaultMethodInvokingMethodInterceptor
        //会为所有我们在repository接口中自定义的方法加上切面
        result.addAdvice(new QueryExecutorMethodInterceptor(information));

        composition = composition.append(RepositoryFragment.implemented(target));
        result.addAdvice(new ImplementationMethodExecutionInterceptor(composition));
        //生产代理类返回注入到spring容器
        return (T) result.getProxy(classLoader);
    }

接下来我们看看QueryExecutorMethodInterceptor类,主要是resolveQuery这个方法

this.queries = lookupStrategy.map(it -> {

                SpelAwareProxyProjectionFactory factory = new SpelAwareProxyProjectionFactory();
                factory.setBeanClassLoader(classLoader);
                factory.setBeanFactory(beanFactory);

                return repositoryInformation.getQueryMethods().stream()//
                        .map(method -> Pair.of(method, it.resolveQuery(method, repositoryInformation, factory, namedQueries)))//
                        .peek(pair -> invokeListeners(pair.getSecond()))//
                        .collect(Pair.toMap());

            }).orElse(Collections.emptyMap());

resolveQuery 中会为每一个方法创建一个ElasticsearchPartQuery,其中ElasticsearchPartQuery是自定义操作实现的核心类,我们看一下代码

    //构建类的时候会创建一个PartTree主要用来描述这个方法是什么方法,有没有分页等到
    private final PartTree tree;
    private final MappingContext<?, ElasticsearchPersistentProperty> mappingContext;

    public ElasticsearchPartQuery(ElasticsearchQueryMethod method, ElasticsearchOperations elasticsearchOperations) {
        super(method, elasticsearchOperations);
        this.tree = new PartTree(method.getName(), method.getEntityInformation().getJavaType());
        this.mappingContext = elasticsearchOperations.getElasticsearchConverter().getMappingContext();
    }

    //这个是在方法运行时调用时通过切面到这里然后转换成elasticsearch的查询接口
    @Override
    public Object execute(Object[] parameters) {
        ParametersParameterAccessor accessor = new ParametersParameterAccessor(queryMethod.getParameters(), parameters);
        CriteriaQuery query = createQuery(accessor);
        if(tree.isDelete()) {
            Object result = countOrGetDocumentsForDelete(query, accessor);
            elasticsearchOperations.delete(query, queryMethod.getEntityInformation().getJavaType());
            return result;
        } else if (queryMethod.isPageQuery()) {
            query.setPageable(accessor.getPageable());
            return elasticsearchOperations.queryForPage(query, queryMethod.getEntityInformation().getJavaType());
        } else if (queryMethod.isStreamQuery()) {
            Class<?> entityType = queryMethod.getEntityInformation().getJavaType();
            if (query.getPageable().isUnpaged()) {
                int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
                query.setPageable(PageRequest.of(0, Math.max(1, itemCount)));
            }

            return StreamUtils.createStreamFromIterator((CloseableIterator<Object>) elasticsearchOperations.stream(query, entityType));

        } else if (queryMethod.isCollectionQuery()) {
            if (accessor.getPageable() == null) {
                int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
                query.setPageable(PageRequest.of(0, Math.max(1, itemCount)));
            } else {
                query.setPageable(accessor.getPageable());
            }
            return elasticsearchOperations.queryForList(query, queryMethod.getEntityInformation().getJavaType());
        } else if (tree.isCountProjection()) {
            return elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
        }
        return elasticsearchOperations.queryForObject(query, queryMethod.getEntityInformation().getJavaType());
    }

    

可以看到上面那个execute方法中主要有个ElasticsearchOperations类,负责调用es的方法,因为我们的List<User> findByName(String name)接口返回的是一个List,如果我们没有设置分页接口,这里会填充Unpaged.INSTANCE也就是表示客户端没有设置分页,es服务端会有默认填充。

image.png
然后逻辑走到了elasticsearchOperations.queryForList这个方法,这个方法调用的是queryForPage方法。这样整个流程我们就走通了,我们也看到了为什么我们写的es repository接口默认返回的是10条数据,所以如果想用这个接口去做大批量数据查询的话是会出现问题的,需要自己实现原生的接口,用scroll方式去拉取数据。其中deleteByName方法用按同样方式分析。
2.userRepository.save(User user)接口分析
public <S extends T> S save(S entity) {
        Assert.notNull(entity, "Cannot save 'null' entity.");
        elasticsearchOperations.index(createIndexQuery(entity));
        elasticsearchOperations.refresh(entityInformation.getIndexName());
        return entity;
    }

    /**
     * Index an object. Will do save or update
     *
     * @param query
     * @return returns the document id
     */
    String index(IndexQuery query);

    @Override
    public String index(IndexQuery query) {
        String documentId = prepareIndex(query).execute().actionGet().getId();
        // We should call this because we are not going through a mapper.
        if (query.getObject() != null) {
            setPersistentEntityId(query.getObject(), documentId);
        }
        return documentId;
    }

    /**
     * refresh the index
     *
     * @param indexName
     *
     */
    void refresh(String indexName);

    @Override
    public void refresh(String indexName) {
        Assert.notNull(indexName, "No index defined for refresh()");
        client.admin().indices().refresh(refreshRequest(indexName)).actionGet();
    }

可以看到在save entity的时候做了两件事情,首先是index,也就是这条数据会存到es,但是此时我们查询是不能查到的,因为查询到es数据需要建立对应的倒排索引,index只是把数据放到es中的buffer中,还没有建立倒排索引所以index后是不能立即搜索到es中对应的数据的,然后又调用了refresh方法,这个方法会触发es去清空buffer 并写入文件系统缓存,也就是会为我们的索引文件建立倒排索引,此时就可以搜索到了。所以save这个接口性能是很差的,比起mysql单挑插入,我这边测试了,性能下降一半,大概是300ms,服务器上面可能性能好一点,速度快一点;es插入最好调用批量的bulk接口

public <S extends T> List<S> save(List<S> entities) {
        Assert.notNull(entities, "Cannot insert 'null' as a List.");
        Assert.notEmpty(entities, "Cannot insert empty List.");
        List<IndexQuery> queries = new ArrayList<>();
        for (S s : entities) {
            queries.add(createIndexQuery(s));
        }
        elasticsearchOperations.bulkIndex(queries);
        elasticsearchOperations.refresh(entityInformation.getIndexName());
        return entities;
    }`

可以看到这里批量插入最后只调1次refresh,refresh操作会清空es的buffer,并且写入到文件系统缓存,这个操作开销还是比较大的。

上一篇下一篇

猜你喜欢

热点阅读