技术Code

使用redis结合aop提取用户的行为数据

2018-09-16  本文已影响17人  Briseis

场景描述

在项目中有这样一个需求,用户下载app并打开进入首页之前,会让用户选择自己感兴趣的主题分类,后台根据用户的选择提取出用户的行为数据并作出统计,以图形或者表格的形式展现出来,后期就可以根据这些数据做去做一些类似个性化或精准化的推送了,例如在进入豆瓣app首页前提示用户先选择感兴趣的主题内容.


image.png

具体实现

由于项目中主要使用了Spring框架,所以首先想到的最简单的方式就是使用aop切面拦截用户选择主题分类的接口,然后在数据库中建一张单独的表t_user_interest来存储用户的行为数据,由于是统计型数据所以实时性要求不高,允许有一定的延迟,并且为避免频繁的操作数据库,在切面拦截数据后起独立的线程推送到redis队列,后台再起调度任务每隔1小时异步持久化到数据库中.下面来看看实现步骤:
1.定义aop拦截器拦截方法调用,把用户行为数据推送到redis队列中,其中TaskPo为封装好的与用户相关的行为数据对象.

/**
 * aop拦截器拦截方法调用
 */
@Component
@Aspect
public class UserInterestAspect {

    private final static Logger logger = LoggerFactory.getLogger(UserInterestAspect.class);

    private final LinkedBlockingQueue<TaskPo> queue = new LinkedBlockingQueue<>();

    private final static ExecutorService pool = Executors.newFixedThreadPool(5);

    private final static String QUEUE_NAME = "USER_INTEREST";

    /**
     * 前置通知:在目标方法开始之前执行
     * 
     * @param joinPoint
     * @throws Exception
     */
    @Before("execution(* xxx.service.impl.xxxServiceImpl.xxx(..))")
    public void saveRequiresLog(JoinPoint joinPoint) throws Exception {
        try {
            TaskPo taskPo = getTaskPo(joinPoint);
            if (taskPo != null) {
                queue.add(taskPo);
                TaskPo taskMsge;
                while ((taskMsge = queue.poll()) != null) {
                    pool.execute(new PushRedisWorker(QUEUE_NAME, taskMsge));
                }
            }
        } catch (Exception e) {
            throw e;
        }
    }

    /**
     * 解析参数映射为Java对象
     * 
     * @param joinPoint
     * @return
     */
    public TaskPo getTaskPo(JoinPoint joinPoint) {
        Object[] paramsValue = joinPoint.getArgs();
        String[] paramsName = ((CodeSignature) joinPoint.getStaticPart()
                .getSignature()).getParameterNames();
        JSONObject entityJSON = new JSONObject();
        for (int i = 0; i < paramsName.length; i++) {
            entityJSON.put(paramsName[i], paramsValue[i]);
        }
        if (entityJSON != null && !entityJSON.isEmpty()) {
            TaskPo taskPo = JSONObject.toJavaObject(entityJSON, TaskPo.class);
            taskPo.setOperatorDate(new Date());
            return taskPo;
        }
        return null;
    }
}

2.接着起一个任务线程接口worker继承Runnable.

public interface Worker extends Runnable {

}

3.然后创建一个PushRedisWorker类实现Worker接口,并将封装好的数据写到redis队列中.

public class PushRedisWorker implements Worker {

    private String QUEUE_NAME;

    private TaskPo taskPo;

    public PushRedisWorker(String QUEUE_NAME, TaskPo taskPo) {
        this.QUEUE_NAME = QUEUE_NAME;
        this.taskPo = taskPo;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        try (Jedis jedis = JedisUtils.getJedis()) {
            jedis.lpush(QUEUE_NAME, JSON.toJSONString(taskPo));
        }
    }
}

4.然后再定义一个调度任务器类每隔一段时间间隔轮询redis任务队列,不断的从队列中消费数据,再结合线程池异步发送数据到mysql中.

/**
 * 任务调度定时到Redis队列中拉取对象持久化到数据库
 */
@Component
public class RedisTaskJob {

    private final static Logger logger = LoggerFactory.getLogger(RedisTaskJob.class);

    private final static String QUEUE_NAME = "USER_INTEREST";

    private final static ExecutorService es = Executors.newFixedThreadPool(5);

    private volatile static boolean isRun = true;

    /**
     * 每隔一小时执行一次
     */
    @Scheduled(cron = "0 0 0/1 * * ? ")
    public void getRedisTask() {
        if (logger.isDebugEnabled()) {
            logger.debug("调度开始");
        }
                //取出spring受管的业务service实例
        IUserInterestService userInterestService = SpringContextHolder.getBean("userInterestService",IUserInterestService.class);
        try (Jedis jedis = JedisUtils.getJedis()) {
            if (jedis.exists(QUEUE_NAME)) {
                start();
            }
            while (isRun) {
                if (!jedis.exists(QUEUE_NAME)) {
                    stop();
                    break;
                }
                try {
                    String task = jedis.lpop(QUEUE_NAME);
                    if (StringHelpUtils.isNotBlank(task)) {
                        TaskPo taskPo = JSONObject.toJavaObject(JSON.parseObject(task), TaskPo.class);
                                                //任务对象转化为实体model
                        UserInterest model = EntityUtils.convert(taskPo,UserInterest.class);
                        es.submit(() -> {
                                                         //执行数据入库操作
                            userInterestService.save(model);
                        });
                    }
                } catch (Exception e) {
                    stop();
                    throw e;
                }
            }
        }
    }

    public static void stop() {
        isRun = false;
    }

    public static void start() {
        isRun = true;
    }
}

5.最后统计mysql中的数据并生成报表.

上一篇下一篇

猜你喜欢

热点阅读