RocketMQ消费者

2020-08-26  本文已影响0人  杨健kimyeung

官方文档

概要

MQ中Pull和Push的两种消费方式

Push方式:由消息中间件主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常;

Pull方式:由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;

消费模式:有序消费和并发消费。

有序消费模式是按照消息的顺序进行消费,并发消费的消费速度要比有序消费更快。

消息模式: Clustering 和Broadcasting

CLUSTERING:同组里的每个Consumer 只消费所订阅消息的一部分内容。

BROADCASTING:同组里的每个Consumer 消费所订阅消息的全部内容

核心注解

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n24" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
// 指定consumerGroup
String consumerGroup();
// 指定消费的topic
String topic();
// 指定消费过滤方式: TAG
SelectorType selectorType() default SelectorType.TAG;
// 根据过滤方式,定义选择表达式
String selectorExpress() default "*";
// 消费方式:并发,顺序
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
// 消费模式: 集群, 广播
MessageModel messageModel() default MessageModel.CLUSTERING;
//消费的并发线程数
int consumeThreadMax() default 64;
}</pre>

使用

application.yml配置

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="yaml" cid="n27" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">rocketmq:
name-server: 127.0.0.1:9876</pre>

添加监听

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n29" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = "hello", topic = "test-topic")
public class RocketMqConsumerListener implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
log.error(message);
}
}
​</pre>

上一篇下一篇

猜你喜欢

热点阅读