ES的全面用法

2021-08-27  本文已影响0人  机灵鬼鬼

ES官方中文文档:https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html
https://blog.csdn.net/mon_star/article/details/102934620

坑位1:排序字段,必须要是keyword类型修饰的字段。但keyword修饰的字段只能精确查询,不能模糊查询。要想模糊查询必须是text类型修饰的字段。

如果你的数据存在两张table里,用_id关联,那么你在用match函数以_id作为条件查询的时候,可以命中两条记录,这两条记录是不用指定表的

{"query":{"match":{"_id":"6836808667103232"}}}

查询结果如下:

{
"took": 14,
"timed_out": false,
"_shards": {
"total": 53,
"successful": 53,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 2,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "common_user",
"_type": "table",
"_id": "6836808667103232",
"_score": 1,
"_source": {
"tenant_id": "1233445",
"creator": null,
"clf_code": "znyh",
"gender": "1",
"user_name": "zhangsan2",
"expiration_time": "2022-02-26T23:57:37.000Z",
"show_name": null,
"modifier": null,
"created_at": "2021-08-26T23:57:37.000Z",
"real_name": null,
"avatar": null,
"cipher_algorithm": null,
"password": null,
"updated_at": "2021-08-26T23:57:37.000Z",
"out_uid": null,
"phone": null,
"nick_name": null,
"encryption_key": null,
"id": "6836808667103232",
"email": null,
"status": 1
}
}
,
{
"_index": "user_ext",
"_type": "table",
"_id": "6836808667103232",
"_score": 1,
"_source": {
"exten1": "扩展字段1",
"exten2": "扩展字段2"
}
}
]
}
}
  1. 数据存储,mybatis插件查询ES详细设计【优先级:高】

    1. 数据库和ES规则定义

数据库使用规则

  1. 数据库和表
image
<dependency>
    <groupId>com.paraview.boot</groupId>
    <artifactId>uid-spring-boot-starter</artifactId>
</dependency>

  1. 索引相关

无法复制加载中的内容

无法复制加载中的内容

  1. 程序相关

1、如果数据表的数据没某些定时任务所监控,定时迁移或批量更新某些数据的场景。必须分批取用,分批更新。而且每批次的大小,最好控制在1000条以内。

2、这种被监控的数据表,必须有阶段性统计程序来监控,频次可以一天或者半天一次,避免出现数据拥塞或数据爆仓。

  1. 数据库链接
---获取数据库服务的最大链接数
show variables like '%max_connections%';
---获取当前数据库的连接数
show global status like 'Max_used_connections';

程序当中的数据库链接原则:

设置数据库最大连接数合理区间表达式=Max_used_connections / max_connections * 100%
对于mysql服务器最大连接数值的设置范围比较理想的是:服务器响应的最大连接
数值占服务器上限连接数值的比例值在10%以上,如果在10%以下,
容易出现MySQL: ERROR 1040: Too many connections”
###所以数据库链接数的大小是要根据系统业务量来做定期调整的,不是一成不变的。
datasource:
    #druid相关配置
    druid:
      #监控统计拦截的filters
      filters: stat
      driver-class-name: com.mysql.cj.jdbc.Driver
      #基本属性
      url: jdbc:mysql://xxxxx:3306/idaas2021?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
      username:xxxx
      password:xxxx
      #配置初始化数据库连接数大小/最小/最大
      initial-size: 4
      min-idle: 4
      max-active: 20
      #获取连接等待超时时间
      max-wait: 60000
      #间隔多久进行一次检测,检测需要关闭的空闲连接
      time-between-eviction-runs-millis: 60000
      #一个连接在池中最小生存的时间
      min-evictable-idle-time-millis: 300000

ES的使用规则

  1. 字段相关

1、排序字段,必须要是keyword类型修饰的字段。但keyword修饰的字段只能精确查询,不能模糊查询。要想模糊查询,字段必须是text类型修饰。

2、优先使用ES ApI方式进行数据检索

无法复制加载中的内容

3、推荐使用单层平铺的数据结构,如果需要嵌套,尽量减少嵌套层级,因为es对嵌套查询的效率相比于非嵌套的数据,会下降几十倍,甚至上百倍。

4、如果非要需要存储嵌套数而又不想影响性能,最好选用mongodb来做结构化数据存储。

5、Mybatis底层与ES整合方案,不论是x-pack和elasticsearch-sql的sql好像都是不支持insert和update的,所以新增和插入操作还是需要用rest的方式或者transport的方式来执行。所以sql方式还不是很成熟或者说支持度不是很好,能用原生方式就用原生,一来可控性好,二来版本兼容性也好。

原生方式

<!--引入spring data的技术es组件-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

package com.paraview.bum.elasticsearch.utils;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * ES操作工具类
 *
 * @author andyzhang
 * @since 2021/08/16
 */
@Component
@Slf4j
public class ElasticsearchTempUtils {

    /**
     * 获取原生的ElasticsearchRestTemplate
     *
     * @return
     */
    public static ElasticsearchRestTemplate getClient() {
        return SpringContextHandler.getBean(ElasticsearchRestTemplate.class);
    }

    /**
     * 创建IndexQuery的便利函数
     *
     * @param id
     * @param map
     * @return
     */
    public static IndexQuery createIndexQuery(String id, Map map) {
        IndexQuery indexQuery = new IndexQueryBuilder()
                .withId(id)
                .withObject(map).build();
        return indexQuery;
    }

    /**
     * 创建IndexQuery的便利函数
     *
     * @param id
     * @param obj
     * @return
     */
    public static IndexQuery createIndexQuery(String id, Object obj) {
        IndexQuery indexQuery = new IndexQueryBuilder()
                .withId(id)
                .withObject(obj).build();
        return indexQuery;
    }

    /**
     * 创建UpdateQuery的便利函数
     *
     * @param id
     * @param map
     * @return
     */
    public static UpdateQuery createUpdateQuery(String id, Map<java.lang.String,? extends java.lang.Object> map) {
        UpdateQuery updateQuery = UpdateQuery
                .builder(id)
                .withDocument(Document.from(map))
                .build();
        return updateQuery;
    }

    /**
     * 判断索引是否存在
     *
     * @param index 索引
     * @return documentid 文档编号
     */
    public static boolean existsIndex(String index) {
        return getClient().indexOps(IndexCoordinates.of(index)).exists();
    }

    /**
     * 删除索引
     *
     * @param index 索引
     * @return documentid 文档编号
     */
    public static boolean deleteIndex(String index) {
        return getClient().indexOps(IndexCoordinates.of(index)).delete();
    }

    /**
     * 创建索引
     *
     * @param index 索引
     * @return documentid 文档编号
     */
    public static boolean createIndex(String index) {
        return getClient().indexOps(IndexCoordinates.of(index)).create();
    }

    /**
     * 创建索引同时添加数据
     * 更新/创建同时支持,存在则更新,不存在则插入
     * @param index 索引
     * @param id    编号
     * @param map   数据
     * @return documentid 文档编号
     */
    public static String createIndex(String index, String id, Map map) {
        IndexQuery indexQuery = new IndexQueryBuilder()
                .withId(id)
                .withObject(map).build();

        return getClient().index(indexQuery, IndexCoordinates.of(index));

    }

    /**
     * 批量插入
     *
     * @param index   索引
     * @param queries 数据
     * @return
     */
    public static List<String> bulkIndex(String index, List<IndexQuery> queries) {

        return getClient().bulkIndex(queries, IndexCoordinates.of(index));
    }

    /**
     * 批量更新
     *
     * @param index   索引
     * @param queries 数据
     * @return
     */
    public static void bulkUpdate(String index, List<UpdateQuery> queries) {
        getClient().bulkUpdate(queries, IndexCoordinates.of(index));
    }

}

具体在业务中使用:

/**
*新增用户
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void save(User user) {
    UserPO userPO = mapper.toPO(user);
    //插入用户主表
    userMapper.insert(userPO);
    //批量更新动态字段
    ElasticsearchTempUtils.createIndex(TableName.TB_COMMON_USER_DYNAMIC_PROPERTIES_VALUE,user.getUid(),user.getMutableCondition());
}
/**
*更新用户
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void update(List<User> users) {
    List<UserPO> userPOs = mapper.toPOs(users);
    //插入用户主表
    userMapper.batchUpdate(userPOs);
    //封装动态字段
    Map<String,Map<String,Object>> dpos = Maps.newConcurrentMap();
    for (User user: users) {
        dpos.put(user.getUid() , user.getMutableCondition());
    }
    List<UpdateQuery> updateQueryList=new ArrayList<>();
    users.forEach(user -> {
        UpdateQuery updateQuery=ElasticsearchTempUtils.createUpdateQuery(user.getUid(),user.getMutableCondition());
        updateQueryList.add(updateQuery);
    });
    //批量更新动态字段
    ElasticsearchTempUtils.bulkUpdate(TableName.TB_COMMON_USER_DYNAMIC_PROPERTIES_VALUE,updateQueryList);

}
/**
*查询用户
*/
@Override
public List<User> findByUsers(User user) {
    List<UserPO> userPOs = userMapper.findByUsers(mapper.toPO(user));
    List<User> restUsers= mapper.posTo(userPOs);
    //组装加工es查询出来的数据
    CriteriaQuery query = new CriteriaQuery(new Criteria()
            .and(new Criteria("gender").is("0")));
    List<Map> map=ElasticsearchTempUtils.queryIndex(TableName.TB_COMMON_USER_DYNAMIC_PROPERTIES_VALUE,query);
    return restUsers;
}

/**
 * 删除一组id的用户
 * @param ids
 * @return
 */
@Transactional(rollbackFor = Exception.class)
@Override
public Result delete(List<String> ids) {
    int cnt= userMapper.batchDelete(ids);
    if(cnt>=0){
        ids.forEach(id->{
            ElasticsearchTempUtils.deleteIndex(TableName.TB_COMMON_USER_DYNAMIC_PROPERTIES_VALUE,id);
        });
        return ResultUtils.success("成功");
    }
    return ResultUtils.error("失败");
}

sql方式

优点

可以用kibana控制台和sql语句来查询es的数据,对开发者来说降低学习成本。

image
POST /_sql?format=txt
{
  "query": """
    select * FROM "bum_user_classification" where id='6849018910998528'
  """
}

POST /_sql?format=txt
{
  "query": """
    select * FROM "bum_user_classification" where clfName='cz12222222222j'
  """
}

POST /_sql?format=txt
{
  "query": """
    select * FROM "bum_user_classification" where tenantId='11'
  """
}

  1. 数据库和ES同步设计

  2. mybatis插件设计

官方文档xpack-sql SQL | Elasticsearch Guide [7.15] | Elastic

支持很多种获取数据的方式:其中我们用到的就是jdbc的方式

image

前提条件:ES 及 x-pack 下载安装 es版本7.8为例

添加x-pack配置:


spring:
  datasource:
    name: mysql_test
    type: com.alibaba.druid.pool.DruidDataSource
    es:
      url: jdbc:es://http://dev.kibana.idaas.sso360.cn:9200
      driver-class-name: org.elasticsearch.xpack.sql.jdbc.EsDriver
      mapperLocations: classpath:mapper/es/*.xml
      configLocation: classpath:config/mybatis.cfg.xml

xpack.security.enabled: false

server:
  port: 8770

es的配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <settings>
        <!-- 开启二级缓存-->
        <setting name="cacheEnabled" value="true"/>
        <!-- 打印查询语句-->
        <setting name="logImpl" value="STDOUT_LOGGING"/>
        <!-- 此处配置非常重要 不能缺失-->
        <setting name="useColumnLabel" value="false"/>
    </settings>
</configuration>

添加pom配置:

 <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.paraview.boot</groupId>
        <artifactId>paraview-boot-parent</artifactId>
        <version>1.2.0-SNAPSHOT</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>xpack</artifactId>
    <version>1.0.1-SNAPSHOT</version>
    <name>xpack</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.6</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.3</version>
        </dependency>
        <!-- xpack-sql引入-->
        <dependency>
            <groupId>org.elasticsearch.plugin</groupId>
            <artifactId>x-pack-sql-jdbc</artifactId>
            <version>7.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>  

datasource的定义类

package com.example.xpack.config;

import com.alibaba.druid.pool.DruidDataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import javax.sql.DataSource;

/**
 * @author lius
 * @version : EsDruidDataSourceConfig.java, v 1.0 2021年09月2021/9/29日 10时57分
 */
@Configuration
@MapperScan(basePackages = {"com.example.xpack.dao"},sqlSessionFactoryRef = "esSqlSessionFactory")
public class EsDruidDataSourceConfig {
    @Value("${spring.datasource.es.configLocation}")
    private String configLocation;
    @Value("${spring.datasource.es.mapperLocations}")
    private String mapperLocations;
    @Value("${spring.datasource.es.url}")
    private String esUrl;
    @Value("${spring.datasource.es.driver-class-name}")
    private String driverClassName;
    @Bean(name = "dataSource")
    public DataSource dataSource(){
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setDriverClassName(driverClassName);
        dataSource.setUrl(esUrl);
        return dataSource;
    }
    @Bean(name = "esSqlSessionFactory")
    public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource") DataSource dataSource) throws Exception{

        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource);

        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        //配置mapper文件位置
        sqlSessionFactoryBean.setMapperLocations(resolver.getResources(mapperLocations));
        sqlSessionFactoryBean.setConfigLocation(resolver.getResource(configLocation));
        return sqlSessionFactoryBean.getObject();
    }
}

持久化操作文件dao类

package com.example.xpack.dao;

import org.apache.ibatis.annotations.Mapper;

import java.util.Map;

@Mapper
public interface RwTradeMapper {
    Map testSql();
}

查询es服务的mapper文件


<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.xpack.dao.RwTradeMapper">
    <select id="testSql" resultType="java.util.Map">
        select * FROM "bum_user_classification" where id='1632898311335'
    </select>
</mapper>

注意事项

1、es的sql语法和特性介绍

2、双引号和单引号的区别:双引号用于修饰表或字段,单引号修饰值,如下:

select * FROM "bum_user_classification" where id='1632898311335' 

JDBC Connection [org.elasticsearch.xpack.sql.jdbc.JdbcConnection@3e28dc96] will not be managed by Spring
==>  Preparing: select * FROM "bum_user_classification" where clfName='cz12222222j'
==> Parameters: 
<==    Columns: _class, clfName, clfNameI18.defaultValue, clfNameI18.label, clfNameI18.lang, clfNameI18.value, createdAt, describe, id, propertiesList._class, propertiesList.attributeField, propertiesList.id, propertiesList.isFormDisplay, propertiesList.isRequire, propertiesList.isSearchable, propertiesList.isUnion, propertiesList.pgroup, propertiesList.proType, propertiesList.status, propertiesList.tenantId, status, tenantId, updatedAt
<==        Row: com.paraview.bum.resource.biz.domain.model.UserClassification, cz12222222j, 2, 2, en_US, czj, 1632902638159, 11, 6849005615054850, com.paraview.bum.resource.api.model.request.PropertiesDTO, null, 123, false, false, false, false, ext, user, null, t1, 0, 11, 1632902638158
<==      Total: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@5fb07347]
{clfNameI18={defaultValue=2, label=2, lang=en_US, value=czj}, propertiesList={pgroup=ext, proType=user, isFormDisplay=false, isUnion=false, tenantId=t1, _class=com.paraview.bum.resource.api.model.request.PropertiesDTO, id=123, isRequire=false, isSearchable=false}, createdAt=1632902638159, tenantId=11, _class=com.paraview.bum.resource.biz.domain.model.UserClassification, describe=11, id=6849005615054850, clfName=cz12222222j, status=0, updatedAt=1632902638158}

局限性

使用SQL查询ES有一定的局限性,没有原生的Query DSL那么强大,对于嵌套属性和某些函数的支持并不怎么好,但是平时用来查询下数据基本够用了。

上一篇 下一篇

猜你喜欢

热点阅读