消息总线:Spring Cloud Bus

2020-05-28  本文已影响0人  睦月MTK

声明:
1.本节将会通过Spring Cloud Bus来将配置更新的事件进行发布,从而达到在更新配置后,使得所有服务都去更新配置的效果,由于配置中心集成在Eureka中,且会以Kafka作为Spring Cloud Bus的基础,所以本节将会使用Spring Cloud Netflix Eureka + Spring Cloud Config + Spring Cloud Bus + Spring Kafka来完成本节内容,Kafka也需要Zookeeper的环境基础,所以你还得整个Zookeeper。
2.入门级文档,更多内容会持续更新,不足之处,望不吝指点


一、Spring Cloud Bus介绍

Spring Cloud Bus就是一个消息总线,也就是一个广播,任何对象都可以接收这条总线上的任何广播消息,同样也可以发布消息出去。内部是使用Spring Cloud Stream来实现,也就是说Spring Cloud Bus不过是Spring Cloud Stream的一个广播性用法,主要用于在服务间共享事件,使得一个事件不单单只在一个服务上被处理,而是可以扩大到整个分布式应用上去。
目前Spring Cloud Bus支持RabbitMQKafka两种消息中间件。


二、Spring Cloud Bus自带事件

三、事件接收者

事件接收者指这个事件应当被哪个服务接收,这与广播机制并不冲突,就比如,我在广播中找“李四”,那么只有“李四”听到了消息应当回应,其他人其实也听得到,但是因为不是“李四”,所有没有必要回应罢了。
事件接收者和事件的发送者在Spring Cloud Bus中都由一串特殊的字符串构成,其格式为app:index:id,其中:
app指的是vcap.application.name或者是spring.application.name(写在前的优先级高)
indexvcap.application.instance_indexspring.application.indexlocal.server.portserver.port0
idvcap.application.instance_id或者是一个不重复的随机值
注:**是通配符
例如:service:**表示事件的接收者是叫service服务的所有实例


四、端点

Spring Cloud Bus一共开了4个端点,分别是/bus/refresh/bus/env/actuator/bus-refresh/actuator/bus-env,它们都只接受Post请求,后两者需要使用management.endpoints.web.exposure.include来开启。它们会分别触发RefreshRemoteApplicationEventEnvironmentChangeRemoteApplicationEvent事件。
附:
/actuator/bus-env可以接受一个Json格式的数据来进行环境的变更,其格式如下:

{
    "name": "key1",
    "value": "value1"
}

五、发布你的自定义事件

你肯定不满足只发布自带的那几个事件,你可能想发布自己的事件

public class TestRemoteEvent extends RemoteApplicationEvent {
    public TestRemoteEvent(){}
    public TestRemoteEvent(Object source, String originService, String destinationService){
        super(source , originService , destinationService);
    }
}

注意:事件的发送者和接受者都要有这个事件,唯一不同的是,发送者(如果不需要的话)可以不用注册该事件给Spring Cloud Bus


六、配置
#开启Spring Cloud Bus
spring.cloud.bus.enabled=true
#消息发送与接收的频道
spring.cloud.bus.destination=SpringCloudBus
#更多配置可以尝试spring.cloud.stream
#kafka使用者可以使用下列配置
spring.kafka.bootstrap-servers=localhost:9092

七、使用Spring Cloud Bus实现配置自动刷新功能
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
/**
 * @author  mtk
 * 针对WebHook的回调接口
 */
@RestController
@RequestMapping("/web-hook")
public class WebHookController {
    //自定义的Bus远端事件发布工具类
    private BusRemoteEventPublisher busRemoteEventPublisher;


    @Autowired
    public WebHookController(BusRemoteEventPublisher busRemoteEventPublisher){
        this.busRemoteEventPublisher = busRemoteEventPublisher;
    }

    /**
     * 针对Gitee的WebHook的回调接口
     * @param jsonInfo 回调数据
     * @return 简易的执行结果
     */
    @PostMapping("/refresh-config")
    public String refreshBus(@RequestBody Map<String,Object> jsonInfo){
        //解析json
        List<Object> commits;
        if((commits = (List<Object>) jsonInfo.get("commits")) != null){
            //获取修改的文件的文件名
            Set<String> modifiedFiles = new HashSet<>();
            commits.forEach(item -> {
                Map<String,Object> commit;
                if(item instanceof Map){
                    commit = (Map<String, Object>) item;
                    List<String> modified;
                    if((modified = (List<String>) commit.get("modified")) != null){
                        modifiedFiles.addAll(modified);
                    }
                }
            });
            //文件过滤
            //去除非配置文件
            List<String> modifiedSettingFileBaseNames = modifiedFiles.stream().filter(item -> {
                String e = FilenameUtils.getExtension(item);
                return "yaml".equals(e) || "yml".equals(e) || "properties".equals(e);
            }).map(FilenameUtils::getBaseName).collect(Collectors.toList());
            //是否变更了全局配置文件
            boolean isMatchGlobalEvent = modifiedSettingFileBaseNames.stream().anyMatch(item -> {
                if("application".equals(item)) {
                    busRemoteEventPublisher.publish(RefreshRemoteApplicationEvent.class, null);
                    return true;
                }
                return false;
            });
            if(isMatchGlobalEvent) return "refreshed all services";
            //对每个服务的刷新事件进行独立发布
            modifiedSettingFileBaseNames.forEach(item -> {
                busRemoteEventPublisher.publish(RefreshRemoteApplicationEvent.class , item+":**");
                refreshedService.add(item);
            });
            return "refreshed service: "+modifiedSettingFileBaseNames.toString();
        }
        //json格式错误
        return "error";
    }
}
@RestController
@RequestMapping("/hello")
@RefreshScope
public class HelloController {

    @Value("${cn.mtk.hello}")
    private String hello;

    @GetMapping("/ph")
    public String printHello(){
        return hello;
    }
}

如果你变更过远端仓库上的配置文件,并修改了cn.mtk.hello这一项配置,那么你将会在/hello/ph上看到更新后的结果

注意:如果你发现配置并没有刷新,但所有步骤都没有问题,那么你得考虑下是不是消费者没有正常连接到Kafka,你可以通过调整日志等级为来查看是否有隐藏掉的错误日志logging.level.root=DEBUG,或者开启一个Kafka消费者控制台来查看消息的发送情况(如果一切都是默认配置的话)kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic SpringCloudBus --partition 0,如果发现确实是Kafka问题,并且各种重启无效后,你可以尝试删除SpringCloudBus这个话题。

$ zkcli
$ rmr /brokers/topics/SpringCloudBus
$ quit

附:

/**
 * @author mtk
 * 便捷的bus远端事件发布工具
 */
public class BusRemoteEventPublisher {

    private ApplicationEventPublisher applicationEventPublisher;
    private BusProperties busProperties;

    public BusRemoteEventPublisher(ApplicationEventPublisher applicationEventPublisher , BusProperties busProperties){
        this.applicationEventPublisher = applicationEventPublisher;
        this.busProperties = busProperties;
    }

    public void publish(Class<? extends RemoteApplicationEvent> eventClass, String destinationService){
        try{
            RemoteApplicationEvent event = eventClass.getDeclaredConstructor(Object.class , String.class , String.class).newInstance(this , busProperties.getId() , destinationService);
            applicationEventPublisher.publishEvent(event);
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

参考文档:
[1] Spring Cloud Bus

上一篇 下一篇

猜你喜欢

热点阅读