Spring Cloud入门教程(十):消息总线(Bus)
Spring Cloud入门教程系列:
- Spring Cloud入门教程(一):服务治理(Eureka)
- Spring Cloud入门教程(二):客户端负载均衡(Ribbon)
- Spring Cloud入门教程(三):声明式服务调用(Feign)
- Spring Cloud入门教程(四):微服务容错保护(Hystrix)
- Spring Cloud入门教程(五):API服务网关(Zuul) 上
- Spring Cloud入门教程(六):API服务网关(Zuul) 下
- Spring Cloud入门教程(七):分布式链路跟踪(Sleuth)
- Spring Cloud入门教程(八):统一配置中心(Config)
- Spring Cloud入门教程(九):基于消息驱动开发(Stream)
在我们开始讲Spring Cloud Bus之前来看另外一个IT术语:ESB(Enterprise Service Bus)。ESB在维基百科中是这样描述的:
企业服务总线(Enterprise Service Bus,ESB)的概念是從服務導向架構(Service Oriented Architecture, SOA)發展而來。SOA描述了一种IT基础设施的应用集成模型;其中的软构件集是以一种定义清晰的层次化结构来相互耦合。一个ESB是一个预先组装的SOA实现,它包含了实现SOA分层目标所必需的基础功能部件。
在企业计算领域,企业服务总线是指由中间件基础设施产品技术实现的、 通过事件驱动和基于XML消息引擎,为更复杂的面向服务的架构提供的软件架构的构造物。企业服务总线通常在企业消息系统上提供一个抽象层,使得集成架构师能够不用编码而是利用消息的价值完成集成工作。
企业服务总线提供可靠消息传输,服务接入,协议转换,数据格式转换,基于内容的路由等功能,屏蔽了服务的物理位置,协议和数据格式。
其中,最重要的一句就是:企业服务总线通常在企业消息系统上提供一个抽象层,使得集成架构师能够不用编码而是利用消息的价值完成集成工作。 通俗一点来讲就是企业服务总线是架构在消息中间件之上的另外一个抽象层,使得我们可以不用关心消息相关的处理就可以完成业务逻辑的处理。
到这里你是不是有点突然明白Spring Cloud Bus 和 Spring Cloud Stream之间的关系了,刚开始接触这两个组件时,大部分都会迷惑到底这两者有什么区别?它们又有什么联系?Stream通过对消息中间件进行抽象封装,提供一个统一的接口供我们发送和监听消息,而Bus则是在Stream基础之上再次进行抽象封装,使得我们可以在不用理解消息发送、监听等概念的基础上使用消息来完成业务逻辑的处理。
那么Spring Cloud Bus是如何为我们实现的呢?一句话概括就是事件机制。
1. Spring的事件机制
在Spring框架中有一个事件机制,该机制是一个观察者模式的实现。观察者模式建立一种对象与对象之间的依赖关系,当一个对象(称之为:观察目标)发生改变时将自动通知其它对象(称之为:观察者),这些观察者将做出相应的反应。一个观察目标可以对应多个观察者,而且这些观察者之间没有相互联系,可以根据需要增加和删除观察者,使得系统更易于扩展。通过Spring事件机制可以达到如下目的:
- 应用模块之间的解耦;
- 对同一种事件可以根据需要定义多种处理方式;
- 对主线应用不干扰,是一个极佳的开闭原则(OCP)实践。
当我们在应用中引入事件机制时需要借助Spring中以下接口或抽象类:
- ApplicationEventPublisher: 这是一个接口,用来发布一个事件;
- ApplicationEvent: 这是一个抽象类,用来定义一个事件;
- ApplicationListener<E extends ApplicationEvent>: 这是一个接口,实现事件的监听。
其中Spring应用的上下文ApplicationContext
默认是实现了ApplicationEventPublisher
接口,因此在发布事件时我们可以直接使用ApplicationContext.publishEvent()
方法来发送。
一个典型的Spring事件发送与监听代码如下。
1.1 定义事件
比如,我们定义一个用户事件:
/**
* 用户事件
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
public class UserEvent extends ApplicationEvent {
/** 消息类型:更新用户,值为: {@value} */
public static final String ET_UPDATE = "update";
// ========================================================================
// fields =================================================================
private String action;
private User user;
// ========================================================================
// constructor ============================================================
public UserEvent(User user) {
super(user);
this.user = user;
}
public UserEvent(User user, String action) {
super(user);
this.action = action;
this.user = user;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("action", this.getAction())
.add("user", this.getUser()).toString();
}
// ==================================================================
// setter/getter ====================================================
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public User getUser() {
return user;
}
public void setUser(User user) {
this.user = user;
}
}
1.2 定义监听
我们定义一个用户事件监听器,当用户变更时做相应处理:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
/**
* 用户事件监听
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
@Component
public class UserEventListener implements ApplicationListener<UserEvent> {
protected Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void onApplicationEvent(UserEvent userEvent) {
this.logger.debug("收到用户事件:{} ", userEvent);
// TODO: 实现具体的业务处理
}
}
用户事件监听比较简单,只需要实现ApplicationListener
接口,进行相应处理即可。
1.3 发送消息
发送消息比较简单,我们也可以直接在Event中实现,比如我们将上面UserEvent
更改为如下:
/**
* 用户事件
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
public class UserEvent extends ApplicationEvent {
// 省略了之前的代码
/**
* 发布事件
*/
public void fire() {
ApplicationContext context = ApplicationContextHolder.getApplicationContext();
if(null != context) {
logger.debug("发布事件:{}", this);
context.publishEvent(this);
}else{
logger.warn("无法获取到当前Spring上下文信息,不能够发布事件");
}
}
}
那么我们就可以在需要的地方通过下面的代码来发布事件了:
new UserEvent(user, UserEvent.ET_UPDATE).fire();
2. Spring Cloud Bus机制
我们上面了解了Spring的事件机制,那么Spring Cloud Bus又是如何将事件机制和Stream结合在一起的呢?总起来说机制如下:
- 在需要发布或者监听事件的应用中增加
@RemoteApplicationEventScan
注解,通过该注解就可以启动Stream中所说的消息通道的绑定; - 对于事件发布,则需要继承
ApplicationEvent
的扩展类 --RemoteApplicationEvent
,当通过ApplicationContext.publishEvent()
发布此种类型的事件时,Spring Cloud Bus就会对所要发布的事件进行包装,形成一个我们所熟知的消息,然后通过默认的springCloudBus
消息通道发送到消息中间件; - 对于事件监听者则不需要进行任何变更,仍旧按照上面的方式就可以实现消息的监听。但,需要注意的一点就是在消费的微服务工程中也必须定义第2步所定义的事件,并且需要保障全类名一致(如果不一致,则需要做一点工作)。
嗯,就是这么简单。通过Bus我们就可以像编写单体架构应用一样进行开发,而不需要关系什么消息中间件、主题、消息、通道呀等等一大堆概念。
你也行在怀疑,是不是这么简单呀。那好,让我们来看看是不是很容易就可以实现Stream中示例。
3. 重构Spring Cloud Stream中的示例
3.1 重构商品微服务
3.1.1 增加对Bus的依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
3.1.2 构建商品事件
我们将原来商品配置变更所发送的消息更改为一个事件,代码如下:
package io.twostepsfromjava.cloud.bus;
import com.google.common.base.MoreObjects;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
/**
* 商品事件
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
public class ProductEvent extends RemoteApplicationEvent {
/** 消息类型:更新商品,值为: {@value} */
public static final String ET_UPDATE = "update";
/** 消息类型:删除商品,值为: {@value} */
public static final String ET_DELETE = "delete";
// ========================================================================
// fields =================================================================
private String action;
private String itemCode;
// ========================================================================
// constructor ============================================================
public ProductEvent() {
super();
}
public ProductEvent(Object source, String originService, String destinationService, String action, String itemCode) {
super(source, originService, destinationService);
this.action = action;
this.itemCode = itemCode;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("action", this.getAction())
.add("itemCode", this.getItemCode()).toString();
}
// ==================================================================
// setter/getter ====================================================
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public String getItemCode() {
return itemCode;
}
public void setItemCode(String itemCode) {
this.itemCode = itemCode;
}
}
这里和之前事件构建函数不同的是:在构建一个事件时需要指定originService
和destinationService
。对于事件发布者来说originService
就是自己,而destinationService
则是指将事件发布到那些微服务实例。destinationService
配置的格式为:{serviceId}:{appContextId}
,在配置时serviceId
和appContextId
可以使用通配符,如果这两个变量都使用通配符的话(*:**
),则事件将发布到所有的微服务实例。如只省略appContextId
,则事件只会发布给指定微服务的所有实例,如:userservice:**
,则只会将事件发布给userservice
微服务。
3.1.3 实现事件发布
我们将商品微服务中商品变更中的代码修改为如下:
package io.twostepsfromjava.cloud.product.service;
import io.twostepsfromjava.cloud.bus.ProductEvent;
import io.twostepsfromjava.cloud.product.dto.ProductDto;
import io.twostepsfromjava.cloud.product.mq.ProductMsg;
import io.twostepsfromjava.cloud.product.util.ApplicationContextHolder;
import io.twostepsfromjava.cloud.product.util.RemoteApplicationEventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
* 商品服务
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
@Service
public class ProductService {
protected Logger logger = LoggerFactory.getLogger(ProductService.class);
private List<ProductDto> productList;
@Autowired
public ProductService() {
this.productList = this.buildProducts();
}
// 省略了不相干的代码
/**
* 保存或更新商品信息
* @param productDto
* @return
*/
public ProductDto save(ProductDto productDto) {
// TODO: 实现商品保存处理
for (ProductDto sourceProductDto : this.productList) {
if (sourceProductDto.getItemCode().equalsIgnoreCase(productDto.getItemCode())) {
sourceProductDto.setName(sourceProductDto.getName() + "-new");
sourceProductDto.setPrice(sourceProductDto.getPrice() + 100);
productDto = sourceProductDto;
break;
}
}
// 发送商品消息
// this.sendMsg(ProductMsg.MA_UPDATE, productDto.getItemCode());
// 发布商品变更消息
this.fireEvent(ProductEvent.ET_UPDATE, productDto);
return productDto;
}
// 这里已不再使用该方法
protected void sendMsg(String msgAction, String itemCode) {
ProductMsg productMsg = new ProductMsg(msgAction, itemCode);
this.logger.debug("发送商品消息:{} ", productMsg);
// 发送消息
// this.source.output().send(MessageBuilder.withPayload(productMsg).build());
}
protected void fireEvent(String eventAction, ProductDto productDto) {
ProductEvent productEvent = new ProductEvent(productDto,
ApplicationContextHolder.getApplicationContext().getId(), "*:**",
eventAction, productDto.getItemCode());
// 发布事件
RemoteApplicationEventPublisher.publishEvent(productEvent);
}
}
其中RemoteApplicationEventPublisher
的源码如下:
package io.twostepsfromjava.cloud.product.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.context.ApplicationContext;
/**
* 远程事件发布者
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
public class RemoteApplicationEventPublisher {
protected static Logger logger = LoggerFactory.getLogger(RemoteApplicationEventPublisher.class);
/**
* 发布一个事件
* @param event
*/
public static void publishEvent(RemoteApplicationEvent event){
ApplicationContext context = ApplicationContextHolder.getApplicationContext();
if(null != context) {
context.publishEvent(event);
logger.debug("已发布事件:{}", event);
}else{
logger.warn("无法获取到当前Spring上下文信息,不能够发布事件");
}
}
}
3.1.4 开启远程消息扫描
最后,修改微服务启动类,添加@RemoteApplicationEventScan
注解:
package io.twostepsfromjava.cloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
/**
* TwoStepsFromJava Cloud -- ProductDto Service 服务器
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
@EnableDiscoveryClient
@RemoteApplicationEventScan
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
注意: 这里再次声明,远程事件必须定义在
@RemoteApplicationEventScan
注解所注解类的子包中,否则无法实现远程事件发布。
到这里我们的商品微服务重构就完成了。下面接着对Mall-Web微服务进行修改。
3.2 重构Mall-Web微服务
3.2.1 增加对Bus依赖
和商品微服务一样,就不重复了。
3.2.2 拷贝ProductEvent到本项目
呃,这个就不描述了。
3.2.3 实现事件监听处理
这个代码非常简单,不多说,具体如下:
package io.twostepsfromjava.cloud.web.mall.service;
import io.twostepsfromjava.cloud.bus.ProductEvent;
import io.twostepsfromjava.cloud.web.mall.dto.ProductDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
/**
* 远程事件监听
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
@Component
public class ProductEventListener implements ApplicationListener<ProductEvent> {
protected Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
protected ProductService productService;
@Override
public void onApplicationEvent(ProductEvent productEvent) {
if (ProductEvent.ET_UPDATE.equalsIgnoreCase(productEvent.getAction())) {
this.logger.debug("Web微服务收到商品变更事件,商品货号: {}", productEvent.getItemCode());
// 重新获取该商品信息
ProductDto productDto = this.productService.loadByItemCode(productEvent.getItemCode());
if (null != productDto)
this.logger.debug("重新获取到的商品信息为:{}", productDto);
else
this.logger.debug("货号为:{} 的商品不存在", productEvent.getItemCode());
} else if (ProductEvent.ET_DELETE.equalsIgnoreCase(productEvent.getAction())) {
this.logger.debug("Web微服务收到商品删除事件,所要删除商品货号为: {}", productEvent.getItemCode());
} else {
this.logger.debug("Web微服务收到未知商品事件: {}", productEvent);
}
}
}
3.2.3 开启远程消息扫描
和商品微服务一样,不论是事件的发布还是事件的监听都需要开启远程消息扫描。直接在微服务引导类中增加@RemoteApplicationEventScan
注解即可。
3.3 测试
我们的重构到此就全部完成了,下面依次分别启动:
- Kafka服务器;
- 服务治理服务器: Service-discovery;
- 商品微服务: Product-Service;
- Mall-Web微服务。
然后,使用Postman访问原来的消息测试端点: http://localhost:2100/products/item-2。在商品微服务的控制台,可以看到类似下面输出:
商品微服务控制台输出从输出日志中可以看到商品事件已经发布出去。如果这个时候我们查看Mall-Web微服务的控制台,可以看到下图的输出:
Mall-Web微服务控制台输出从日志输出中可以看到Mall-Web微服务已经能够正确接收到商品变更事件,并进行相应的处理。
3.4 小结
从重构后的代码来说的确使用Bus会更容易理解,也更容易上手。这对于当使用场合比较简单会非常好,比如:广播。典型的应用就是Config中的配置刷新,当在项目中同时引入了Config和Bus时,就可以通过/bus/refresh
端点实现配置更改的广播,从而让相应的微服务重新加载配置数据。
当然,Bus简便性的另外一层含义就是不够灵活,因此具体是在项目中使用Bug还是直接使用Stream就看你的需要了,总起来一句就是:够用就好。
你可以到这里下载本篇的代码。