架构学习mq

SpringBoot--实战开发--MQTT消息推送(六十)

2019-09-30  本文已影响0人  无剑_君

一、MQTT简介

  MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和制动器(比如通过Twitter让房屋联网)的通信协议。

主要特征:
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
1.使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
2.对负载内容屏蔽的消息传输;
3.使用TCP/IP 提供网络连接;
4.有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5.小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;

二、Maven依赖

<!--web-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<!--mqtt-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--lombok-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

三、apollo服务器

  1. 配置文件(application.properties)
#MQTT配置信息
spring.mqtt.username=admin
spring.mqtt.password=password
## MQTT-服务器连接地址,如果有多个,用逗号隔开,
# 如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
spring.mqtt.url=tcp://192.168.77.132:61613
## MQTT-连接服务器默认客户端ID
spring.mqtt.client.id=clientId
## MQTT-连接服务器默认服务端ID
spring.mqtt.server.id=serverId
## MQTT-默认的消息推送主题,实际可在调用接口时指定
spring.mqtt.default.topic=topic

  1. 消息发送配置类
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderConfig {

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    /**
     * 连接设置
     * @return
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions(){
        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        mqttConnectOptions.setKeepAliveInterval(2);
        return mqttConnectOptions;
    }

    /**
     * 客户端工厂
     * @return
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * 发布通知
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        return messageHandler;
    }

    /**
     * 发布通道为直连
     * @return
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
  1. 消息发送接口
/**
 * 消息发送接口
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MsgWriter {
    void sendToMqtt(String data);
    void sendToMqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

消息发送接口,需要发送消息的时候直接调用就行了,提供了几个重载方法payload或者data是发送消息的内容
topic是消息发送的主题,这里可以自己灵活定义,也可以使用默认的主题,就是配置文件的主题,qos是mqtt 对消息处理的几种机制分为0,1,2 其中0表示的是订阅者没收到消息不会再次发送,消息会丢失,1表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息,2相比多了一次去重的动作,确保订阅者收到的消息有一次
当然,这三种模式下的性能肯定也不一样,qos=0是最好的,2是最差的 。

  1. 测试
@RestController
public class MqttController {
    @Autowired
    private MsgWriter msgWriter;
    @RequestMapping("/send")
    public String sendMqtt(String  sendData){
        msgWriter.sendToMqtt(sendData,"hello");
        return "OK";
    }
}

接口测试
Web查看
  1. 消息消费(本测试用的是同一项目,建议创建单独消费项目进行测试)
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttConsumersConfig {
//    订阅的主题可以指定,我订阅的是刚才发的too主题,还有订阅方的id 别和发送方的id 一样
    @Value("${spring.mqtt.server.id}")
    private String serverId;
    /**
     * 使用MqttSenderConfig中生成的工厂对象。
     * 如果单独服务器,请使用以下@Bean代码。
     */
    @Autowired
    private MqttPahoClientFactory mqttClientFactory;

//    @Bean
//    public MqttPahoClientFactory mqttClientFactory() {
//        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
//        factory.setServerURIs("tcp://localhost:61613");
//        factory.setUserName("admin");
//        factory.setPassword("password");
//        return factory;
//    }

    /**
     * consumer 订阅者监听消息
     * @return
     */
    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(mqttInbound())
                .transform(p -> p + ", 收到消息,来自MQTT")
                .handle(logger())
                .get();
    }

    /**
     * 处理日志
     * @return
     */
    private LoggingHandler logger() {
        LoggingHandler loggingHandler = new LoggingHandler("INFO");
        loggingHandler.setLoggerName("siSample");
        return loggingHandler;
    }

    /**
     * 订阅主题
     * @return
     */
    @Bean
    public MessageProducerSupport mqttInbound() {
        /**
         * 订阅的主题可以指定,我订阅的是刚才发的too主题,还有订阅方的id 别和发送方的id 一样
         */
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(serverId,
                mqttClientFactory, "hello");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        return adapter;
    }
}

注意:
主题名称生产者与消费者一定要对应,否则取不到消息 。

消费结果

四、EMQ服务器

  1. 配置
# MQTT-密码
# MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:1883,tcp://192.168.2.133:61613
spring.mqtt.url=tcp://192.168.77.132:1883
# MQTT-连接服务器默认客户端ID
spring.mqtt.client.id=mqttId
# MQTT-连接服务器默认服务端ID
spring.mqtt.server.id=serverId
# MQTT-默认的消息推送主题,实际可在调用接口时指定
spring.mqtt.default.topic=topic
spring.mqtt.username=admin
spring.mqtt.password=public

  1. 代码
    与apollo服务器相同。
  2. 测试结果
    测试方法同apollo服务器。


    测试结果
上一篇下一篇

猜你喜欢

热点阅读