java 实现redis消息队列

2019-10-30  本文已影响0人  等等ChEnH

业务场景:由于项目需求需要对接多个PLC设备,获取PLC设备数据,网络传输得数据获取后直接做业务逻辑处理,可能会对数据库造成一定压力,甚至会导致程序崩溃。

解决方案:
PLC设备数据过来,Netty创建得服务端,处理接收客户端发送得数据,先存入到Redis队列,后台再开启多个线程处理redis队列中得数据。

Service层业务代码

package com.company.netty.service.redis;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

/**
 * Created by C.H on 2019/8/23.
 */
@Component
public class RedisService {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 获取对应key左侧第一个元素并删除
     * @param key
     * @return
     */
    public Object leftPop(String key){
        return stringRedisTemplate.opsForList().leftPop(key);
    }

    /**
     * 右侧插入集合中
     * @param key
     * @param value
     */
    public void rightPush(String key, String value){
        stringRedisTemplate.opsForList().rightPush(key, value);
    }
}

消费者代码

package com.company.netty.service.consumer;

import com.company.netty.common.Const;
import com.company.netty.service.redis.RedisService;
import com.company.netty.util.dataAnalysis.DataAnalysisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Created by C.H on 2019/8/24.
 */
@Slf4j
@Component
public class RedisQueueConsumer extends Thread{

    @Resource
    private RedisService redisService;

    @Override
    public void run() {
        log.info("启动消费者线程处理");
        while (true) {
            try {
                processMessage();
                this.sleep(50);//防止CPU空转
            } catch (InterruptedException e) {
                log.error("消费处理线程异常"+ e.getMessage());
            }
        }
    }

    /**
     * 消费者处理业务逻辑
     */
    private void processMessage() {
        Object msg = redisService.leftPop(Const.Redis_LIST_KEY);
        if (msg != null) {
            log.info("├ [消费客户端存放Redis消息队列]:"+ msg.toString());
            //TODO处理业务逻辑,入库更新操作
        
        }
    }
}

生产者方法

/**
     * 接收客户端发送的消息
     *
     * @param ctx ChannelHandlerContext
     * @param msg 消息
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("\t├ [收到客户端消息类型存放Redis队列]: {} - {}\n", msg.getClass(), msg);
        //抽象方法中通过beanLoad获取service注入
        PublisherService publisherService = SpringBeanLoader.getSpringBean(PublisherServiceImpl.class);
        try{
          publisherService.rightPush(Const.Redis_LIST_KEY, msg.toString());
        }catch(Exception e){
            log.error("存放消息失败" + e.getMessage());
        }
    }
上一篇下一篇

猜你喜欢

热点阅读