springboot2.x监听beanstalk实现异步任务

2020-11-06  本文已影响0人  骑蚂蚁上高速_jun

项目中使用springboot 监听beanstalk 队列 ,接收到任务后将其异步投递到 线程池中运行。

1.添加 java操作 beanstalk坐标

<dependency>
      <groupId>com.dinstone</groupId>
      <artifactId>beanstalkc</artifactId>
      <version>2.3.0</version>
           
     <exclusions>
              <!-- springboot使用logback,需要排除 log4j -->
         <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
         </exclusion>
    </exclusions>
</dependency>
  1. 定时任务多线程配置。
    由于springboot中所有的定时任务都是单线程执行,就算是多个定时任务在一起也是单线程,所有只要其中的一个定时器造成阻塞,那么其他的所有定时任务都不会执行了。此时就必须配置定时任务的多线程模式运行。 由于定时任务监听 beanstalk,会造成阻塞,会影响到项目中其他的所有定时器,故而需要配置定时任务的线程池。。
package com.configuration;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import java.util.concurrent.Executors;

@Configuration
public class ScheduledConfiguration implements SchedulingConfigurer /**/{

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        //设定一个长度10的定时任务线程池
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("scheduled@");
        taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10,executor));
    }
}

  1. 使用 Schedule 定时任务线程监听队列。
package com.task;

import com.alibaba.fastjson.JSONObject;
import com.dinstone.beanstalkc.BeanstalkClientFactory;
import com.dinstone.beanstalkc.Configuration;
import com.dinstone.beanstalkc.Job;
import com.dinstone.beanstalkc.JobConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Objects;

@Component
@Slf4j
@Lazy(false)
public class CrontabTask {


    @Autowired
    SmtpSendMailer smtpSendMailer; // 线程池对象
  // 链接 beanstalk队列
   private JobConsumer connectBeanstalkConsumer(){
        Configuration config = new Configuration();
        config.setServiceHost("8.129.0.115");
        config.setServicePort(11300);
        BeanstalkClientFactory factory = new BeanstalkClientFactory(config);
        return factory.createJobConsumer("smtp-mail");
    }

    @Scheduled(fixedDelay = 1)
    public void executeInternal()  {
        JobConsumer consumer = connectBeanstalkConsumer();
        System.out.println("beanstalk连接的线程是:"+SystemUtils.getCurrentThreadName());
        while(true){
            Job job = null;
            try{
                job = consumer.reserveJob(3);
            }catch(Throwable e){
                System.out.println("beanstalk重连一次......");
                consumer.close(); // 关闭原来的链接
                consumer = connectBeanstalkConsumer();
            }

            if (Objects.isNull(job)) continue;
            String jobString = new String(job.getData());
            try{
                DataDto dataDto = JSONObject.parseObject(jobString,  DataDto.class);
                System.out.println(dataDto);
                // 投递到异步队列线程池执行
            }catch(Throwable e){
                System.out.println("非法的消费数据 : " + jobString);
            }
            consumer.deleteJob(job.getId());
        }
    }
}
  1. 配置beanstalk任务线程池
package com.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@EnableAsync // 开启异步任务
@EnableScheduling // 开启定时任务
public class TaskConfiguration {


    @Bean("smtpMailer")
    public Executor smtpMailer() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor .setCorePoolSize(10);// 设置最小的线程数量
        executor .setMaxPoolSize(50);// 设置最大的线程数量
        executor .setQueueCapacity(25);// 等待队列
        executor .setKeepAliveSeconds(60);
        executor .setThreadNamePrefix("smtpMailer@"); // 设置线程名称
        executor.initialize();
        return executor ;
    }

}

4 . 执行beanstalk 任务投递过来的异步任务

@Async("smtpMailer")
    public void sendMailer(String jobString) {
        // 得到任务数据。。。。
        JSONObject jsonObject= JSONObject.parseObject(jobString);
        // TODO.. 继续任务
    }
上一篇下一篇

猜你喜欢

热点阅读