spring cloud 12消息驱动
一、spring cloud stream
使用场景:消息驱动的微服务应用。
同步的方式:http也是一种消息,包含消息头和消息体,请求响应模型。
异步的方式:消息中间件
对比reactive streams
publisher
subscriber
processor既是消费者也是生产者
topic
二、主要概念
• 应⽤模型:中间件核心
• Binder 抽象
• 持久化 发布/订阅⽀持:MQ的基本特性
• 消费分组⽀持:MQ的基本特性
• 分区⽀持:Kafka
三、基本概念
• Source: Stream 发送源
• 近义词: Producer、 Publisher
• Sink: Stream 接收器
• 近义词: Consumer、 Subscriber
• Processor:既是Producer也是Consumer??
消息模型的组件都可以看作是管道
四、编程模型
• 激活 : @EnableBinding
• @Configuration
• @EnableIntegration
• Source:发送源
• @Output
• MessageChannel:发布消息
通过JMS规范,无论是源还是接受端,都是从中间件(Kafka,rabbitMQ)中寻址,所以我们的消息中间件就像一个路由器,路由到我们的目的地。发送者->中间件->订阅者。
五、编程模型
• Sink:接收器,订阅者
• @Input就会有SubscribableChannel,订阅管道,订阅某个消息。
一个SubscribableChannel能订阅几个topic?
三种监听topic的写法:
• SubscribableChannel:订阅管道
• @ServiceActivator:监听器
• @StreamListener:监听器
六、整合kafka
image.png
1,启动zookeeper,端口2181
进到zookeeper bin目录下
sh zkServer.sh start
2,Kafka:启动kafka
进到Kafka bin目录下
sh kafka-server-start.sh ../config/server.properties
3,修改UserApi下的User对象,增加序列化ID
private static final long serialVersionUID = 4050384138695906201L;
4,改造user-service-client,消息发送器(采用Kafka原生API)
(1)引入kafka依赖,这个是spring整合kafka的依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
(2)利用kafkaTemplate实现消息发送,spring的Template套路,JDBCTemplate,RedisTemplate,hibernateTemplate,RestTemplate,在UserServiceClientController添加如下代码:
private final KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
public UserServiceClientController(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@PostMapping("/saveUserByMessage")
public boolean save(@RequestBody User user) {
//topic是string,传输对象user是Object类型
ListenableFuture<SendResult<String, Object>> future =
kafkaTemplate.send("sf-users",0,user);
return future.isDone();
}
(3)实现java 序列化器,com.xixi.spring.cloud12.server.user.ribbon.client.serializer.ObjectSerializer
package com.xixi.spring.cloud12.server.user.ribbon.client.serializer;
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Map;
/**将对象转化成字节数组
* user 序列化器
*/
public class ObjectSerializer implements Serializer<Object> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
/**
* user 对象序列化过程:User对象转成byte[](二进制字节流)
* @param topic
* @param data
* @return
*/
@Override
public byte[] serialize(String topic, Object data) {
System.out.println("topic : "+topic+",object "+data);
byte[] objectArray = null;
//构造ObjectOutputStream的时候必须要传一个OutputStream,所以先new一个ByteArrayOutputStream
OutputStream outputStream = new ByteArrayOutputStream();
try {
ObjectOutputStream objectOutputStream =
new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(data);//将对象写入outputStream
objectArray = ((ByteArrayOutputStream) outputStream).toByteArray();//将对象赋值给userArray
} catch (IOException e) {
e.printStackTrace();
}
return objectArray;
}
@Override
public void close() {
}
}
(4)顺序启动eureka,config server
(5)用kafka-console-consumer订阅一个topic
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sf-users
然后我们客户端发送这个topic,kafka-console-consumer就能监控得到。
5,改造user-service-provider,消息接收器(采用Stream binder的方式)
(1)引入依赖:这个是spring-cloud-stream整合kafka的依赖,和上面那个不同哦。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
(2)定义用户消息接口
package com.xixi.spring.cloud12.service.stream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* 用户消息stream 接口
*
* sink,@Input,SubscribableChannel
*
*
*/
public interface UserMessage {
@Input
SubscribableChannel input();
}
(3)激活用户消息Stream接口
@EnableHystrix
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(UserMessage.class)//激活stream binding 到UserMessage
public class UserServiceProviderApplication {
public static void main(String[] args) {
SpringApplication.run(UserServiceProviderApplication.class,args);
}
}
(4)在application.properties中配置kafka以及stream destination
###spring cloud stream集成kafka配置
###spring cloud stream binding配置sink管道的topic
#spring.cloud.stream.bindings.${channelid}
###destination指定Kafka topic
spring.cloud.stream.bindings.input.destination = sf-users
##Kafka的配置:
# 配置消息中间件服务,Kafka默认端口9092,localhost:9092
spring.kafka.bootstrap-servers = localhost:9092
spring.kafka.consumer.group-id = sf-group
spring.kafka.consumer.clientId = user-service-provider
配置
• Source : spring.cloud.stream.bindings.{sink}.*
(5)用户消息监听器
1,第一种实现方式:通过SubscribableChannel实现
package com.xixi.spring.cloud12.service.provider.service;
import com.xixi.spring.cloud12.server.api.UserService;
import com.xixi.spring.cloud12.server.domain.User;
import com.xixi.spring.cloud12.service.stream.UserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import static com.xixi.spring.cloud12.service.stream.UserMessage.INPUT;
/**
* 用户消息-服务
* 当接收到消息的时候,listen()和init()都是回调,是交替执行
*/
@Service
public class UserMessagingService {
@Autowired
private UserMessage userMessage;
@Autowired
@Qualifier("inMemoryUserService")
private UserService userService;
/**第一种实现方式:SubscribableChannel
* 订阅者接收到消息以后存起来
*/
@PostConstruct
public void init(){
SubscribableChannel subscribableChannel = userMessage.input();
//消息订阅回调,消费消息
subscribableChannel.subscribe(message -> {
//这里的message body应该是字节流,因为我们从user-service-client传的user对象在序列化成字节流以后传过来的
System.out.println(message);
byte[] body = (byte[]) message.getPayload();//就是message body
saveUser(body);
});
}
private void saveUser(byte[] body){
//反序列化后输出
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
ObjectInputStream objectInputStream = null;
try {
objectInputStream = new ObjectInputStream(byteArrayInputStream);
User user = (User) objectInputStream.readObject();
System.out.println(user.getId()+","+user.getName());
//保存user
userService.saveUser(user);
} catch (Exception e) {
throw new RuntimeException();
}
}
}
2,第二种实现方式:
• @ServiceActivator:监听器
//第二种实现方式
//input是channel的ID
@ServiceActivator(inputChannel = INPUT)
public void listen(byte[] data){
System.out.println("listen from @ServiceActivator...");
saveUser(data);
}
3,第三种实现方式:
• @StreamListener:监听器
//第三种实现方式:
@StreamListener(INPUT)
public void onMessage(byte[] data){
System.out.println("listen from @StreamListener...");
saveUser(data);
}
总结:
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.SubscribableChannel;
三个callback交替执行,机会均等。
这三种实现只用到了Kafka的destination,在配置文件中
spring.cloud.stream.bindings.user-message.destination = sf-users
spring cloud stream屏蔽了消息中间件的具体实现(这里是Kafka),将来如果你换成其他的中间件比如rabbitMQ,也不需要改太多的代码。
spring cloud stream的优势:解耦
七、整合rabbitMQ
1,改造user-service-client,消息发送器(采用Stream binder的方式,用消息管道进行消息发送)
(1)增加依赖org.springframework.cloud:spring-cloud-stream-binder-rabbit
(2)配置发送源管道
###spring cloud stream集成rabbit配置
###destination指定 rabbit topic,user-message-out是管道名称
spring.cloud.stream.bindings.user-message-out.destination = sf-users
(3)增加用户消息输出源stream接口
package com.xixi.spring.cloud12.server.user.ribbon.client.stream;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import java.nio.channels.Channel;
/**
* 用户消息 Stream接口
*/
public interface UserMessage {
String OUTPUT= "user-message-out";
@Output(value = OUTPUT)
MessageChannel output();
}
(4)激活@EnableBinding
@EnableBinding(UserMessage.class)
public class UserServiceClientApplication
(5)发送请求到rabbitmq
@Autowired
private UserMessage userMessage;
@Autowired
private ObjectMapper objectMapper;
@PostMapping("/saveUserByRabbit")
public boolean saveUserByRabbit(@RequestBody User user) throws JsonProcessingException {
//topic是string,传输对象user是Object类型
MessageChannel messageChannel = userMessage.output();
//将user对象序列化成JSON
String payload =objectMapper.writeValueAsString(user);
GenericMessage<String> message = new GenericMessage<String>(payload);
//发送消息到rabbitmq
return messageChannel.send(message);
}
(6)启动rabbitmq
./rabbitmq-server
重点:
user-service-client的channel:user-message-out
user-service-provider的channel:user-message
因为它们监听的同一个topic,所以客户端发送的消息能够被user-service-provider监听到。
2,改造user-service-provider,消息接收器(采用Stream binder的方式)
(1)替换Kafka的依赖为rabbitmq,
依赖org.springframework.cloud:spring-cloud-stream-binder-rabbit
(2)配置文件无需修改
(3)修改UserMessagingService,因为这次传过来的消息是String,之前写的是byte[],所以加个处理方式。
@Autowired
private ObjectMapper objectMapper;
/**第一种实现方式:SubscribableChannel
* 订阅者接收到消息以后存起来
*/
@PostConstruct
public void init()throws IOException {
SubscribableChannel subscribableChannel = userMessage.input();
//消息订阅回调,消费消息
subscribableChannel.subscribe(message -> {
//这里的message body应该是字节流,因为我们从user-service-client传的user对象。在序列化成字节流以后传过来的
System.out.println("listen from subscribableChannel...");
MessageHeaders messageHeaders = message.getHeaders();
String contentType = messageHeaders.get("contentType",String.class);
if ("text/plain".equals(contentType)){
try {
saveUser2((String) message.getPayload());
} catch (IOException e) {
throw new RuntimeException();
}
}else{
byte[] body = (byte[]) message.getPayload();//就是message body
saveUser(body);
}
});
}
八、使用spring cloud stream,不需要关注你使用的是哪种中间件,只关注topic即可。强烈解耦!!!!我们的项目现在既有RPC调用,也有远程消息服务。远程消息服务适用于重要不紧急的功能,重要紧急就RPC调用,不重要也不紧急可以考虑定时任务。
九、问答
1,消息的一致性和实时性如何取舍?
消息是允许丢失的,与数据库有点不太一样,大多数消息中间件都有重试机制,同时消息不太强调实时性。比如聊天软件(spring boot聊天室websocket那节),看上去很快,其实不是实时的,是异步的。
消息中间件有持久化,相对于内存型(消费完就完事,不会存储),更加有保障,它会有一个持久化的过程。
2,spring cloud stream里面的消息分区是怎么实现的?
spring cloud stream消息分区依赖于消息中间件,比如说Kafka有分区,partition,
但是rabbitmq就没有分区。
比如数据库的分区,同一个表分别存在3个库里,比如,前10000号在库1,中间10000号在库2,最后10000在库3,根据偏移量分区。
一个请求过来的时候,从它的上下文中就应该知道它去访问哪个库,
oracle中每个库挂不同的磁盘,并行io,可提高读写的速度
3,如果我启动多个consumer进程,使用同样的consumer group进行实时聚合计算,动态增加consumer进程时,如何保证消费的消息仍在原来相同的consumer进程上?
队列模式:如果生产消费模型是点对点,producer A发出message A专门给consumer A用的,producer B专门给consumer B用的,consumer B是消费不到message A。
广播模式:如果是发布订阅者模式,一个消息有多个订阅者,一个producer A发出message A,有多个consumerA,consumerB,consumerC订阅者,大家都监听到这个消息,