Apache pulsar consumer接口详解

2019-02-22  本文已影响7人  wolf4j
consumer.png

receive

receive

接收单条消息,这是一个阻塞调用,直到有消息可用。

注意:

receive aysnc
receive timeout

这个方法和receive的功能一样,只是加入了timeout的限制,如果在指定的timeout到达之前还没有收到消息,就返回null

receive queue size

设置consumer端receive queue的大小。

这个相当于在consumer端增加了一个缓存,是一个拿空间换时间的例证,如果我queue的size越大,我的吞吐量越高,但同时需要消耗的内存也越多。

通过禁用pre-fetching message来降低消费者的吞吐量。 此方法通过仅将消息推送到准备处理它们的消费者来改进共享订阅上的消息分发。 如果使用者队列的 size为零,则不能使用receive(int,TimeUnit)和Partitioned Topics。 消费者队列大小为零时,我们同样不应该中断Consumer#receive()函数调用。

不支持Batch-Message:如果使用者收到任何批处理消息,那么它将关闭与broker的消费者连接,并且 Consumer#receive()调用将保持阻塞状态。在队列中中的批处理消息被删除之前,消费者将无法再接收任何消息

max total receiver queue size across partitions

设置跨分区中所有接收者队列的最大值。如果超过这个设定值(默认:5000),该设置将会减少各个分区接收者队列的大小(#receiverQueueSize(int))

crypto

crypto key reader
crypto failure action

将ConsumerCryptoFailureAction设置为指定的值,主要提供了三种类型,具体如下:

intercept

close

关闭consumer的拦截器,释放相应的资源

before consumer

允许该函数修改消息,在这种情况下将会返回被修改后的消息,该方法抛出的异常将会被调用者捕获,但是不会发送到客户端。
因为consumer可能会调用多个拦截器,因此会按照指定的调用顺序返回。在拦截器列表(List)中,第一个拦截器获取到的被消费的消息将会作为下面拦截器的输入,依此类推。因为我们允许一个拦截器修改消息,所以后面的拦截器可能会获取到前一个拦截器中修改过后的消息,所以我们在构建有依赖关系的拦截器时,尽量避免修改操作

on ack

在消费者向broker发送确认的时候,该函数会被调用,调用者将忽略该方法抛出的异常

on ack cumulative

在消费者向broker发送累积确认的时候,该函数会被调用,调用者将忽略该方法抛出的异常

ack

ack msg

确认已经消费了一条消息,如果consumer被提早closed,也会抛出异常

ack async msg
ack msgID

确认消费单条消息,这个消息的确认是根绝传入的msgID来进行确认的,如果consumer被closed,会抛出异常

ack async msgID
ack cumulative msg

确认接收到了消息流中提供的所有消息,在这个ack被发送给broker之前,该方法将一直阻塞。一旦接收成功,消息将不会被重新传递给该消费者。
注意:当消费者类型设置为ConsumerShared时,不能使用累积确认。它等同于调用asyncAcknowledgeCumulative(Message)并等待触发回调。
如果consumer被closed,会抛出异常

ack async cumulative msg
ack cumulative msgID

该方法的实现等同于ack cumulative msg,只是消息确认传入的参数为msgID。

ack async cumulative msgID
ack timeout

给没有确认的消息设置超时,截断为最接近的毫秒,超时需要大于10秒。

ack group time

将消费者的ack分组到指定的时间,默认情况下,消费者将使用100毫秒的分组时间向broker发送确认。如果将分组时间设置为0,将立即发送确认。

close

close

关闭consumer,阻止broker接收额外的消息。

close async

异步关闭consumer

auto update partition

如果启用,消费者将自动增加订阅的分区。
注意:这仅适用于分区的consumer。

consumer

consumer name

根据指定的name来消费消息

consumer event listener

为consumer设定一个监听器,主要是为failover的订阅模式服务的,监听一组consumer中哪一个consumer宕机之类的故障,应用程序可以根据此对consumer的状态做出改变。提供了以下两种状态,主要用于通知集群中其它的consumer:
active
inactive

seek

seek by msgID

将consumer消费的位置移到指定的mesID处。该方法提供了三种方式:

seek async by msgID
seek by time

每一个消息在发送的时候都有一个publish time的字段,该 seek方法可以根据消息被publish的time,将消费的位置移到指定的位置。同样的,该方法只能在non-partitioned 的topics上进行操作。

seek async by time

pattern auto discovery period

该方法也只支持consumer是正则匹配下的那种订阅形式。
自动发现的周期以分钟为单位,默认值和最小值为1分钟。

other options

clone

consumer的clone方法类似于producer的clone,会copy一个cosumer 的实例出来。

read compact

如果启用,消费者将从被压缩的topic中读取消息,而不是去读取topic积压的所有消息。 这意味着,如果topic已被压缩,则consumer将只看到topic中每个key的最新值,直到topic中积压的消息已经到达压缩的点。 除此之外,消息将照常发送。
readCompacted只能对持久化的topic使用,这些topics有单个active的消费者。如果在非持久化的topic中启用readCompacted,会抛出异常。

resume

该方法会继续从broker请求消息。

pause

在调用resume方法之前,停止向broker请求新消息。 请注意,这可能导致receive 阻塞,直到调用 resume 并且新的消息被推送到 broker。

redeliver unacknowledged messages

重新发送所有未确认的消息。 在Failover模式下,对于给定的topic如果consumer不处于active状态,则会忽略该请求。 在shared模式下,需要被重新发送的消息会通知到所有订阅了该topic的消费者中。 这是一个非阻塞调用,不会抛出异常。 如果连接中断,重新连接后将重新传递消息。

dead letter policy

为consumer设置“死信”政策。
默认情况下,某些消息会尽可能多的被重发,甚至可能永远不会停止。
通过使用“死信”机制,为消息的重传设置了一个计数器,当消息超过最大重新传送次数时,消息将发送到“死信”topic并自动确认。
如果指定了“死信”策略,但是未指定ackTimeoutMillis,则ack超时将设置为30000毫秒

max redeliver count

设定“死信”政策的最大次数为多少。

dead letter topic

指定某一个topic成为“死信”topic

msg listener

将为每个收到的消息调用的侦听器。

received

每当收到新消息时都会调用此方法。

保证消息按顺序传递,并从单个消费者的相同线程传递

除非应用程序或broker崩溃,否则只会为每条消息调用此方法一次。

应用程序负责处理在处理消息时可能抛出的任何异常。

reached end of topic

topic到期或结束时获取通知

property

property

给consumer设置单个属性

properties

给consumer设置多个属性

priority level

当我们调度消息时,为broker提供更多的优先级选项以供选择,优先级按照降序排列,0 为最大优先级。
在共享订阅的模式下,如果大家权限是相等的,broker会优先给优先级高的consumer发送消息。
举个例子:
现在有两个消费者A和B,同时订阅某一个topic,Consumer-A的优先级为0,Consumer-B的优先级为 1 ,那么在consumer-A所设定的权限没有到期之前,broker将仅向消费者A发送消息,然后才向Consumer-B发送消息。

Consumer PriorityLevel Permits
C1 0 2
C2 0 1
C3 0 1
C4 1 2
C5 1 1

broker向consumer发送消息的顺序:C1, C2, C3, C1, C4, C5, C4

topic

topic

指定该consumer将要订阅的topic

topics

指定该consumer将要订阅的topics,该方法可以是一个List

topics pattern

提供了正则匹配的方式来支持consumer订阅相关的topics

subscribe

unsubscribe

取消对一个costumer的订阅

subscribe

订阅该topic,直到该topic被成功创建。如果订阅的topic不存在,则会创建新订阅,并且在创建之后发布的所有消息将被保留,直到确认为止,即使这个时候没有消费者连接进来。

subscribe async

提供一个异步订阅topic的方法,其它功能和subscribe一样。

subscribe by name

为consumer指定一个具体要订阅的topic的名字

subscribe type
subscribe topics mode

确定该订阅者的所订阅的topics属于什么模式,总共提供了如下三种模式:

subscribe init pos

该consumer订阅的初始位置,提供了以下三种:

上一篇 下一篇

猜你喜欢

热点阅读