3. RocketMQ
2020-09-24
_少年不知愁
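1. MQ
Message-oriented middleware
2. Features
Asynchronous processing
Traffic peak shaving and valley filling
Decoupling of microservices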
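A rough way to picture asynchronous processing and peak shaving: the producer drops a burst of messages into a queue and returns at once, while the consumer works through them at its own fixed pace. The sketch below uses an in-memory JDK queue as a stand-in for the broker; it is not RocketMQ, and the names PeakShavingSketch, produceBurst and drainInRounds are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for a message queue; this is not RocketMQ itself.
class PeakShavingSketch {

    // Producer side: enqueue a burst of messages and return immediately.
    static BlockingQueue<String> produceBurst(int burst) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < burst; i++) {
            queue.offer("order-" + i); // offer() on an unbounded queue does not block
        }
        return queue;
    }

    // Consumer side: handle at most batchSize messages per round.
    // Returns how many messages were handled in each round.
    static List<Integer> drainInRounds(BlockingQueue<String> queue, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Integer> perRound = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        while (!queue.isEmpty()) {
            batch.clear();
            queue.drainTo(batch, batchSize); // moves up to batchSize messages into batch
            perRound.add(batch.size());
        }
        return perRound;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = produceBurst(10);
        System.out.println(drainInRounds(queue, 4)); // prints [4, 4, 2]
    }
}
```

The burst of 10 arrives in one go, but the consumer never handles more than 4 per round, which is the "peak shaving" effect; the producer does not need to know who consumes or how fast, which is the decoupling.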
3. Quick single-node setup
Download: https://rocketmq.apache.org/dowloading/releases/
1. Start the NameServer
nohup mqnamesrv &
2. Check the NameServer log
logs/rocketmqlogs/namesrv.log
3. Start the broker
nohup mqbroker -n localhost:9876 &
4. Check the broker log
logs/rocketmqlogs/broker.log
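Besides reading the logs, a quick TCP probe can confirm that both processes are listening. This is only a sketch: 9876 is the NameServer port from the command above, 10911 is assumed to be the broker's default listen port, and PortCheck / isListening are hypothetical names for this example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // 9876: NameServer port used in "mqbroker -n localhost:9876"
        System.out.println("namesrv 9876: " + isListening("localhost", 9876, 1000));
        // 10911: assumed default broker port; adjust if broker.conf sets listenPort
        System.out.println("broker 10911: " + isListening("localhost", 10911, 1000));
    }
}
```

An open port only shows that the process is up; the log files remain the place to check for startup errors.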
Console setup
1. Download
https://github.com/apache/rocketmq-externals
Go into rocketmq-console and edit application.properties:
rocketmq.config.namesrvAddr=localhost:9876
rocketmq.config.isVIPChannel=false
2. Build: mvn clean package -DskipTests
Find the jar under target,
then run it directly:
java -jar console-rocketmq.jar --server.port=8818 &
Then open http://192.168.1.102:8818/ (the port must match --server.port above)
3.1 Code quick start
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>
Write the producer
Configuration
rocketmq:
  name-server: 192.168.0.109:9876
  producer:
    group: test-group
Code
@Autowired
private RocketMQTemplate rocketMQTemplate;

@GetMapping
public void test() {
    ShareDTO dto = new ShareDTO();
    dto.setAuditStatus("youyou");
    dto.setAuthor("summit");
    // send the DTO as the message payload to topic "my-topic"
    rocketMQTemplate.convertAndSend("my-topic", dto);
}
Write the consumer
Configuration
rocketmq:
  name-server: 192.168.0.109:9876
Code
@Service
@RocketMQMessageListener(consumerGroup = "consumer-study", topic = "my-topic")
@Slf4j
public class RocketMqLister implements RocketMQListener<ShareDTO> {
    @Override
    public void onMessage(ShareDTO shareDTO) {
        log.info("message ======= {},{}", shareDTO.getAuthor(), shareDTO.getAuditStatus());
    }
}
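The consumerGroup and topic attributes above decide who receives a message. As a rough illustration, assuming the default clustering message model: every consumer group subscribed to a topic sees each message, while inside one group only a single consumer handles it. The toy model below is plain JDK code, not the RocketMQ client API; it hands out messages round-robin within a group, which is a simplification (RocketMQ actually assigns message queues to consumers). ConsumerGroupSketch and the group name audit-group are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory model of one topic with consumer groups; not the RocketMQ client API.
class ConsumerGroupSketch {
    // group name -> handlers registered under that group
    private final Map<String, List<Consumer<String>>> groups = new LinkedHashMap<>();
    // group name -> counter used to pick the next handler (round-robin)
    private final Map<String, Integer> next = new LinkedHashMap<>();

    void subscribe(String group, Consumer<String> handler) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(handler);
        next.putIfAbsent(group, 0);
    }

    // Every group gets the message once; inside a group only one handler gets it.
    void publish(String message) {
        for (Map.Entry<String, List<Consumer<String>>> e : groups.entrySet()) {
            List<Consumer<String>> handlers = e.getValue();
            int i = next.get(e.getKey());
            handlers.get(i % handlers.size()).accept(message);
            next.put(e.getKey(), i + 1);
        }
    }

    public static void main(String[] args) {
        ConsumerGroupSketch topic = new ConsumerGroupSketch();
        List<String> log = new ArrayList<>();
        topic.subscribe("consumer-study", m -> log.add("study-1:" + m));
        topic.subscribe("consumer-study", m -> log.add("study-2:" + m));
        topic.subscribe("audit-group", m -> log.add("audit:" + m)); // hypothetical second group
        topic.publish("a");
        topic.publish("b");
        System.out.println(log); // prints [study-1:a, audit:a, study-2:b, audit:b]
    }
}
```

In this model, starting a second instance with the same consumerGroup splits the load, while a different consumerGroup receives its own copy of every message.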
4. Spring Cloud Stream
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    <version>0.9.0.RELEASE</version>
</dependency>
Write the producer
# nested under spring.cloud
stream:
  rocketmq:
    binder:
      name-server: 192.168.0.109:9876
  bindings:
    output:
      destination: my-str-test-topic
Add the annotations
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@EnableBinding(Source.class)
public class ContentApplication {
    public static void main(String[] args) {
        SpringApplication.run(ContentApplication.class, args);
    }
}
Send a message
@Autowired
private Source source;

@GetMapping("/send")
public String send() {
    System.out.println("send =====================");
    // publish through the "output" binding configured above
    source.output().send(MessageBuilder.withPayload("message ti").build());
    return "send ok";
}
Write the consumer
Configuration file
# nested under spring.cloud
stream:
  rocketmq:
    binder:
      name-server: 192.168.0.109:9876
  bindings:
    input:
      destination: my-str-test-topic
      group: my-steam-test
Add the annotations
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class UserApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }
}

@Service
public class TestStreamLister {
    @StreamListener(Sink.INPUT)
    public void recive(String message) {
        System.out.println("=========" + message);
    }
}
5. Custom producer and consumer
Producer
# nested under spring.cloud.stream.bindings
my_output:
  destination: my-custom-topic

public interface CustomOutput {
    String MY_OUTPUT = "my_output";

    @Output(MY_OUTPUT)
    MessageChannel output();
}

// on the application class
@EnableBinding({Source.class, CustomOutput.class})

@Autowired
private CustomOutput customOutput;

@GetMapping("/send2")
public Result<String> send2() {
    System.out.println("send =====================");
    customOutput.output().send(MessageBuilder.withPayload("message ti").build());
    return Result.success("send ok");
}
Consumer
# nested under spring.cloud.stream.bindings
my-input:
  destination: my-custom-topic
  group: my-custom-group

public interface CustomInput {
    String MY_INPUT = "my-input";

    @Input(MY_INPUT)
    SubscribableChannel input();
}
// Note: this listener subscribes through rocketmq-spring's
// @RocketMQMessageListener, not through the CustomInput binding.
@Service
@RocketMQMessageListener(consumerGroup = "my-custom-group", topic = "my-custom-topic")
@Slf4j
public class CustomRocketMqLister implements RocketMQListener<String> {
    @Override
    public void onMessage(String dto) {
        log.info("custom input message ======= {}", dto);
    }
}

// on the application class
@EnableBinding({Sink.class, CustomInput.class})