ActiveMQ应答方式

2017-08-30  本文已影响956人  米刀灵

JMS API中约定了Client端可以使用四种ACK_MODE,在javax.jms.Session接口中:

ACK_MODE描述了Consumer与broker确认消息的方式(时机),比如当消息被Consumer接收之后,Consumer将在何时确认消息。对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了。ACK在consumer与Broker之间建立一种简单的“担保”机制。

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

我们需要在创建Session时指定ACK_MODE,由此可见,ACK_MODE将是session共享的,意味着一个session下所有的 consumer都使用同一种ACK_MODE。
但如果此session为事务类型,用户指定的ACK_MODE将被忽略,而强制使用"SESSION_TRANSACTED"类型。如果session非事务类型时,也将不能将ACK_MODE设定为"SESSION_TRANSACTED"。

Client端指定了ACK_MODE,但是在Client与broker在交换ACK指令的时候,还需要告知ACK_TYPE,ACK_TYPE表示此确认指令的类型,比如Consumer消费消息时出现异常,就需要向broker发送ACK指令,ACK_TYPE为"REDELIVERED_ACK_TYPE",那么broker就会重新发送此消息。在JMS API中并没有定义ACT_TYPE,因为它通常是一种内部机制,并不会面向开发者。ActiveMQ中定义了如下几种ACK_TYPE(参看MessageAck类):

ACK指令中不同的ACK_MODE,将意味着在不同的时机发送ACK指令,每个ACK指令中又会包含不同的ACK_TYPE,那么broker端就可以根据ACK_TYPE来决定此消息的后续操作。

ACK_MODE:

AUTO_ACKNOWLEDGE : 自动确认,这就意味着消息的确认时机将由consumer择机确认。对于consumer而言,optimizeAcknowledge(优化应答)属性只会在AUTO_ACK模式下有效。AUTO_ACK又分为两种情况:
在同步(receive)情况下,单条消息将立即确认,如果开发者在处理message过程中出现异常,会导致此消息也不会redelivery,可能造成消息的丢失。
在异步(messageListener)情况下,将会调用listener.onMessage(message),此后再ACK。如果onMessage方法异常且没有被catch,将导致client端补充发送一个ACK_TYPE为REDELIVERED_ACK_TYPE确认指令,此消息会被redelivery。如果重发次数到达阈值,此消息将会进入DLQ。

    public class Listener implements MessageListener {
        public void onMessage(Message message) {
            int i = 8/0;//会导致redelivery
            try {
                if(message instanceof ActiveMQTextMessage){
                    ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
                    System.out.println("收到的消息:" + textMessage.getText());                   }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public class Listener implements MessageListener {
        public void onMessage(Message message) {              
            try {
                int i = 8/0;//不会导致redelivery
                if(message instanceof ActiveMQTextMessage){
                    ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
                    System.out.println("收到的消息:" + textMessage.getText());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

因此当我们使用messageListener方式消费消息时,通常建议在onMessage方法中使用try-catch,这样可以在处理消息出错时记录一些信息,而不是让consumer不断去重发消息。

CLIENT_ACKNOWLEDGE : 客户端手动确认,这就意味着AcitveMQ将不会自动ACK任何消息。如果一个conmuser在消费结束前没有调用message.acknowledge()确认一个消息,之后调用其他conmuser时会再次消费它,因为对于broker而言,那些尚未真正ACK的消息被视为未消费,直到它被确认。

DUPS_ACKNOWLEGE:也是一种潜在的AUTO_ACK,在确认消息的条数和时间上有所不同。

SESSION_TRANSACTED : 当session使用事务时,就是使用此模式。
在事务开启之后,和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery。commit()方法将会导致当前session的事务中所有消息立即被确认。 当session.commit()异常时,会自动调用inner-rollback回滚事务(也可以捕捉到异常,手动调用session.rollback()回滚事务),也可以在事务开始之后的任何时候手动调用session.rollback()。rollback意味着当前事务的结束,事务中所有的消息都将被重发。

上一篇 下一篇

猜你喜欢

热点阅读