spring cloud 12消息驱动

2019-06-25  本文已影响0人  西西_20f6

一、spring cloud stream
使用场景:消息驱动的微服务应用。
同步的方式:http也是一种消息,包含消息头和消息体,请求响应模型。
异步的方式:消息中间件
对比reactive streams
publisher
subscriber
processor既是消费者也是生产者
topic

二、主要概念
• 应⽤模型:中间件核心
• Binder 抽象
• 持久化 发布/订阅⽀持:MQ的基本特性
• 消费分组⽀持:MQ的基本特性
• 分区⽀持:Kafka

三、基本概念
• Source: Stream 发送源
• 近义词: Producer、 Publisher
• Sink: Stream 接收器
• 近义词: Consumer、 Subscriber
• Processor:既是Producer也是Consumer??
消息模型的组件都可以看作是管道

四、编程模型
• 激活 : @EnableBinding
• @Configuration
• @EnableIntegration
• Source:发送源
• @Output
• MessageChannel:发布消息
通过JMS规范,无论是源还是接受端,都是从中间件(Kafka,rabbitMQ)中寻址,所以我们的消息中间件就像一个路由器,路由到我们的目的地。发送者->中间件->订阅者。

五、编程模型
• Sink:接收器,订阅者
• @Input就会有SubscribableChannel,订阅管道,订阅某个消息。
一个SubscribableChannel能订阅几个topic?

三种监听topic的写法:
• SubscribableChannel:订阅管道
• @ServiceActivator:监听器
• @StreamListener:监听器

六、整合kafka


image.png

1,启动zookeeper,端口2181
进到zookeeper bin目录下
sh zkServer.sh start

2,Kafka:启动kafka
进到Kafka bin目录下
sh kafka-server-start.sh ../config/server.properties

3,修改UserApi下的User对象,增加序列化ID
private static final long serialVersionUID = 4050384138695906201L;

4,改造user-service-client,消息发送器(采用Kafka原生API)
(1)引入kafka依赖,这个是spring整合kafka的依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

(2)利用kafkaTemplate实现消息发送,spring的Template套路,JDBCTemplate,RedisTemplate,hibernateTemplate,RestTemplate,在UserServiceClientController添加如下代码:

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    public UserServiceClientController(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }


    @PostMapping("/saveUserByMessage")
    public boolean save(@RequestBody User user) {

        //topic是string,传输对象user是Object类型
        ListenableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send("sf-users",0,user);

        return future.isDone();
    }

(3)实现java 序列化器,com.xixi.spring.cloud12.server.user.ribbon.client.serializer.ObjectSerializer

package com.xixi.spring.cloud12.server.user.ribbon.client.serializer;

import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Map;

/**将对象转化成字节数组
 * user 序列化器
 */
public class ObjectSerializer implements Serializer<Object> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    /**
     * user 对象序列化过程:User对象转成byte[](二进制字节流)
     * @param topic
     * @param data
     * @return
     */
    @Override
    public byte[] serialize(String topic, Object data) {

        System.out.println("topic : "+topic+",object "+data);

        byte[] objectArray = null;

        //构造ObjectOutputStream的时候必须要传一个OutputStream,所以先new一个ByteArrayOutputStream
        OutputStream outputStream = new ByteArrayOutputStream();


        try {
            ObjectOutputStream objectOutputStream =
                    new ObjectOutputStream(outputStream);

            objectOutputStream.writeObject(data);//将对象写入outputStream

            objectArray = ((ByteArrayOutputStream) outputStream).toByteArray();//将对象赋值给userArray


        } catch (IOException e) {
            e.printStackTrace();
        }


        return objectArray;
    }

    @Override
    public void close() {

    }
}

(4)顺序启动eureka,config server
(5)用kafka-console-consumer订阅一个topic
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sf-users
然后我们客户端发送这个topic,kafka-console-consumer就能监控得到。

5,改造user-service-provider,消息接收器(采用Stream binder的方式)
(1)引入依赖:这个是spring-cloud-stream整合kafka的依赖,和上面那个不同哦。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

(2)定义用户消息接口

package com.xixi.spring.cloud12.service.stream;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * 用户消息stream 接口
 *
 * sink,@Input,SubscribableChannel
 *
 *
 */
public interface UserMessage {

    @Input
    SubscribableChannel input();
}

(3)激活用户消息Stream接口

@EnableHystrix
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(UserMessage.class)//激活stream binding 到UserMessage
public class UserServiceProviderApplication {

    public static void main(String[] args) {

        SpringApplication.run(UserServiceProviderApplication.class,args);
    }
}

(4)在application.properties中配置kafka以及stream destination

###spring cloud stream集成kafka配置
###spring cloud stream binding配置sink管道的topic
#spring.cloud.stream.bindings.${channelid}
###destination指定Kafka topic
spring.cloud.stream.bindings.input.destination = sf-users
##Kafka的配置:
# 配置消息中间件服务,Kafka默认端口9092,localhost:9092
spring.kafka.bootstrap-servers = localhost:9092
spring.kafka.consumer.group-id = sf-group
spring.kafka.consumer.clientId = user-service-provider

配置
• Source : spring.cloud.stream.bindings.{source}.* • Sink : spring.cloud.stream.bindings.{sink}.*

(5)用户消息监听器
1,第一种实现方式:通过SubscribableChannel实现

package com.xixi.spring.cloud12.service.provider.service;

import com.xixi.spring.cloud12.server.api.UserService;
import com.xixi.spring.cloud12.server.domain.User;
import com.xixi.spring.cloud12.service.stream.UserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;

import static com.xixi.spring.cloud12.service.stream.UserMessage.INPUT;

/**
 * 用户消息-服务
 * 当接收到消息的时候,listen()和init()都是回调,是交替执行
 */
@Service
public class UserMessagingService {

    @Autowired
    private UserMessage userMessage;

    @Autowired
    @Qualifier("inMemoryUserService")
    private UserService userService;

    /**第一种实现方式:SubscribableChannel
     * 订阅者接收到消息以后存起来
     */
    @PostConstruct
    public void init(){
        SubscribableChannel subscribableChannel = userMessage.input();
        //消息订阅回调,消费消息
        subscribableChannel.subscribe(message -> {
            //这里的message body应该是字节流,因为我们从user-service-client传的user对象在序列化成字节流以后传过来的
            System.out.println(message);
            byte[] body = (byte[]) message.getPayload();//就是message body
            saveUser(body);
        });

    }




    private void saveUser(byte[] body){
        //反序列化后输出
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
        ObjectInputStream objectInputStream = null;
        try {
            objectInputStream = new ObjectInputStream(byteArrayInputStream);
            User user = (User) objectInputStream.readObject();
            System.out.println(user.getId()+","+user.getName());
            //保存user
            userService.saveUser(user);


        } catch (Exception e) {
            throw new RuntimeException();
        }

    }

}

2,第二种实现方式:
• @ServiceActivator:监听器

    //第二种实现方式
    //input是channel的ID
    @ServiceActivator(inputChannel = INPUT)
    public void listen(byte[] data){
        System.out.println("listen from @ServiceActivator...");
        saveUser(data);
    }

3,第三种实现方式:
• @StreamListener:监听器

    //第三种实现方式:
    @StreamListener(INPUT)
    public void onMessage(byte[] data){
        System.out.println("listen from @StreamListener...");
        saveUser(data);
    }

总结:
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.SubscribableChannel;
三个callback交替执行,机会均等。
这三种实现只用到了Kafka的destination,在配置文件中
spring.cloud.stream.bindings.user-message.destination = sf-users
spring cloud stream屏蔽了消息中间件的具体实现(这里是Kafka),将来如果你换成其他的中间件比如rabbitMQ,也不需要改太多的代码。
spring cloud stream的优势:解耦

七、整合rabbitMQ
1,改造user-service-client,消息发送器(采用Stream binder的方式,用消息管道进行消息发送)
(1)增加依赖org.springframework.cloud:spring-cloud-stream-binder-rabbit
(2)配置发送源管道

###spring cloud stream集成rabbit配置
###destination指定 rabbit topic,user-message-out是管道名称
spring.cloud.stream.bindings.user-message-out.destination = sf-users

(3)增加用户消息输出源stream接口

package com.xixi.spring.cloud12.server.user.ribbon.client.stream;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

import java.nio.channels.Channel;

/**
 * 用户消息 Stream接口
 */
public interface UserMessage {


    String OUTPUT= "user-message-out";

    @Output(value = OUTPUT)
    MessageChannel output();

}

(4)激活@EnableBinding

@EnableBinding(UserMessage.class)
public class UserServiceClientApplication

(5)发送请求到rabbitmq

@Autowired
    private UserMessage userMessage;
    @Autowired
    private ObjectMapper objectMapper;

    @PostMapping("/saveUserByRabbit")
    public boolean saveUserByRabbit(@RequestBody User user) throws JsonProcessingException {

        //topic是string,传输对象user是Object类型
        MessageChannel messageChannel = userMessage.output();
        //将user对象序列化成JSON
        String payload =objectMapper.writeValueAsString(user);
        GenericMessage<String> message = new GenericMessage<String>(payload);
        //发送消息到rabbitmq
        return messageChannel.send(message);
    }

(6)启动rabbitmq
./rabbitmq-server

重点
user-service-client的channel:user-message-out
user-service-provider的channel:user-message
因为它们监听的同一个topic,所以客户端发送的消息能够被user-service-provider监听到。

2,改造user-service-provider,消息接收器(采用Stream binder的方式)
(1)替换Kafka的依赖为rabbitmq,
依赖org.springframework.cloud:spring-cloud-stream-binder-rabbit

(2)配置文件无需修改

(3)修改UserMessagingService,因为这次传过来的消息是String,之前写的是byte[],所以加个处理方式。

@Autowired
    private ObjectMapper objectMapper;
    
    /**第一种实现方式:SubscribableChannel
     * 订阅者接收到消息以后存起来
     */
    @PostConstruct
    public void init()throws IOException {
        SubscribableChannel subscribableChannel = userMessage.input();
        //消息订阅回调,消费消息
        subscribableChannel.subscribe(message -> {
            //这里的message body应该是字节流,因为我们从user-service-client传的user对象。在序列化成字节流以后传过来的
            System.out.println("listen from subscribableChannel...");

            MessageHeaders messageHeaders = message.getHeaders();
            String contentType = messageHeaders.get("contentType",String.class);
            if ("text/plain".equals(contentType)){
                try {
                    saveUser2((String) message.getPayload());
                } catch (IOException e) {
                    throw new RuntimeException();
                }
            }else{
                byte[] body = (byte[]) message.getPayload();//就是message body
                saveUser(body);
            }
        });
    }

八、使用spring cloud stream,不需要关注你使用的是哪种中间件,只关注topic即可。强烈解耦!!!!我们的项目现在既有RPC调用,也有远程消息服务。远程消息服务适用于重要不紧急的功能,重要紧急就RPC调用,不重要也不紧急可以考虑定时任务。

九、问答
1,消息的一致性和实时性如何取舍?
消息是允许丢失的,与数据库有点不太一样,大多数消息中间件都有重试机制,同时消息不太强调实时性。比如聊天软件(spring boot聊天室websocket那节),看上去很快,其实不是实时的,是异步的。
消息中间件有持久化,相对于内存型(消费完就完事,不会存储),更加有保障,它会有一个持久化的过程。

2,spring cloud stream里面的消息分区是怎么实现的?
spring cloud stream消息分区依赖于消息中间件,比如说Kafka有分区,partition,
但是rabbitmq就没有分区。
比如数据库的分区,同一个表分别存在3个库里,比如,前10000号在库1,中间10000号在库2,最后10000在库3,根据偏移量分区。

一个请求过来的时候,从它的上下文中就应该知道它去访问哪个库,
oracle中每个库挂不同的磁盘,并行io,可提高读写的速度

3,如果我启动多个consumer进程,使用同样的consumer group进行实时聚合计算,动态增加consumer进程时,如何保证消费的消息仍在原来相同的consumer进程上?
队列模式:如果生产消费模型是点对点,producer A发出message A专门给consumer A用的,producer B专门给consumer B用的,consumer B是消费不到message A。
广播模式:如果是发布订阅者模式,一个消息有多个订阅者,一个producer A发出message A,有多个consumerA,consumerB,consumerC订阅者,大家都监听到这个消息,

上一篇下一篇

猜你喜欢

热点阅读