RabbitMQ学习(八)与SpringCloudStream整

2018-11-30  本文已影响0人  kobe0429

一、Spring Cloud Stream介绍

Spring Cloud Stream是一个建立在Spring Boot和Spring Integration之上的框架,有助于创建事件驱动或消息驱动的微服务。在本文中,我们将通过一些简单的例子来介绍Spring Cloud Stream的概念和构造。
在 SpringBoot 的之中为了方便开发者去整合消息组件,也提供有一系列的处理支持,整合方式无非是在pom.xml中引入jar包,在配置文件定义好配置,然后通过@Autowired注解将RabbitTemplate引入,之后做业务处理即可。只是在 SpringCloud 里面将消息整合的处理操作进行了进一步的抽象操作, 实现了更加简化的消息处理。在整个 SpringCloud 之中支持有 RabbitMQ、Kafka 组件的消息系统。使得我们可以实现消息的生产端和消费端分别使用RabbitMQ、Kafka。

rmq与kafka.JPG
利用 SpringCloudStream 可以实现更加方便的消息系统的整合处理。SpringCloudStream 就是实现了 MDB 功能,同时可以更加简化方便的整合消息组件。

二、Spring Cloud Stream与RMQ整合原理
SpringCloudStream核心架构.JPG
说明:最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费

三、Spring Cloud Stream与RMQ整合的代码实现

因为Spring Cloud为微服务架构,在领域设计中一般将消息的生产者和消费者分开,此处我们也按2个微服务项目来讨论整合步骤。

针对消息生产者:

1、在pom.xml中添加maven依赖

<dependency>
     <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
         <version>1.3.4.RELEASE</version>
</dependency>

2、在application.properties或application.yml中添加自定义配置

spring.cloud.stream.bindings.output_channel.destination=exchange-3
spring.cloud.stream.bindings.output_channel.group=queue-3
spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster

spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.11.76:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/

3、创建Barista.java类

package com.bfxy.rabbitmq.stream;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * <B>中文类名:</B><BR>
 * <B>概要说明:</B><BR>
 * 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
 * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
 */
public interface Barista {
    String OUTPUT_CHANNEL = "output_channel";  

    //注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题  

    //注解@Output声明了它是一个输出类型的通道,名字是output_channel。这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。  
    @Output(Barista.OUTPUT_CHANNEL)
    MessageChannel logoutput();  

//  String INPUT_BASE = "queue-1";  
//  String OUTPUT_BASE = "queue-1";  
//  @Input(Barista.INPUT_BASE)  
//  SubscribableChannel input1();  
//  MessageChannel output1();  
      
}  

4、创建生产者发送消息的类
首先给类加上@EnableBinding(Barista.class),将我们之前创建的Barista类初始化,然后 @Autowired private Barista barista; 注入进来,最后写send方法的具体实现。

package com.bfxy.rabbitmq.stream;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@EnableBinding(Barista.class)
@Service  
public class RabbitmqSender {  
    @Autowired  
    private Barista barista;      
    // 发送消息
    public String sendMessage(Object message, Map<String, Object> properties) throws Exception {  
        try{
            MessageHeaders mhs = new MessageHeaders(properties);
            Message msg = MessageBuilder.createMessage(message, mhs);
            boolean sendStatus = barista.logoutput().send(msg);
            System.err.println("--------------sending -------------------");
            System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
        }catch (Exception e){  
            System.err.println("-------------error-------------");
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());       
        }  
        return null;
    }      
}  

针对消息的消费端

1、在pom.xml中添加maven依赖

<dependency>
     <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
         <version>1.3.4.RELEASE</version>
</dependency>

2、在application.properties或application.yml中添加自定义配置

spring.cloud.stream.bindings.input_channel.destination=exchange-3
spring.cloud.stream.bindings.input_channel.group=queue-3
spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster
spring.cloud.stream.bindings.input_channel.consumer.concurrency=1
spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false
spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL
spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000
spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true
spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5

spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.11.76:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/

消费端有个性化设置,配置比生产端要多
3、创建Barista.java类

package com.bfxy.rabbitmq.stream;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * <B>中文类名:</B><BR>
 * <B>概要说明:</B><BR>
 * 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
 * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
 * @author ashen(Alienware)
 */

public interface Barista {
      
    String INPUT_CHANNEL = "input_channel";  

    //注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题  
    @Input(Barista.INPUT_CHANNEL)  
    SubscribableChannel loginput();       
}  

4、创建消费者消费消息的类

package com.bfxy.rabbitmq.stream;

import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;


@EnableBinding(Barista.class)
@Service
public class RabbitmqReceiver {  

    @StreamListener(Barista.INPUT_CHANNEL)  
    public void receiver(Message message) throws Exception {  
        Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        System.out.println("Input Stream 1 接受数据:" + message);
        System.out.println("消费完毕------------");
        channel.basicAck(deliveryTag, false);
    }  
}  

最后新建一个测试类,

package com.bfxy.rabbitmq;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.http.client.utils.DateUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.bfxy.rabbitmq.stream.RabbitmqSender;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Autowired
    private RabbitmqSender rabbitmqSender;
    
    
    @Test
    public void sendMessageTest1() {
       for(int i = 0; i < 1; i ++){
           try {
               Map<String, Object> properties = new HashMap<String, Object>();
               properties.put("SERIAL_NUMBER", "12345");
               properties.put("BANK_NUMBER", "abc");
               properties.put("PLAT_SEND_TIME", DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"));
               rabbitmqSender.sendMessage("Hello, I am amqp sender num :" + i, properties);
              
           } catch (Exception e) {
               System.out.println("--------error-------");
               e.printStackTrace(); 
           }
       }
       //TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }
    
}

启动测试类,消费者和生产这款,查看生产者控制台发送数据成功,消费者控制台消费数据成功,rabbitmq管控台消息发送消费成功。


生产者日志信息.JPG
消费者日信息.JPG
RabbitMQ管控台.JPG
上一篇下一篇

猜你喜欢

热点阅读