ES的全面用法
ES官方中文文档:https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html
https://blog.csdn.net/mon_star/article/details/102934620
坑位1:排序字段,必须要是keyword类型修饰的字段。但keyword修饰的字段只能精确查询,不能模糊查询。要想模糊查询必须是text类型修饰的字段。
如果你的数据存在两张table里,用_id关联,那么你在用match函数以_id作为条件查询的时候,可以命中两条记录,这两条记录是不用指定表的
{"query":{"match":{"_id":"6836808667103232"}}}
查询结果如下:
{
"took": 14,
"timed_out": false,
"_shards": {
"total": 53,
"successful": 53,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 2,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "common_user",
"_type": "table",
"_id": "6836808667103232",
"_score": 1,
"_source": {
"tenant_id": "1233445",
"creator": null,
"clf_code": "znyh",
"gender": "1",
"user_name": "zhangsan2",
"expiration_time": "2022-02-26T23:57:37.000Z",
"show_name": null,
"modifier": null,
"created_at": "2021-08-26T23:57:37.000Z",
"real_name": null,
"avatar": null,
"cipher_algorithm": null,
"password": null,
"updated_at": "2021-08-26T23:57:37.000Z",
"out_uid": null,
"phone": null,
"nick_name": null,
"encryption_key": null,
"id": "6836808667103232",
"email": null,
"status": 1
}
}
,
{
"_index": "user_ext",
"_type": "table",
"_id": "6836808667103232",
"_score": 1,
"_source": {
"exten1": "扩展字段1",
"exten2": "扩展字段2"
}
}
]
}
}
-
数据存储,mybatis插件查询ES详细设计【优先级:高】
- 数据库和ES规则定义
数据库使用规则
- 数据库和表
- mysql版本使用mysql 8.0.24,库选择存储引擎必须是InnoDB,需要支持行锁和表锁。
- mysql表的编码字段必须是utf8mb4,核对utf8mb4_general_ci。因为utf8mb4是可以存储表情符的。
- mysql表的前缀必须以"服务业务"为前缀,比如“bum_user_properties_template”
- mysql表必须有主键,而且不允许使用自增长列,必须使用派拉雪花算法库,做唯一id的生成工具。同时每个表都要有create_time和update_time的字段,主要是用于未来数据的迁移而做的准备。
- 字段如果出现组合词,需要用英文"_"来连接,不要出现驼峰式命名比如“isUnique”,需写成is_unique
- 字段中的值,比如状态、性别、开关等散列较小的字段,禁止使用“0、1代表”,最好用英文翻译便于理解。比如“yes/no,open/close”
<dependency>
<groupId>com.paraview.boot</groupId>
<artifactId>uid-spring-boot-starter</artifactId>
</dependency>
- mysql表不允许使用自增长列,必须使用派拉雪花算法库,做唯一id的生成工具。同时每个表都要有create_time和update_time的字段,主要是用于未来数据的迁移而做的准备。
- 索引相关
- 选择的字段存放的值必须有足够的散列,例如性别、是否、有无这种字段不需要添加索引,因为他们就2个值,散列较小。
- 索引失效的现象:
无法复制加载中的内容
- 索引设计的建议:
无法复制加载中的内容
- 程序相关
1、如果数据表的数据没某些定时任务所监控,定时迁移或批量更新某些数据的场景。必须分批取用,分批更新。而且每批次的大小,最好控制在1000条以内。
2、这种被监控的数据表,必须有阶段性统计程序来监控,频次可以一天或者半天一次,避免出现数据拥塞或数据爆仓。
- 数据库链接
---获取数据库服务的最大链接数
show variables like '%max_connections%';
---获取当前数据库的连接数
show global status like 'Max_used_connections';
程序当中的数据库链接原则:
设置数据库最大连接数合理区间表达式=Max_used_connections / max_connections * 100%
对于mysql服务器最大连接数值的设置范围比较理想的是:服务器响应的最大连接
数值占服务器上限连接数值的比例值在10%以上,如果在10%以下,
容易出现MySQL: ERROR 1040: Too many connections”
###所以数据库链接数的大小是要根据系统业务量来做定期调整的,不是一成不变的。
datasource:
#druid相关配置
druid:
#监控统计拦截的filters
filters: stat
driver-class-name: com.mysql.cj.jdbc.Driver
#基本属性
url: jdbc:mysql://xxxxx:3306/idaas2021?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username:xxxx
password:xxxx
#配置初始化数据库连接数大小/最小/最大
initial-size: 4
min-idle: 4
max-active: 20
#获取连接等待超时时间
max-wait: 60000
#间隔多久进行一次检测,检测需要关闭的空闲连接
time-between-eviction-runs-millis: 60000
#一个连接在池中最小生存的时间
min-evictable-idle-time-millis: 300000
ES的使用规则
- 字段相关
1、排序字段,必须要是keyword类型修饰的字段。但keyword修饰的字段只能精确查询,不能模糊查询。要想模糊查询,字段必须是text类型修饰。
2、优先使用ES ApI方式进行数据检索
无法复制加载中的内容
3、推荐使用单层平铺的数据结构,如果需要嵌套,尽量减少嵌套层级,因为es对嵌套查询的效率相比于非嵌套的数据,会下降几十倍,甚至上百倍。
4、如果非要需要存储嵌套数而又不想影响性能,最好选用mongodb来做结构化数据存储。
5、Mybatis底层与ES整合方案,不论是x-pack和elasticsearch-sql的sql好像都是不支持insert和update的,所以新增和插入操作还是需要用rest的方式或者transport的方式来执行。所以sql方式还不是很成熟或者说支持度不是很好,能用原生方式就用原生,一来可控性好,二来版本兼容性也好。
原生方式
- 引入spring data的es组件
<!--引入spring data的技术es组件-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
- 封装es的工具
package com.paraview.bum.elasticsearch.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* ES操作工具类
*
* @author andyzhang
* @since 2021/08/16
*/
@Component
@Slf4j
public class ElasticsearchTempUtils {
/**
* 获取原生的ElasticsearchRestTemplate
*
* @return
*/
public static ElasticsearchRestTemplate getClient() {
return SpringContextHandler.getBean(ElasticsearchRestTemplate.class);
}
/**
* 创建IndexQuery的便利函数
*
* @param id
* @param map
* @return
*/
public static IndexQuery createIndexQuery(String id, Map map) {
IndexQuery indexQuery = new IndexQueryBuilder()
.withId(id)
.withObject(map).build();
return indexQuery;
}
/**
* 创建IndexQuery的便利函数
*
* @param id
* @param obj
* @return
*/
public static IndexQuery createIndexQuery(String id, Object obj) {
IndexQuery indexQuery = new IndexQueryBuilder()
.withId(id)
.withObject(obj).build();
return indexQuery;
}
/**
* 创建UpdateQuery的便利函数
*
* @param id
* @param map
* @return
*/
public static UpdateQuery createUpdateQuery(String id, Map<java.lang.String,? extends java.lang.Object> map) {
UpdateQuery updateQuery = UpdateQuery
.builder(id)
.withDocument(Document.from(map))
.build();
return updateQuery;
}
/**
* 判断索引是否存在
*
* @param index 索引
* @return documentid 文档编号
*/
public static boolean existsIndex(String index) {
return getClient().indexOps(IndexCoordinates.of(index)).exists();
}
/**
* 删除索引
*
* @param index 索引
* @return documentid 文档编号
*/
public static boolean deleteIndex(String index) {
return getClient().indexOps(IndexCoordinates.of(index)).delete();
}
/**
* 创建索引
*
* @param index 索引
* @return documentid 文档编号
*/
public static boolean createIndex(String index) {
return getClient().indexOps(IndexCoordinates.of(index)).create();
}
/**
* 创建索引同时添加数据
* 更新/创建同时支持,存在则更新,不存在则插入
* @param index 索引
* @param id 编号
* @param map 数据
* @return documentid 文档编号
*/
public static String createIndex(String index, String id, Map map) {
IndexQuery indexQuery = new IndexQueryBuilder()
.withId(id)
.withObject(map).build();
return getClient().index(indexQuery, IndexCoordinates.of(index));
}
/**
* 批量插入
*
* @param index 索引
* @param queries 数据
* @return
*/
public static List<String> bulkIndex(String index, List<IndexQuery> queries) {
return getClient().bulkIndex(queries, IndexCoordinates.of(index));
}
/**
* 批量更新
*
* @param index 索引
* @param queries 数据
* @return
*/
public static void bulkUpdate(String index, List<UpdateQuery> queries) {
getClient().bulkUpdate(queries, IndexCoordinates.of(index));
}
}
具体在业务中使用:
/**
*新增用户
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void save(User user) {
UserPO userPO = mapper.toPO(user);
//插入用户主表
userMapper.insert(userPO);
//批量更新动态字段
ElasticsearchTempUtils.createIndex(TableName.TB_COMMON_USER_DYNAMIC_PROPERTIES_VALUE,user.getUid(),user.getMutableCondition());
}
/**
*更新用户
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void update(List<User> users) {
List<UserPO> userPOs = mapper.toPOs(users);
//插入用户主表
userMapper.batchUpdate(userPOs);
//封装动态字段
Map<String,Map<String,Object>> dpos = Maps.newConcurrentMap();
for (User user: users) {
dpos.put(user.getUid() , user.getMutableCondition());
}
List<UpdateQuery> updateQueryList=new ArrayList<>();
users.forEach(user -> {
UpdateQuery updateQuery=ElasticsearchTempUtils.createUpdateQuery(user.getUid(),user.getMutableCondition());
updateQueryList.add(updateQuery);
});
//批量更新动态字段
ElasticsearchTempUtils.bulkUpdate(TableName.TB_COMMON_USER_DYNAMIC_PROPERTIES_VALUE,updateQueryList);
}
/**
*查询用户
*/
@Override
public List<User> findByUsers(User user) {
List<UserPO> userPOs = userMapper.findByUsers(mapper.toPO(user));
List<User> restUsers= mapper.posTo(userPOs);
//组装加工es查询出来的数据
CriteriaQuery query = new CriteriaQuery(new Criteria()
.and(new Criteria("gender").is("0")));
List<Map> map=ElasticsearchTempUtils.queryIndex(TableName.TB_COMMON_USER_DYNAMIC_PROPERTIES_VALUE,query);
return restUsers;
}
/**
* 删除一组id的用户
* @param ids
* @return
*/
@Transactional(rollbackFor = Exception.class)
@Override
public Result delete(List<String> ids) {
int cnt= userMapper.batchDelete(ids);
if(cnt>=0){
ids.forEach(id->{
ElasticsearchTempUtils.deleteIndex(TableName.TB_COMMON_USER_DYNAMIC_PROPERTIES_VALUE,id);
});
return ResultUtils.success("成功");
}
return ResultUtils.error("失败");
}
sql方式
优点
可以用kibana控制台和sql语句来查询es的数据,对开发者来说降低学习成本。
POST /_sql?format=txt
{
"query": """
select * FROM "bum_user_classification" where id='6849018910998528'
"""
}
POST /_sql?format=txt
{
"query": """
select * FROM "bum_user_classification" where clfName='cz12222222222j'
"""
}
POST /_sql?format=txt
{
"query": """
select * FROM "bum_user_classification" where tenantId='11'
"""
}
-
数据库和ES同步设计
-
mybatis插件设计
官方文档xpack-sql SQL | Elasticsearch Guide [7.15] | Elastic
支持很多种获取数据的方式:其中我们用到的就是jdbc的方式
前提条件:ES 及 x-pack 下载安装 es版本7.8为例
添加x-pack配置:
spring:
datasource:
name: mysql_test
type: com.alibaba.druid.pool.DruidDataSource
es:
url: jdbc:es://http://dev.kibana.idaas.sso360.cn:9200
driver-class-name: org.elasticsearch.xpack.sql.jdbc.EsDriver
mapperLocations: classpath:mapper/es/*.xml
configLocation: classpath:config/mybatis.cfg.xml
xpack.security.enabled: false
server:
port: 8770
es的配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<settings>
<!-- 开启二级缓存-->
<setting name="cacheEnabled" value="true"/>
<!-- 打印查询语句-->
<setting name="logImpl" value="STDOUT_LOGGING"/>
<!-- 此处配置非常重要 不能缺失-->
<setting name="useColumnLabel" value="false"/>
</settings>
</configuration>
添加pom配置:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.paraview.boot</groupId>
<artifactId>paraview-boot-parent</artifactId>
<version>1.2.0-SNAPSHOT</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>xpack</artifactId>
<version>1.0.1-SNAPSHOT</version>
<name>xpack</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.6</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.3</version>
</dependency>
<!-- xpack-sql引入-->
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>x-pack-sql-jdbc</artifactId>
<version>7.8.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
datasource的定义类
package com.example.xpack.config;
import com.alibaba.druid.pool.DruidDataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import javax.sql.DataSource;
/**
* @author lius
* @version : EsDruidDataSourceConfig.java, v 1.0 2021年09月2021/9/29日 10时57分
*/
@Configuration
@MapperScan(basePackages = {"com.example.xpack.dao"},sqlSessionFactoryRef = "esSqlSessionFactory")
public class EsDruidDataSourceConfig {
@Value("${spring.datasource.es.configLocation}")
private String configLocation;
@Value("${spring.datasource.es.mapperLocations}")
private String mapperLocations;
@Value("${spring.datasource.es.url}")
private String esUrl;
@Value("${spring.datasource.es.driver-class-name}")
private String driverClassName;
@Bean(name = "dataSource")
public DataSource dataSource(){
DruidDataSource dataSource = new DruidDataSource();
dataSource.setDriverClassName(driverClassName);
dataSource.setUrl(esUrl);
return dataSource;
}
@Bean(name = "esSqlSessionFactory")
public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource") DataSource dataSource) throws Exception{
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSource);
PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
//配置mapper文件位置
sqlSessionFactoryBean.setMapperLocations(resolver.getResources(mapperLocations));
sqlSessionFactoryBean.setConfigLocation(resolver.getResource(configLocation));
return sqlSessionFactoryBean.getObject();
}
}
持久化操作文件dao类
package com.example.xpack.dao;
import org.apache.ibatis.annotations.Mapper;
import java.util.Map;
@Mapper
public interface RwTradeMapper {
Map testSql();
}
查询es服务的mapper文件
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.xpack.dao.RwTradeMapper">
<select id="testSql" resultType="java.util.Map">
select * FROM "bum_user_classification" where id='1632898311335'
</select>
</mapper>
注意事项
2、双引号和单引号的区别:双引号用于修饰表或字段,单引号修饰值,如下:
select * FROM "bum_user_classification" where id='1632898311335'
JDBC Connection [org.elasticsearch.xpack.sql.jdbc.JdbcConnection@3e28dc96] will not be managed by Spring
==> Preparing: select * FROM "bum_user_classification" where clfName='cz12222222j'
==> Parameters:
<== Columns: _class, clfName, clfNameI18.defaultValue, clfNameI18.label, clfNameI18.lang, clfNameI18.value, createdAt, describe, id, propertiesList._class, propertiesList.attributeField, propertiesList.id, propertiesList.isFormDisplay, propertiesList.isRequire, propertiesList.isSearchable, propertiesList.isUnion, propertiesList.pgroup, propertiesList.proType, propertiesList.status, propertiesList.tenantId, status, tenantId, updatedAt
<== Row: com.paraview.bum.resource.biz.domain.model.UserClassification, cz12222222j, 2, 2, en_US, czj, 1632902638159, 11, 6849005615054850, com.paraview.bum.resource.api.model.request.PropertiesDTO, null, 123, false, false, false, false, ext, user, null, t1, 0, 11, 1632902638158
<== Total: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@5fb07347]
{clfNameI18={defaultValue=2, label=2, lang=en_US, value=czj}, propertiesList={pgroup=ext, proType=user, isFormDisplay=false, isUnion=false, tenantId=t1, _class=com.paraview.bum.resource.api.model.request.PropertiesDTO, id=123, isRequire=false, isSearchable=false}, createdAt=1632902638159, tenantId=11, _class=com.paraview.bum.resource.biz.domain.model.UserClassification, describe=11, id=6849005615054850, clfName=cz12222222j, status=0, updatedAt=1632902638158}
局限性
使用SQL查询ES有一定的局限性,没有原生的Query DSL那么强大,对于嵌套属性和某些函数的支持并不怎么好,但是平时用来查询下数据基本够用了。