ActiveMQ 异步、延迟、定时投递,消息后置处理器

2020-02-24  本文已影响0人  笨鸡

ActiveMQ设置异步投递用来解决slow consumer。

1.ActiveMQConfig.java

package com.ctgu.demo.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.Queue;
import javax.jms.Topic;
import java.util.ArrayList;
import java.util.List;

@Configuration
@EnableJms
public class ActiveMQConfig {

    @Value("${my-queue}")
    private String myQueue;

    @Value("${my-delay-queue}")
    private String myDelayQueue;

    @Value("${my-topic}")
    private String myTopic;

    @Value("${spring.activemq.user}")
    private String userName;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${spring.activemq.broker-url}")
    private String brokerURL;

    private static List<String> trustList = new ArrayList<>();

    static {
        trustList.add("java.lang");
        trustList.add("java.util");
        trustList.add("com.ctgu.demo");
    }

    @Bean
    public Queue queue() {
        return new ActiveMQQueue(myQueue);
    }

    @Bean
    public Queue delayQueue() {
        return new ActiveMQQueue(myDelayQueue);
    }

    @Bean
    public Topic topic() {
        return new ActiveMQTopic(myTopic);
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(userName, password, brokerURL);
    }

    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    @Bean
    public JmsListenerContainerFactory<?> queueListenerFactory(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(false);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    @Bean
    public JmsTemplate jmsTemplate(ActiveMQConnectionFactory connectionFactory){
        connectionFactory.setTrustAllPackages(true);
//        connectionFactory.setTrustedPackages(trustList);
        RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setMaximumRedeliveries(0);
        connectionFactory.setRedeliveryPolicy(policy);
        return new JmsTemplate(connectionFactory);
    }


}

2.User.java

package com.ctgu.demo.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {

    private Integer id;

    private String name;

    private Integer age;
}

3.ActiveMQReceiveService.java

package com.ctgu.demo.service;

import org.joda.time.DateTime;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;

@Service
public class ActiveMQReceiveService {

    @JmsListener(destination = "${my-queue}", containerFactory = "queueListenerFactory")
    public void receiveQueue(TextMessage textMessage) throws JMSException {
        System.out.println("*****receive Queue message: " + textMessage.getText()
                + DateTime.now().toDate());
    }

    @JmsListener(destination = "${my-delay-queue}", containerFactory = "queueListenerFactory")
    public void receiveQueue(ObjectMessage objectMessage) throws JMSException {
        System.out.println("*****receive Delay Queue message: " + objectMessage.getObject()
                + DateTime.now().toDate());
    }

    @JmsListener(destination = "${my-topic}", containerFactory = "topicListenerFactory")
    public void receiveTopic(TextMessage textMessage) throws JMSException {
        System.out.println("*****receive Topic message: " + textMessage.getText());
    }

    @JmsListener(destination = "${my-topic}", containerFactory = "topicListenerFactory")
    public void receiveTopic1(TextMessage textMessage) throws JMSException {
        System.out.println("*****receive Topic message1: " + textMessage.getText());
    }
}

4.DefinePostProcessor.java

package com.ctgu.demo.config;

import lombok.Data;
import org.apache.activemq.ScheduledMessage;

import org.springframework.jms.core.MessagePostProcessor;
import org.springframework.util.StringUtils;

import javax.jms.JMSException;
import javax.jms.Message;
import java.io.Serializable;

@Data
public class DefinePostProcessor implements MessagePostProcessor {

    private long delay = 0L;

    private long period = 0L;

    private int repeat = 0;

    private String corn = null;

    public Message postProcessMessage(Message message) throws JMSException{
        if (delay > 0) {
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
        }
        if (period > 0) {
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
        }
        if (repeat > 0) {
            message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
        }
        if (!StringUtils.isEmpty(corn)) {
            message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, corn);
        }
        return message;
    }

}

5.ActiveMQSendService.java

package com.ctgu.demo.service;

import com.ctgu.demo.config.DefinePostProcessor;
import com.ctgu.demo.entity.User;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;
import org.apache.activemq.ScheduledMessage;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.jms.*;

@Service
public class ActiveMQSendService {

    private final static Logger log = LoggerFactory.getLogger(ActiveMQSendService.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Qualifier(value = "queue")
    @Autowired
    private Queue queue;

    @Qualifier(value = "delayQueue")
    @Autowired
    private Queue delayQueue;

    @Autowired
    private Topic topic;

    public void sendMessage(Destination destination, String message) {
        jmsTemplate.convertAndSend(destination, message);
    }

    public void sendMessage(String message) {
        jmsTemplate.convertAndSend(queue, message);
    }

    //    @Scheduled(fixedRate = 3000)
    @PostConstruct
    public void sendMessageScheduled() {
        String message = "*****Scheduled:" + DateTime.now().toString();
        jmsTemplate.convertAndSend(queue, message);
        System.out.println("*****send Queue MessageScheduled is OK!");
    }

    //    @Scheduled(fixedRate = 3000)
    @PostConstruct
    public void sendAsyncMessageScheduled() throws JMSException {
        String message = "*****AsyncScheduled:" + DateTime.now().toString();

        asyncSendMessage(queue, message);
        System.out.println("*****send Queue AsyncScheduled is OK!");
    }


    @PostConstruct
    public void sendDelayMessageScheduled() throws JMSException {
        User user = new User(1, "xx", 20);
        long delay = 5 * 1000;
        long period = 2 * 1000;
        int repeat = 5;
        
//        DefinePostProcessor msgPostProcess = new DefinePostProcessor();
//        msgPostProcess.setDelay(delay);
//        msgPostProcess.setPeriod(period);
//        msgPostProcess.setRepeat(repeat);
//        jmsTemplate.setDefaultDestination(delayQueue);
//        jmsTemplate.convertAndSend(user, msgPostProcess);
        
        jmsTemplate.convertAndSend(user, (message -> {
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
            message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
            return message;
        }));

//        delaySendMessage(delayQueue, message, 500L);
        System.out.println("*****send Delay Queue MessageScheduled is OK!");
    }

    public void sendTopic(String msg) {
        jmsTemplate.convertAndSend(topic, msg);
    }

    //    @Scheduled(fixedRate = 3000)
    @PostConstruct
    public void sendTopicScheduled() {
        String message = "*****Topic Scheduled:" + DateTime.now().toString();
        jmsTemplate.convertAndSend(topic, message);
        System.out.println("*****send Topic MessageScheduled is OK!");
    }

    private void asyncSendMessage(Destination destination, String msg) throws JMSException {

        Connection connection = jmsTemplate.getConnectionFactory().createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

        ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
        // 设置持久化
        producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
        Message message = session.createTextMessage(msg);

        producer.send(message, new AsyncCallback() {
            @Override
            public void onSuccess() {
                log.info("消息发送成功!{}", DateTime.now().toDate());
            }

            @Override
            public void onException(JMSException e) {
                log.error("消息发送异常!{}", DateTime.now().toDate());
            }
        });
        session.commit();
        producer.close();
        session.close();
        connection.close();
    }


    private void delaySendMessage(Destination destination, String msg, long delayTime) throws JMSException {
        Connection connection = jmsTemplate.getConnectionFactory().createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

        ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
        producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
        Message message = session.createTextMessage(msg);
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayTime);
        producer.send(message);

        session.commit();

        producer.close();
        session.close();
        connection.close();
    }
}

6.application.yml

server:
  port: 7777

spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
    in-memory: true
    pool:
      enabled: true
      max-connections: 100
#  jms:
#    pub-sub-domain: false

my-queue: boot-active-queue

my-delay-queue: boot-active-delay-queue

my-topic: boot-active-topic

7.pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ctgu</groupId>
    <artifactId>boot_mq_produce</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>boot_mq_produce</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-pool -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.9</version>
        </dependency>
        <dependency>
            <groupId>org.messaginghub</groupId>
            <artifactId>pooled-jms</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.10</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
上一篇下一篇

猜你喜欢

热点阅读