国信证券RocketMQ+SpringBoot在配置热加载的实践

2019-11-26  本文已影响0人  邓启翔

  在进行微服务架构研发的过程中,不少研发人员都提出配置热加载的需求,一方面是为了提升研发效率,另一方面是为了线上紧急事件,可以快速恢复。于是我采用rocketmq消息队列,实现一套解耦的配置热加载机制。这里我将思路简单介绍下,希望大家能通过这篇文章,实现自己的配置热加载。

1 设计方案

image.png

1、配置中心修改配置后,将修改的配置发送到rocketmq
2、微服务启动时,监控配置修改的topic,发现自身配置已修改,则发起配置热加载
3、服务需要设置热加载开关,否则配置误修改可能带来

1.1 RocketMQ消息设计

topic tag body
confCenter confModify 服务名称和配置内容

采用spring的event机制处理消息消息消费

1.2 RocketmqEvent代码

public class RocketmqEvent extends ApplicationEvent {
   private static final long serialVersionUID = -4468405250074063206L;
   private DefaultMQPushConsumer consumer;
   private List<MessageExt> msgs;

   public RocketmqEvent(List<MessageExt> msgs, DefaultMQPushConsumer consumer) throws Exception {
       super(msgs);
       this.consumer = consumer;
       this.setMsgs(msgs);
   }

   public String getMsg(int idx) {
       try {
           return new String(getMsgs().get(idx).getBody(), "utf-8");
       } catch (UnsupportedEncodingException e) {
           return null;
       }
   }

   public String getMsg(int idx,String code) {
       try {
           return new String(getMsgs().get(idx).getBody(), code);
       } catch (UnsupportedEncodingException e) {
           return null;
       }
   }

   public DefaultMQPushConsumer getConsumer() {
       return consumer;
   }

   public void setConsumer(DefaultMQPushConsumer consumer) {
       this.consumer = consumer;
   }

   public MessageExt getMessageExt(int idx) {
       return getMsgs().get(idx);
   }


   public String getTopic(int idx) {
       return getMsgs().get(idx).getTopic();
   }


   public String getTag(int idx) {
       return getMsgs().get(idx).getTags();
   }


   public byte[] getBody(int idx) {
       return getMsgs().get(idx).getBody();
   }


   public String getKeys(int idx) {
       return getMsgs().get(idx).getKeys();
   }

   public List<MessageExt> getMsgs() {
       return msgs;
   }

   public void setMsgs(List<MessageExt> msgs) {
       this.msgs = msgs;
   }
}

1.3 RocketMQ的message的Event处理

@Autowired
ApplicationEventPublisher publisher 
publisher.publishEvent(new RocketmqEvent(msgs, consumer));

2 spring配置热加载

微服务端收到消息后,采用spring的RefreshScope机制进行配置热加载
1、需要注入RefreshScope和ConfigurableWebApplicationContext两个对象

ConfigurableWebApplicationContext applicationContext;\\采用ApplicationContextAware机制进行注入
@Autowired
private RefreshScope refreshScope = null;

2、解析RocketMQ的message,然后进行配置刷新

@EventListener
   public void envListener(RocketmqEvent event) {
       event.getMsgs().forEach(msg -> {
           String body = new String(msg.getBody());
           JSONObject json = JSON.parseObject(body);
           MutablePropertySources target = applicationContext.getEnvironment().getPropertySources();//获取Sources
           for (PropertySource<?> source : target) {
               if ("defaultProperties".equals(source.getName())) {//对默认配置进行刷新
                   @SuppressWarnings("unchecked")
                   Map<String, String> map = (Map<String, String>) source.getSource();
                   Map<String, String> after = JSONObject.parseObject(json.toJSONString(),
                           new TypeReference<Map<String, String>>() {
                           });
                   map.putAll(after);
                   PropertiesContent.refleshAll(after);
                   this.refreshScope.refreshAll();
                   return;
               }
           }
       });
   }

最后,这里只是简单介绍了配置热加载的思路,当然还有很多细节需要思考,如果有兴趣的可以一起交流交流。

上一篇下一篇

猜你喜欢

热点阅读