利用线程池调度维护设备会话状态

2020-05-21  本文已影响0人  大风过岗

WaterMachineThreadFactory

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * {@link ThreadFactory} that hands out non-daemon, normal-priority threads
 * named {@code <name>-<poolSeq>-thread-<threadSeq>}.
 *
 * <p>{@code poolSeq} comes from a static counter shared by every instance of
 * this class; {@code threadSeq} is counted per factory instance. Both start
 * at 1. Instances are obtained through {@link #forName(String)}.
 *
 * <p>Created 2020/5/21 15:14.
 *
 * @author chihaojie
 * @version 1.0
 */
public class WaterMachineThreadFactory implements ThreadFactory {
    /** Sequence number source for factories; shared across all instances. */
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    /** Thread group every thread created by this factory is placed in. */
    private final ThreadGroup group;
    /** Sequence number source for the threads of this one factory. */
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    /** Common leading part of every thread name produced by this factory. */
    private final String namePrefix;

    /**
     * Creates a factory whose thread names start with the given name.
     *
     * @param name leading part of the generated thread names
     * @return a new factory instance
     */
    public static WaterMachineThreadFactory forName(String name) {
        return new WaterMachineThreadFactory(name);
    }

    /**
     * Resolves the thread group (from the security manager when one is
     * installed, otherwise from the calling thread) and builds the name prefix.
     *
     * @param name leading part of the generated thread names
     */
    private WaterMachineThreadFactory(String name) {
        SecurityManager securityManager = System.getSecurityManager();
        if (securityManager == null) {
            group = Thread.currentThread().getThreadGroup();
        } else {
            group = securityManager.getThreadGroup();
        }
        int poolSeq = poolNumber.getAndIncrement();
        namePrefix = name + "-" + poolSeq + "-thread-";
    }

    /**
     * Builds a new, not yet started thread for the given task.
     *
     * @param r task the thread will run
     * @return a non-daemon thread with {@link Thread#NORM_PRIORITY}
     */
    @Override
    public Thread newThread(Runnable r) {
        String threadName = namePrefix + threadNumber.getAndIncrement();
        // Stack size 0 is passed through unchanged from the original code.
        Thread worker = new Thread(group, r, threadName, 0);
        // A new thread inherits daemon status and priority from its creator;
        // normalise both so the result does not depend on who called us.
        if (worker.isDaemon()) {
            worker.setDaemon(false);
        }
        if (worker.getPriority() != Thread.NORM_PRIORITY) {
            worker.setPriority(Thread.NORM_PRIORITY);
        }
        return worker;
    }
}

SessionMetaData

package com.emqx;

import lombok.Data;

/**
 * Session record for a single device: the device identifier together with
 * the timestamp of the device's most recent activity.
 *
 * <p>Both fields are {@code volatile}, so a write made by one thread is
 * visible to subsequent reads from other threads. Accessors are generated by
 * Lombok's {@code @Data}.
 *
 * <p>Created 2020/5/21 14:19.
 *
 * @author chihaojie
 * @version 1.0
 */
@Data
public class SessionMetaData {

    /** Identifier of the device this session belongs to. */
    private volatile String deviceId;

    /** Timestamp of the last recorded activity of the device. */
    private volatile long lastActivityTime;

    /**
     * Creates a session record.
     *
     * @param deviceId         identifier of the device
     * @param lastActivityTime initial last-activity timestamp
     */
    public SessionMetaData(String deviceId, long lastActivityTime) {
        this.lastActivityTime = lastActivityTime;
        this.deviceId = deviceId;
    }

    /** Stamps the session with the current {@link System#currentTimeMillis()} value. */
    void updateLastActivityTime() {
        lastActivityTime = System.currentTimeMillis();
    }

}

DeviceSessionManager

package com.emqx;

import java.util.concurrent.*;

/**
 * Keeps an in-memory registry of device sessions and periodically evicts the
 * ones that have been silent for longer than the inactivity timeout.
 *
 * <p>Design outline:
 * <ol>
 *   <li>create a session for every device that sends heartbeats;</li>
 *   <li>track the most recent activity time inside the session;</li>
 *   <li>periodically scan for zombie devices and kick their sessions out.</li>
 * </ol>
 *
 * <p>Call {@link #init()} to start the periodic scan and {@link #destroy()}
 * to stop it.
 *
 * <p>Created 2020/5/21 15:36.
 *
 * @author chihaojie
 * @version 1.0
 */
public class DeviceSessionManager {
    // TODO code refactor

    /** Delay in milliseconds between {@link #init()} and the first inactivity scan. */
    private static final long INITIAL_CHECK_DELAY_MILLIS = 2000;

    /** Idle time in milliseconds after which a session counts as expired. */
    private long sessionInactivityTimeout = 10000;

    /** Period in milliseconds between two consecutive inactivity scans. */
    private long sessionReportTimeout = 5000;

    /** Scheduler running the periodic scan; {@code null} until {@link #init()} is called. */
    protected ScheduledExecutorService schedulerExecutor;

    /** Live sessions keyed by device id. */
    private final ConcurrentMap<String, SessionMetaData> sessions = new ConcurrentHashMap<>();

    /**
     * Registers a session stamped with the current time, unless one already
     * exists for the same device id.
     *
     * <p>NOTE(review): the device id is still the hard-coded placeholder
     * {@code "deviceId"} from the original code.
     */
    private void registerSession() {
        SessionMetaData currentSession = new SessionMetaData("deviceId", System.currentTimeMillis());
        sessions.putIfAbsent("deviceId", currentSession);
    }

    /**
     * Scans all sessions and evicts every one whose last activity is older
     * than {@code sessionInactivityTimeout}.
     *
     * <p>Active sessions are deliberately left untouched. Refreshing their
     * timestamp here (as the previous version did) would reset the idle clock
     * on every scan, and because the scan period is shorter than the timeout
     * no session could ever expire.
     */
    private void checkInactivityAndReportActivity() {
        long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
        // removeIf on the values view removes the matching map entries.
        sessions.values().removeIf(session -> session.getLastActivityTime() < expTime);
    }

    /**
     * Marks the given session as active right now.
     *
     * @param sessionId key of the session (the device id)
     * @return the updated session, or {@code null} if no session is registered
     *         under that key
     */
    private SessionMetaData reportActivityInternal(String sessionId) {
        SessionMetaData sessionMetaData = sessions.get(sessionId);
        if (sessionMetaData != null) {
            sessionMetaData.updateLastActivityTime();
        }
        return sessionMetaData;
    }

    /**
     * Removes the session registered under the given session's device id.
     * Does nothing when the argument or its device id is {@code null}, or when
     * no such session is registered (the previous version threw a
     * {@link NullPointerException} in the last case).
     *
     * @param session session to remove
     */
    private void deregisterSession(SessionMetaData session) {
        if (session == null) {
            return;
        }
        String deviceId = session.getDeviceId();
        if (deviceId != null) {
            sessions.remove(deviceId);
        }
    }

    /**
     * Creates the scheduler and starts the periodic inactivity scan.
     *
     * <p>Calling this while a scheduler is already running is a no-op, so a
     * repeated call neither leaks a thread pool nor schedules the scan twice.
     * After {@link #destroy()} the manager can be initialised again.
     */
    public void init() {
        if (schedulerExecutor != null && !schedulerExecutor.isShutdown()) {
            return;
        }
        this.schedulerExecutor = Executors.newScheduledThreadPool(
                2, WaterMachineThreadFactory.forName("device-online-scheduler"));
        this.schedulerExecutor.scheduleAtFixedRate(
                this::checkInactivityAndReportActivity,
                INITIAL_CHECK_DELAY_MILLIS,
                sessionReportTimeout,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the periodic scan by shutting the scheduler down immediately.
     * Safe to call when {@link #init()} was never invoked.
     */
    public void destroy() {
        if (schedulerExecutor != null) {
            schedulerExecutor.shutdownNow();
        }
    }

}

上一篇 下一篇

猜你喜欢

热点阅读