Spring Cloud Bus 入门教程 补充

2020-03-07  本文已影响0人  Wind哥

说是入门教程,其实不是,真正的教程在此 https://www.jianshu.com/p/bab2da26f282,不知道作者使用什么版本,里面的例子最新版【Hoxton.SR1】走不通,本文只是例子存在的问题进行补充,同时,向该文作者致敬!

原文例子中Event的例子主要分为两部分
1.本应用发布,本应用内监听,这个是基于Spring的消息机制的,例子能走通
2.本应用发布,其他应用监听,例子走不通,原文下面的留言可证

鉴于大家反馈是本地发Event后本地能监听,其他应用不能监听,而本地和其他应用通过kafka监听,并且我们知道topic名字,检查日志目录得知文件夹springCloudBus-0是存在的(topic存在,也可以通过指令去确认),说明应用已经链接kafka,并且自动创建好topic了
于是直接通过命令行订阅

kafka-console-consumer.sh --topic springCloudBus --from-beginning --bootstrap-server localhost:9092

发现里面一条数据都没有,通过生成者指令进行输入,队列是通的

kafka-console-producer.bat --broker-list localhost:9092 --topic springCloudBus

那么kafka没问题,那就确认是源应用没成功发布出去了
提示:这里手工输入的消息,是不符合bus的格式要求的,会导致订阅的应用报错,需要清理掉,我本机做法是直接关掉kafka服务后删掉它的日志文件了事


从源应用入手

首先要知道的是:RemoteApplicationEvent(下称RemoteEvent) 是继承 ApplicationEvent(或者说相对于remote,可以理解成是local event,其实是spring内部的消息机制,为了清晰区分,下称LocalEvent)

RemoteEvent 的发布过程如下
1.创建 并且 发布 LocalEvent

//原文中的代码
protected void fireEvent(String eventAction, ProductDto productDto) {
    ProductEvent productEvent = new ProductEvent(productDto,
            ApplicationContextHolder.getApplicationContext().getId(), "*:**",
            eventAction, productDto.getItemCode());
    // 发布事件
    RemoteApplicationEventPublisher.publishEvent(productEvent);
}

这里其实是发布了一个ApplicationEvent(LocalEvent),在String的消息机制里流转

2.Bus监听了LocalEvent(左右继承自RemoteApplicationEvent的Event,目前还是在Local里流转)
3.判断是不是自己(本应用)发出的(isFromSelf)
4.如果是自己(本应用)发出的,发送到outboundChannel

//step 2,3,4的代码如下
@EventListener(classes = RemoteApplicationEvent.class)
public void acceptLocal(RemoteApplicationEvent event) {
  if (this.serviceMatcher.isFromSelf(event)
      && !(event instanceof AckRemoteApplicationEvent)) {   
      this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
  }
}

原文中的问题点就出在这里了,判断isFromSelf时候可能是因为版本的原因匹配失败,导致跳过了发送代码

继续挖代码

public boolean isFromSelf(RemoteApplicationEvent event) {
  String originService = event.getOriginService();
  String serviceId = getServiceId();
  return this.matcher.match(originService, serviceId);
}

isFromSelf的判断其实是判断Event中的originService 和Bus自己的id是否一致,其中originService 原文使用的是applicationContext.id,而从ServiceMatcher 的初始化代码得知,serviceId是通过BusProperties获取的,而我本地调试时候两个获取到的id格式不一样;

//ServiceMatcher 初始化
@Bean
public ServiceMatcher serviceMatcher(@BusPathMatcher PathMatcher pathMatcher,
    BusProperties properties, Environment environment) {
  String[] configNames = environment.getProperty(CLOUD_CONFIG_NAME_PROPERTY,
      String[].class, new String[] {});
  ServiceMatcher serviceMatcher = new ServiceMatcher(pathMatcher,
      properties.getId(), configNames);
  return serviceMatcher;
} 

既然知道问题那就简单了,直接在fire事件时候用BusProperties.getId 代替原来的applicationContext.getId即可
调整后的代码如下

//调整后的代码
@autowired
BusProperties busProperties;//该注入方式是不推荐的,这里仅仅为了表达意思
protected void fireEvent(String eventAction, ProductDto productDto) {
    ProductEvent productEvent = new ProductEvent(productDto,
            busProperties.getId(), "*:**",
            eventAction, productDto.getItemCode());
    // 发布事件
    RemoteApplicationEventPublisher.publishEvent(productEvent);
}

现在是不是本文开始时候用命令行订阅的消费者,也读到数据了?

当然,还有另外一种做法是把bus的id设置成和spring的一致

#配置文件(具体值我没测试,要自己试试,我用的是上面那种方法)
spring.cloud.bus.id=${spring.application.name}-${server.port}

顺便提一下远端应用接受的过程
1.BusAutoConfiguration.acceptRemote,注意,这里是StreamListener,监听的是kafka的流
2.判断是不是发给自己的serviceMatcher.isForSelf
3.若上述条件符合,判断是不是自己发的serviceMatcher.isFromSelf,若是,跳过
从发送流程得知,如果自己发的,本来就在LocalEvent里流转了,若自己订阅也已经消费过一次了;而且如果这里重新入Local的话,会导致发布流程会重新触发,死循环了!
4.若上述条件符合,发布LocalEvent,丢进Spring的消息机制流转,监听了该Event的地方自然能收到信息

整体代码(简略版)如下

//(接受到)RemoteEvent 转 LocalEvent(进行消费)过程
@StreamListener(SpringCloudBusClient.INPUT)
public void acceptRemote(RemoteApplicationEvent event) {
    ......
    if (this.serviceMatcher.isForSelf(event)
            && this.applicationEventPublisher != null) {
        if (!this.serviceMatcher.isFromSelf(event)) {
            this.applicationEventPublisher.publishEvent(event);
        }
        ......
    }
    ......
}

另外,RemoteEvent需要序列化处理的(Json),而对于ApplicationEvent(LocalEvent)来说,source代表事件的触发者(而非数据),所以序列化时候是跳过的了

//声明了序列化时候跳过source
@JsonIgnoreProperties("source")
public abstract class RemoteApplicationEvent 

所以原文中直接用soucre传递数据是不妥的(估计以前可以,新版意识到source使用错误,修正了)
同时,因为涉及到json反序列化,默认构造函数是必须的!别和我一样犯傻!

private static final Object TRANSIENT_SOURCE = new Object();
...
protected RemoteApplicationEvent() {
    // for serialization libs like jackson
    this(TRANSIENT_SOURCE, null, null);
}

人家代码里已经说清楚了,默认构造函数是用来反序列化的,并且TRANSIENT_SOURCE也充分说明了不能用soucre传递数据:)

至此 整个流程接受,祝顺利调通

上一篇下一篇

猜你喜欢

热点阅读