Elasticsearch+Dubbo+Spring实践
1. 需求
因为公司一直用的是阿里的dubbo作为业务工程的RPC通信框架,我现在的任务是在mentor的指导下尝试重构公司的NLP服务,想把NLP的资源检索,单轮对话,多轮对话这些功能模块拆分成各自独立的服务。第一个尝试是优化问答系统中信息检索的过程,所以拿到的需求是调研用Elasticsearch来代替已有的资源检索性能会不会更好。这样也就产生了Elasticsearch+Dubbo+Spring这样奇葩的组合方式,毕竟Elasticsearch提倡的是用RESTful API交互来简化对数据的操作,而我要硬生生地用RPC的方式去调用。
想想都刺激
文章结构
- 需求
- Dubbo框架搭建
- Elasticsearch功能添加
- 源码
2. Dubbo框架搭建
环境准备
- Zookeeper注册中心安装,下载。
解压
cd your_zookeeper_path/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
根据自己的需要修改配置文件,参见,这里Zookeeper的端口号采用默认配置2181。
- Zookeeper启动:
cd your_zookeeper_path
./bin/zkServer.sh start
- Zookeeper停止:
cd your_zookeeper_path
./bin/zkServer.sh stop
服务提供者Demo
- 在项目中添加Spring,Dubbo,Zookeeper的依赖:
<dependencies>
<!-- spring -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<classifier>jdk15</classifier>
<version>2.4</version>
</dependency>
<!-- dubbo -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.5.3</version>
<exclusions>
<exclusion>
<artifactId>spring</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- zkclient -->
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.3.6</version>
</dependency>
</dependencies>
- 新建服务提供者对外暴露的接口类:
public interface SearchService {
public String sayHello(String name);
}
- 实现服务的接口:
public class SearchServiceImpl implements SearchService{
public String sayHello(String name) {
System.out.println("received from remote: "+name);
return "Hello " + name;
}
}
- 用 Spring 配置声明暴露服务:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- 提供方应用信息,用于计算依赖关系 -->
<dubbo:application name="elastic-search"/>
<dubbo:registry protocol="zookeeper" address="127.0.0.1:2181" />
<!-- 用dubbo协议在20880端口暴露服务 -->
<dubbo:protocol name="dubbo" port="20880"/>
<!-- 声明需要暴露的服务接口 -->
<dubbo:service interface="com.yyz.elasticsearch.SearchService" ref="demoService" />
<!-- 和本地bean一样实现服务 -->
<bean id="demoService" class="com.yyz.elasticsearch.SearchServiceImpl" />
</beans>
- 启动服务提供者:
public class Starter {
public static void main(String[] args) throws IOException {
System.setProperty("java.net.preferIPv4Stack", "true");
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"server.xml"});
context.start();
System.in.read();
}
}
服务消费者Demo
- 在pom.xml中添加spring,dubbo,zookeeper的依赖:
<dependencies>
<!-- spring -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>1.5.1.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<classifier>jdk15</classifier>
<version>2.4</version>
</dependency>
<!-- dubbo -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.5.3</version>
<exclusions>
<exclusion>
<artifactId>spring</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- zkclient -->
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.3.6</version>
</dependency>
</dependencies>
2.通过 Spring 配置引用远程服务:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
http://code.alibabatech.com/schema/dubbo/dubbo.xsd
">
<!-- 消费方应用名,用于计算依赖关系,不是匹配条件,不要与提供方一样 -->
<dubbo:application name="demo-consumer"/>
<dubbo:registry protocol="zookeeper" address="127.0.0.1:2181" />
<!-- 生成远程服务代理,可以和本地bean一样使用demoService -->
<dubbo:reference id="demoService" check="false" interface="com.yyz.elasticsearch.SearchService"/>
</beans>
- 加载Spring配置,并调用远程服务:
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Client {
public static void main(String[] args) {
String configName = "client.xml";
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{configName});
context.start();
SearchService searchService = (SearchService) context.getBean("demoService");
while (true) {
try {
Thread.sleep(1000);
// 执行远程方法
String hello = searchService.sayHello("world");
// 显示调用结果
System.out.println( hello );
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}
}
}
好啦,到这里用阿里的dubbo框架搭建的RPC通信的服务提供者和消费者已经完成,可以运行起来看看效果,记得先启动zookeeper注册中心。
3. Elasticsearch功能添加
下面我们往上面的Demo中添加操作Elasticsearch分布式搜索分析引擎的功能。
Elasticsearch安装
Elasticsearch是免安装的,下载后解压就好。关于下载版本的问题,我想发一个表情。
此处有好大一个坑。
sping data elasticsearch和elasticsearch的版本对应关系
因为一开始并不知道Spring data和elasticsearch之间有严格的版本对应关系,所以出了一堆莫名其妙的bug,这张对应表可以在这里看,虽然我下面用的是spring boot,但是同样有这个问题。
建议下载Elasticsearch2.4.0,下载后解压就算是安装完成了。
修改配置文件:
cd your_Elasticsearch
vi config/elasticsearch.yml
默认的配置,elasticsearch使用的HTTP端口是9200,TCP端口是9300.
启动Elasticsearch:
cd your_Elasticsearch/bin
./elasticsearch
Server Demo
- pom.xml中添加elasticsearch的依赖
<!-- elasticsearch -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
这时会产生netty包的冲突,解决:
<!-- dubbo -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.5.3</version>
<exclusions>
<exclusion>
<artifactId>spring</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
<exclusion>
<artifactId>netty</artifactId>
<groupId>org.jboss.netty</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.3.6</version>
<exclusions>
<exclusion>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
</exclusion>
</exclusions>
</dependency>
- 新建实体类:
@Document(indexName = "dialog", type = "qa", shards = 1, replicas = 0)
public class QA {
@Id
private int id;
private String Q;
@Field(type = FieldType.Nested)
private List<String> A;
public QA() {
}
public QA(int id, String q, List<String> a) {
this.id = id;
Q = q;
A = a;
}
public int getId() {
return id;
}
public String getQ() {
return Q;
}
public void setQ(String q) {
Q = q;
}
public List<String> getA() {
return A;
}
public void setA(List<String> a) {
A = a;
}
@Override
public String toString() {
return " QA { " +
" id = " + id + "," +
" Q = '" + Q + "'," +
" A = " + A +
'}';
}
}
- 新建Repository类:
public interface QARepository extends ElasticsearchRepository<QA,Long> {
}
- 修改server的接口类,添加操作elasticsearch的方法:
public interface SearchService {
public String sayHello(String name);
public String search(final String query);
}
- 修改service的实现,实现操作elasticsearch的方法:
@Service("demoService")
public class SearchServiceImpl implements SearchService {
@Autowired
private Client client;
@Autowired
private QARepository qARepository;
public String sayHello(String name) {
System.out.println("received from remote: "+name);
return "Hello " + name;
}
/**
* 插入一条数据
* @param qa
*/
public void addEntity(QA qa) {
qARepository.save(qa);
}
/**
* 查询
* @param query
* @return
*/
public String search(final String query) {
System.out.println("query: " + query);
QueryBuilder queryBuilder = QueryBuilders.matchQuery("q", query);
SearchResponse response = client.prepareSearch()
.setQuery(queryBuilder)
.addHighlightedField("Q")
.execute().actionGet();
SearchHit[] searchHitArr = response.getHits().getHits();
/**
* 遍历检索到的结果,这里可以自定义排序算法
*/
for (int i = 0; i < searchHitArr.length; ++i) {
SearchHit searchHit = searchHitArr[i];
System.out.print("index:"+searchHit.index()+" type:"+searchHit.getType()+" id: "+searchHit.getId()+" q:"+(String)searchHit.getSource().get("q"));
List l = (List)searchHit.getSource().get("a");
for (int j = 0;j<l.size();j++){
System.out.print(" a: "+l.get(j));
}
System.out.println("");
}
/**
* 返回第一个相关文档
*/
String result = null;
if(searchHitArr.length>0){
SearchHit searchHit = searchHitArr[0];
result = "index:"+searchHit.index()+" type:"+searchHit.getType()+" id: "+searchHit.getId()+" q:"+(String)searchHit.getSource().get("q");
List l = (List)searchHit.getSource().get("a");
for (int j = 0;j<l.size();j++){
result = result+ " a: "+l.get(j);
}
}
return result;
}
}
注意最上面的注解@service ,因为client和qARepository需要spring自动搜索注入。所以改成了注解的方式,不再在配置文件中显示的配置。
- spring配置文件的改变:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xmlns:elasticsearch="http://www.springframework.org/schema/data/elasticsearch"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
http://code.alibabatech.com/schema/dubbo/dubbo.xsd
http://www.springframework.org/schema/data/elasticsearch
http://www.springframework.org/schema/data/elasticsearch/spring-elasticsearch-1.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"
>
<!-- 提供方应用信息,用于计算依赖关系 -->
<dubbo:application name="elastic-search"/>
<!-- 配置注册中心 -->
<dubbo:registry protocol="zookeeper" address="127.0.0.1:2181" />
<!-- 用dubbo协议在20880端口暴露服务 -->
<dubbo:protocol name="dubbo" port="20880"/>
<!-- 配置elasticsearch 连接 -->
<elasticsearch:transport-client id="client" cluster-nodes="127.0.0.1:9300" cluster-name="elasticsearch"/>
<!-- spring data elasticsearch DAO 必须依赖 elasticsearchTemplate -->
<bean id="elasticsearchTemplate" class="org.springframework.data.elasticsearch.core.ElasticsearchTemplate">
<constructor-arg name="client" ref="client" />
</bean>
<!-- 扫描DAO包 自动创建实现 -->
<elasticsearch:repositories base-package="com.yyz.elasticsearch.dao" />
<!-- 扫描Service包 -->
<context:component-scan base-package="com.yyz.elasticsearch.service" />
<!-- 声明需要暴露的服务接口 -->
<dubbo:service interface="com.yyz.elasticsearch.SearchService" ref="demoService" />
</beans>
Client Demo
客户端需要改变的地方很小,只需要改变调用的远程方法即可。
public interface SearchService {
public String sayHello(String name);
public String search(final String query);
}
public class Client {
public static void main(String[] args) {
String configName = "client.xml";
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{configName});
context.start();
SearchService searchService = (SearchService) context.getBean("demoService");
while (true) {
try {
Thread.sleep(1000);
// 执行远程方法
String hello = searchService.search("你怕黑吗?");
// 显示调用结果
System.out.println( hello );
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}
}
}
记得测试之前先启动Elasticsearch。
4. 源码
Demo已经上传到github,点击ElasticsearchDemo查看。
这篇文章没有详细介绍导入数据的部分,可直接在命令行用Elasticsearch的语法插入数据,也可以先用SearchServiceImpl的addEntity方法插入数据,下一篇文章会介绍Elasticsearch如何批量插入数据。