3.4 接收消息

2018-03-30  本文已影响0人  nuist_kevin

3.4.1 同步接收

通常 JMS

3.4.2 异步接收

Spring 还支持通过 @JmsListener 注解来配置监听器,这是目前为止设置异步接收器最方便的方式

EJB世界的消息驱动的Bean(MDB)类似,消息驱动的POJO(MDP)也是JMS消息的接收者,而MDP的唯一的限制就是,它必须实现javax.jms.MessagListener接口。要注意的是,这种情况下,我们的POJO会在多线程环境中接收消息,所以一定要确保我们的实现是线程安全的。

下面是一个简单的实现:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ExampleListener implements MessageListener {
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                System.out.println(((TextMessage) message).getText());
            }
            catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
        }
        else {
            throw new IllegalArgumentException("Message must be of type TextMessage");
        }
    }
}

一旦实现了我们自己的MessageListener,就该创建一个消息监听器容器了。

下面就是一个使用Spring带给我们的众多消息监听器容器之一的示例(用的是DefaultMessageListenerContainer):

<!-- 这是我们自己实现的消息驱动POJO -->
<bean id="messageListener" class="jmsexample.ExampleListener" />

<!-- 这是消息监听器容器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener" />
</bean>

要想了解各个消息监听器容器实现的完整功能描述,可以去参考 Spring 的 API 文档。

3.4.3 SessionAwareMessageListener接口

SessionAwareMessageListener接口是 Spring 特有的接口,它提供了和JMS的MessageListener类似的功能,而且它还提供了对消息Session的访问。

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

    void onMessage(Message message, Session session) throws JMSException;

}

如果希望我们的MDP能够对任何收到的消息进行响应,可以选择实现这个接口。
Spring提供的所有消息监听器容器都支持实现了MessageListenerSessionAwareMessageListener 的 MDP,实现SessionAwareMessageListener接口的类的不利点就在于它和Spring绑定了,使不使用它完全看我们的实际使用情况。

要注意,SessionAwareMessageListeneronMessage(..)方法会抛出JMSException,因此,使用者要负责处理它抛出的异常。

3.4.4 MesssageListenerAdapter

MesssageListenerAdapter类是 Spring 对异步消息支持的最终杀器:一句话,它允许我们用几乎任何类作为一个MDP。

看看下面的这个接口定义,这个接口既没有继承MessageListener也没有继承 SessionAwareMessageListener,但它仍然能够通过MesssageListenerAdapter类将其当成MDP来使用。

public interface MessageDelegate {

    void handleMessage(String message);

    void handleMessage(Map message);

    void handleMessage(byte[] message);

    void handleMessage(Serializable message);

}
public class DefaultMessageDelegate implements MessageDelegate {
    // implementation elided for clarity...
}

上面这个类对于JMS完全没有依赖,就是一个POJO,通过如下配置将其作为MDP来使用:

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultMessageDelegate"/>
    </constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener" />
</bean>

下面的这个类只能接收TextMessage类型的 JMS 消息。注意这个消息处理器的名称是receive(而 MessageListenerAdapter内的默认消息处理器名称是handleMessage),这是可配置的。还要注意的是, receive(..) 方法强制了消息类型是 TextMessage

public interface TextMessageDelegate {

    void receive(TextMessage message);
}
public class DefaultTextMessageDelegate implements TextMessageDelegate {
    // implementation elided for clarity...
}

对应的MessageListenerAdapter配置应该是这样:

<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultTextMessageDelegate"/>
    </constructor-arg>
    <property name="defaultListenerMethod" value="receive"/>
    <!-- 我们不希望对消息内容进行自动转换 -->
    <property name="messageConverter">
        <null/>
    </property>
</bean>

如果上面这个messageListener接收到一个不是TextMessage类型的JMS Message ,会抛出 IllegalStateExceptionMessageListenerAdapter的另一个能力是能够自动发送一个响应的Message,只要处理方法返回一个非void类型的值。考虑下面的接口和类:

public interface ResponsiveTextMessageDelegate {

    // 注意返回类型...
    String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
    // implementation elided for clarity...
}

如果上面的DefaultResponsiveTextMessageDelegateMessageListenerAdapter配合使用,那么receive(..) 方法返回的任何非空值都会被转化为一个TextMessage(默认配置),然后发送给原始Message中的Reply-To属性定义的Destination(如果有定义的话),或者是发送给MessageListenerAdapter中设置的默认Destination(如果有定义的话),如果找不到任何的Destination,那么会抛出一个InvalidDestinationException

上一篇下一篇

猜你喜欢

热点阅读