百亿级数据搜索引擎,Elasticsearch与SpringBo
- Elasticsearch快照
- Elasticsearch恢复
- Spring与Elasticsearch整合
快照和恢复
Snapshot和Restore 模块允许创建单个索引或者整个集群的快照到远程仓库。要试用快照和恢复,需要在Elasticsearch.yml配置文件中添加如下项:
path.repo: /home/app/es_backup
仓库
在进行任何快照或者恢复操作之前必须有一个快照仓库注册在Elasticsearch里。下面的这个命令注册了一个名为 es_backup的共享文件系统仓库,快照将会存储在 /home/app/es_backup 这个目录。
$ curl -XPUT 192.168.56.105:9200/_snapshot/es_backup -d'{
"type": "fs",
"settings": {
"location": "/home/hadoop/es_backup",
"compress": true
}
}'
# 查看这个仓库的信息
$ curl -XGET localhost:9200/_snapshot/es_backup?pretty
$ curl -XGET localhost:9200/_snapshot/repo*,*backup?pretty
$ curl -XGET localhost:9200/_snapshot
$ curl -XGET localhost:9200/_snapshot/_all
共享文件系统的仓库
共享文件系统仓库 ("type": "fs") 是使用共享的文件系统去存储快照。 在 location 参数里指定的具体存储路径必须和共享文件系统里的位置是一致,并且能被所有的数据节点和master节点访问。 另外还支持如下的一些参数设置:
参数 | 说明 |
---|---|
location | 指定快照的存储位置。必填。 |
compress | 指定是否对快照文件进行压缩, 默认为 true。 |
chunk_size | 指定快照大文件可以被拆分为多大。这个参数指明了每块的字节数。也可用不同的单位标识。 比如,1g,10m,5k等。默认是 null (表示不限制块大小)。 |
max_restore_bytes_per_sec | 每个节点恢复数据的最高速度限制,默认是 20mb/s |
max_snapshot_bytes_per_sec | 每个节点做快照的最高速度限制,默认是 20mb/s |
只读URL仓库
URL仓库("type": "url")可以用作以只读方式访问共享文件系统仓库中的数据。
注册存储仓库后,系统会立即在所有主节点和数据节点上对存储仓库进行验证,以确保该存储仓库在群集中所有节点上都可以正常运行。在注册或更新存储仓库时,可以使用verify参数显式禁用存储库验证:
$ curl -XPUT localhost:9200/_snapshot/s3_repository?verify=false
{
"type": "s3",
"settings": {
"bucket": "my_s3_bucket",
"region": "eu-west-1"
}
}
其他仓库类型
官方提供了其他存储仓库插件:
- repository-s3
- repository-hdfs
- repository-azure
- repository-gcs
$ curl -XPUT 192.168.56.105:9200/_snapshot/es_backup/snapshot_1?wait_for_completion=true -d'
{
"indices": "restaurants,news,blog",
"ignore_unavailable": true,
"include_global_state": false
}'
快照
一个仓库可以包含同一个集群的多个快照。快照根据集群中的唯一名字进行区分。 在仓库 es_backup 里创建一个名为 snapshot_1 的快照可以通过下面的命令:
$ curl -XPUT localhost:9200/_snapshot/my_backup/snapshot_1?wait_for_completion=true
wait_for_completion 参数指定快照初始化(默认)后立即返回还是等待快照完成。在快照初始化期间,有关所有先前快照的信息会加载到内存中,这意味着在大型库中,即使将wait_for_completion参数设置为false,此命令也可能需要几秒钟(甚至几分钟)才能返回。
默认情况下,集群中所有打开和启动的索引是自动创建快照的。可以通过在快照请求里列出需要创建快照的索引。
$ curl -XPUT localhost:9200/_snapshot/es_backup/snapshot_1 -d'
{
"indices": "restaurnts,news,blog",
"ignore_unavailable": true,
"include_global_state": false
}'
- indices 参数指定快照包含的索引,这个参数支持同时配置多个索引
- ignore_unavailable 这个选项设置为 true 的时候在创建快照的过程中会忽略不存在的索引。默认情况下, 如果没有设置 ignore_unavailable 在索引不存在的情况下快照请求将会失败。
-
include_global_state 为 false 能够防止集群的全局状态被作为快照的一部分存储起来。默认情况下,如果快照中的1个或多个索引不是全部主分片都可用会导致整个创建快照的过程失败。 通过设置 partial 为 true 可以改变这个行为。
索引创建快照的过程是增量的。在给索引创建快照的过程中,Elasticsearch会分析存储在仓库中的索引文件并且只会复制那些自从上次快照 之后新建或有所更新的文件。这使得多个快照以一种紧凑的方式存储在同一个仓库里。
创建快照的过程是以非阻塞方式执行的。一个索引在创建快照的同时能够被检索和查询。尽管如此,快照保存的是在开始进行创建快照的那个时间点的索引的视图。
如果指定了仓库名字和快照id,这个命令就会返回这个快照的详细信息,甚至这个快照是不是正在运行。
$ curl -XGET "localhost:9200/_snapshot/es_backup/snapshot_1/_status"
装填 | 说明 |
---|---|
SUCCESS | 快照已经完成,所有的分片已经成功存储 |
FAILED | 快照出现错误,无法完成存储 |
PARTIAL | 全局的集群状态已存储,但至少有一个分片没有存储成功。在这种情况下,失败部分会包含未正确处理分片的详细信息。 |
INCOMPATIBLE | 快照是是用旧版本创建的,与当前集群的版本不兼容。 |
可以通过如下的命令将仓库里的某个快照删除:
$ curl -XDELETE 192.168.56.105:9200/_snapshot/es_backup/snap_shot3
从库中删除快照后,Elasticsearch会删除与快照相关联且未被其他任何快照使用的所有文件。如果在创建快照时执行了删除操作,则快照过程将中止,并且将清理快照过程中创建的所有文件。因此,删除快照操作可用于取消误启动后长时间运行的快照操作。
恢复
快照可以使用如下的操作来恢复:
$ curl -XPOST localhost:9200/_snapshot/my_backup/snapshot_1/_restore
默认情况下,快照中的所有索引以及集群状态都会被恢复。在恢复请求中可以通过 indices 来指定需要被恢复的索引,同样可以使用 include_global_state 选项来防止恢复集群的状态。indices 支持配置多个索引。rename_pattern 和 rename_replacement 选项可以在恢复的时候使用正则表达式来重命名index。
curl -XPOST 192.168.56.105:9200/_snapshot/es_backup/snapshot_5/_restore -d'{
"indices": "index_1, index_2",
"ignore_unavailable": true,
"include_global_state": false,
"rename_pattern": "index_(.+)",
"rename_replacement": "restored_indexd_$1"
}'
在还原过程中,可以覆盖大多数索引设置。例如,以下命令将在还原索引index_1而不创建任何副本:
curl -XPOST 192.168.56.105:9200/_snapshot/es_backup/snapshot_5/_restore -d'{
"indices": "index_1, index_2",
"index_settings": {
"index.number_of_replicas": 0
}
}'
Spring和Elasticsearch整合
Transport Client
TransportClient利用Transport模块远程连接一个Elasticsearch集群。它并不加入到集群中,只是简单的获得一个或者多个初始化的Transport地址,并以轮询的方式与这些地址进行通信。
Java High Level REST Client
Java High Level REST Client在Java Low Leve REST Client之上工作。它的主要目的是公开API的特定方法,这些方法接受请求对象作为参数并返回响应对象,因此请求编组和响应解编组由客户端本身处理。
每个API可以同步或异步进行调用。同步方法会返回一个响应对象,而名称以async后缀结尾的异步方法则需要一个监听器,一旦接收到响应或错误,监听器就会获得通知(基于低级客户端管理的线程池之上)。
Java High Level REST Client依赖Elasticsearch核心项目。它接受与TransportClient相同的请求参数,并返回相同的响应对象。
<!-- 所需的最低Java版本是1.8 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>5.6.16</version>
</dependency>
ElasticsearchRestClientConfig.java
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticsearchRestClientConfig {
@Value("${elasticsearch.host}")
private String host;
@Value("${elasticsearch.http.port}")
private int port;
@Bean
public RestHighLevelClient restHighLevelClient() {
String[] hosts = host.split(",");
HttpHost[] httpHosts = new HttpHost[hosts.length];
for (int i = 0; i < hosts.length; i++) {
String addr = hosts[i];
httpHosts[i] = new HttpHost(addr, port);
}
return new RestHighLevelClient(RestClient.builder(httpHosts).build());
}
}
Java Low Level REST Client
低级客户端包括如下功能:
- 跨所有可用节点进行负载平衡
- 如果节点发生故障会根据特定响应代码进行故障转移
- 失败的连接惩罚(是否对失败的节点进行重试取决于连续失败的次数;-
失败尝试次数越多,客户端再次尝试该节点之前等待的时间越长) - 持久连接
- 跟踪并记录请求和响应
- 可选的群集节点自动发现
<!-- 所需的最低Java版本是1.7 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>5.6.16</version>
</dependency>
Spring-Data-Elasticsearch
Spring-data-elasticsearch是Spring提供的操作ElasticSearch的数据层,封装了大量的基础操作,通过它可以很方便的操作ElasticSearch的数据。
下表显示了Spring Data、Elasticsearch、Spring Data Elasticsearch,以及Spring Boot版本之间的对应关系:
Spring Data Elasticsearch | Elasticsearch | Spring Boot |
---|---|---|
3.2.x | 6.8.4 | 2.2.x |
3.1.x | 6.2.2 | 2.1.x |
3.0.x | 5.5.0 | 2.0.x |
2.1.x | 2.4.0 | 1.5.x |
ElasticsearchRepository
Spring-Data-Elasticsearch支持的关键字列表如下所示。
关键字 | 示例 | Spring Boot |
---|---|---|
And | findByNameAndPrice | {"bool" : {"must" : [ {"field" : {"name" : "?"}}, {"field" : {"price" : "?"}} ]}} |
Or | findByNameOrPrice | {"bool" : {"should" : [ {"field" : {"name" : "?"}}, {"field" : {"price" : "?"}} ]}} |
Is | findByName | {"bool" : {"must" : {"field" : {"name" : "?"}}}} |
Not | findByNameNot | {"bool" : {"must_not" : {"field" : {"name" : "?"}}}} |
Between | findByPriceBetween | {"bool" : {"must" : {"range" : {"price" : {"from" : ?,"to" : ?,"include_lower" : true,"include_upper" : true}}}}} |
LessThanEqual | findByPriceLessThan | {"bool" : {"must" : {"range" : {"price" : {"from" : null,"to" : ?,"include_lower" : true,"include_upper" : true}}}}} |
GreaterThanEqual | findByPriceGreaterThan | {"bool" : {"must" : {"range" : {"price" : {"from" : ?,"to" : null,"include_lower" : true,"include_upper" : true}}}}} |
Before | findByPriceBefore | {"bool" : {"must" : {"range" : {"price" : {"from" : null,"to" : ?,"include_lower" : true,"include_upper" : true}}}}} |
After | findByPriceAfter | {"bool" : {"must" : {"range" : {"price" : {"from" : ?,"to" : null,"include_lower" : true,"include_upper" : true}}}}} |
Like | findByNameLike | {"bool" : {"must" : {"field" : {"name" : {"query" : "?*","analyze_wildcard" : true}}}}} |
StartingWith | findByNameStartingWith | {"bool" : {"must" : {"field" : {"name" : {"query" : "?*","analyze_wildcard" : true}}}}} |
EndingWith | findByNameEndingWith | {"bool" : {"must" : {"field" : {"name" : {"query" : "*?","analyze_wildcard" : true}}}}} |
Contains/Containing | findByNameContaining | {"bool" : {"must" : {"field" : {"name" : {"query" : "?","analyze_wildcard" : true}}}}} |
In | findByNameIn(Collectionnames) | {"bool" : {"must" : {"bool" : {"should" : [ {"field" : {"name" : "?"}}, {"field" : {"name" : "?"}} ]}}}} |
NotIn | findByNameNotIn(Collectionnames) | {"bool" : {"must_not" : {"bool" : {"should" : {"field" : {"name" : "?"}}}}}} |
Near | findByStoreNear | Not Supported Yet ! |
True | findByAvailableTrue | {"bool" : {"must" : {"field" : {"available" : true}}}} |
False | findByAvailableFalse | {"bool" : {"must" : {"field" : {"available" : false}}}} |
OrderBy | findByAvailableTrueOrderByNameDesc | {"sort" : [{ "name" : {"order" : "desc"} }],"bool" : |
{"must" : {"field" : {"available" : true}}}} |
ElasticsearchTemplate
ElasticsearchRepository继承了ElasticsearchCrudRepository,而ElasticsearchCrudRepository又继承自PagingAndSortingRepository。ElasticSearchTemplate更多是对ElasticsearchRepository的补充,里面提供了一些更底层的方法。
代码部分
pom.xml
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>ai.yunxi</groupId>
<artifactId>vip-elasticsearch</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>vip-elasticsearch</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<elasticsearch.version>5.6.16</elasticsearch.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
application.properties
# spring-data-elasticsearch 方式
spring.data.elasticsearch.cluster-name=elasticsearch
spring.data.elasticsearch.cluster-nodes=192.168.56.105:9300
spring.data.elasticsearch.repositories.enabled=true
# transport client 方式
elasticsearch.clusterName=elasticsearch
elasticsearch.host=192.168.56.105
elasticsearch.port=9200
AccountRepository.java
import ai.yunxi.es.model.Account;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.annotations.Query;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
public interface AccountRepository extends ElasticsearchRepository<Account, Long> {
@Query("{\"bool\" : {\"must\" : {\"field\" : {\"name\" : \"?0\"}}}}")
Page<Account> findByName(String name, Pageable pageable);
Page<Account> findByAge(int age, Pageable pageable);
}
AccountServiceImpl.java
import ai.yunxi.es.config.ElasticsearchRestClientConfig;
import ai.yunxi.es.model.Account;
import ai.yunxi.es.repo.AccountRepository;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.ScrolledPage;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.data.util.CloseableIterator;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@Service
public class AccountServiceImpl implements AccountService {
@Autowired
private ElasticsearchRestClientConfig restClient;
@Autowired
private AccountRepository accountRepository;
@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
@Override
public Optional<Account> findById(long id) {
return accountRepository.findById(id);
}
@Override
public Account save(Account account) {
return accountRepository.save(account);
}
@Override
public void delete(Account account) {
accountRepository.delete(account);
}
@Override
public Optional<Account> findOne(long id) {
return accountRepository.findById(id);
}
@Override
public List<Account> findAll() {
return (List<Account>) accountRepository.findAll();
}
@Override
public List<Account> findByName(String fieldName, String keyword) {
TermQueryBuilder termQueryBuilder = new TermQueryBuilder(fieldName, keyword);
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(termQueryBuilder).build();
return (List<Account>) elasticsearchTemplate.queryForList(searchQuery, Account.class);
}
@Override
public Page<Account> findByNameWithPage(String fieldName) {
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(matchAllQuery())
.withFields(fieldName)
.withPageable(PageRequest.of(20000, 10))
.withSort(new FieldSortBuilder(fieldName).order(SortOrder.DESC))
.build();
return elasticsearchTemplate.queryForPage(searchQuery, Account.class);
}
@Override
public List<Account> findByNameDeepth(String fieldName) {
// 浅分页
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(matchAllQuery())
.withFields(fieldName)
.withPageable(PageRequest.of(0, 10))
.build();
long scollTimeInMillis = 1000;
Page<Account> scroll = elasticsearchTemplate.startScroll(scollTimeInMillis, searchQuery, Account.class);
String scrollId = ((ScrolledPage) scroll).getScrollId();
List<Account> accounts = new ArrayList<>();
// scroll里面的内容就是类似Iterator(hasNext)
while (scroll.hasContent()) {
System.out.println(scroll.getContent());
accounts.addAll(scroll.getContent());
scrollId = ((ScrolledPage) scroll).getScrollId();
scroll = elasticsearchTemplate.continueScroll(scrollId, scollTimeInMillis, Account.class);
}
elasticsearchTemplate.clearScroll(scrollId);
return accounts;
}
public List<Account> findByStream() {
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(matchAllQuery())
.withFields("firstname")
.withPageable(PageRequest.of(0, 10))
.build();
CloseableIterator<Account> stream = elasticsearchTemplate.stream(searchQuery, Account.class);
List<Account> accounts = new ArrayList<>();
while (stream.hasNext()) {
accounts.add(stream.next());
}
return accounts;
}
@Override
public void findNameByRest() {
SearchRequest searchRequest = new SearchRequest("bank");
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
searchBuilder.query(QueryBuilders.matchAllQuery());
searchBuilder.from(0);
searchBuilder.size(10);
try {
SearchResponse response = restClient.restHighLevelClient().search(searchRequest);
SearchHit[] searchHits = response.getHits().getHits();
for (SearchHit hit : searchHits) {
System.out.println(hit.getSourceAsString());
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public Page<Account> findByName(String name, PageRequest pageRequest) {
return accountRepository.findByName(name, pageRequest);
}
@Override
public Page<Account> findByAge(int age, PageRequest pageRequest) {
return accountRepository.findByAge(age, pageRequest);
}
}
AccountController.java
import ai.yunxi.es.model.Account;
import ai.yunxi.es.service.AccountService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.math.BigDecimal;
import java.util.List;
import java.util.Optional;
@Controller
@RequestMapping("/account")
public class AccountController {
@Autowired
private AccountService accountService;
@RequestMapping("/{id}")
@ResponseBody
public Account getAccountById(@PathVariable int id) {
Optional<Account> opt = accountService.findById(id);
Account account = opt.get();
return account;
}
@RequestMapping("/{field}/{name}")
@ResponseBody
public List<Account> getAccountByName(@PathVariable String field, @PathVariable String name) {
List<Account> list = accountService.findByName(field, name);
return list;
}
@RequestMapping("/page/{field}")
@ResponseBody
public Page<Account> getByNameWithPage(@PathVariable String field) {
Page<Account> page = accountService.findByNameWithPage(field);
return page;
}
@RequestMapping("/deepth/{field}")
@ResponseBody
public List<Account> getByNameDeepth(@PathVariable String field) {
List<Account> list = accountService.findByNameDeepth(field);
return list;
}
@RequestMapping("/rest")
@ResponseBody
public void getByRestClient() {
accountService.findNameByRest();
}
@RequestMapping("/save")
@ResponseBody
public void Save() {
Account account = new Account(2001, 2001, "Micheal", "Jackson", 35, "M",
new BigDecimal("999999"), "test@126.com", "CA", "Belvoir",
"499 Laurel Avenue", "");
accountService.save(account);
}
}