并发编程工具(四)Semaphore 信号量

2021-05-17  本文已影响0人  圆企鹅i

允许固定数量线程执行的锁

小漏斗
简而言之 ,就是能够控制固定数量线程同时执行的工具

image.png
demo and 方法说明
package com.threads.thread.jdk.semaphore;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * @author zhangxuecheng4441
 * @date 2021/5/17/017 19:45
 */
@Slf4j
public class SemaphoreExample {
    public static void main(String[] args) {

        //execute task count in the same time
        final int parallelCount = 3;
        //task task controller
        final SemaphoreController semaphoreController = new SemaphoreController(parallelCount, true);
        //task count
        IntStream allTask = IntStream.range(0, 10);
        allTask.forEach(i ->
                new Thread(() -> {
                    try {
                        //todo debug 运行任务 其他任务拿不到凭证则
                        boolean isGetStamp = semaphoreController.executeTask(1, 10);
                        //获取凭证 则可以执行任务
                        if (isGetStamp) {
                            //执行任务
                            executeSomeTask(i);
                        } else {
                            log.warn("can not execute task {}", i);
                            //执行服务降级方案
                            return;
                        }
                        //todo debug 运行任务 如果拿不到凭证 则堵塞任务 等待可以获得凭证 则执行
                        //semaphoreController.executeTaskUtilSuccessOrInterrupted(1);
                        //executeSomeTask(i);
                    } catch (Exception e) {
                        log.error("execute error", e);
                        //其他任务发生异常时 return 不会执行下面的提交任务凭证
                        return;
                    }
                    //todo 因为semaphore是任务共享的 其他线程发送异常后
                    //todo 也会执行endTask(release) 会出现非执行任务线程 提交任务结束的bug
                    //拆分开 try-catch-finally 其他任务发生异常时 return 不会执行下面的提交任务凭证
                    try {
                        log.info("finish task");
                    } finally {
                        //任务声明结束 务必放在finally中 否则在排查问题上会有很多问题
                        semaphoreController.endTask(1);
                        log.info("NO.{} task end ", i);
                    }
                }, "thread-" + i).start()
        );


    }

    private static void executeSomeTask(int taskNumber) {
        try {
            int taskSpendTime = new Random().nextInt(20);
            log.info("execute NO.[{}] task need spend {} s", taskNumber, taskSpendTime);
            TimeUnit.SECONDS.sleep(taskSpendTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    @Data
    @Builder
    @AllArgsConstructor
    private static class SemaphoreController {
        final int initCount = 2;
        private final Semaphore semaphore;


        /**
         * 定义Semaphore指定许可证数量,并且指定非公平的同步器,
         * 因此new Semaphore(n)实际上是等价于newSemaphore(n,false)的。
         *
         * @param customStampCount customStampCount
         * @param failQueue        定义Semaphore指定许可证数量的同时给定非公平或是公平同步器。
         */
        public SemaphoreController(int customStampCount, boolean failQueue) {
            this.semaphore = new Semaphore(customStampCount, failQueue);
        }

        /**
         * 在使用无参的tryAcquire时只会向Semaphore尝试获取一个许可证,
         * 但是该方法会向Semaphore尝试获取指定数目的许可证。
         * <p>
         * 该方法与tryAcquire无参方法类似,同样也是尝试获取一个许可证,但是增加了超时参数。
         * 如果在超时时间内还是没有可用的许可证,那么线程就会进入[阻塞状态],
         * 直到到达超时时间或者在超时时间内有可用的证书(被其他线程释放的证书),
         * 或者阻塞中的线程被其他线程执行了中断。
         *
         * @return isTaskStart
         */
        public boolean executeTask(int stampCount, int timeoutSeconds) {
            boolean isTaskStart = false;
            try {
                isTaskStart = semaphore.tryAcquire(stampCount, timeoutSeconds, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return isTaskStart;
        }

        /**
         * 提交任务 返回标记
         *
         * @param stampCount stampCount
         */
        public void endTask(int stampCount) {
            try {
                semaphore.release(stampCount);
            } catch (Exception e) {
                log.error("error release");
            }
        }

        /**
         * acquire方法也是向Semaphore获取许可证,但是该方法比较偏执一些,
         * 获取不到就会一直等(陷入阻塞状态),Semaphore为我们提供了acquire方法的两种重载形式。
         * <p>
         * 直到Semaphore有可用的许可证为止,或者被其他线程中断。
         * 当然,如果有可用的许可证则会立即返回
         *
         * @param stampCount taskCount
         */
        public void executeTaskUtilSuccessOrInterrupted(int stampCount) {
            try {
                semaphore.acquire(stampCount);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /**
         * 该方法会向Semaphore获取一个许可证,如果获取不到就会一直等待,
         * 与此同时对该线程的任何中断操作都会被无视,
         * 直到Semaphore有可用的许可证为止。当然,如果有可用的许可证则会立即返回。
         *
         * @param stampCount stampCount
         */
        public void forceExecuteTaskUtilSuccess(int stampCount) {
            semaphore.acquireUninterruptibly(stampCount);
        }

        /**
         * 对Semaphore许可证的争抢采用公平还是非公平的方式,
         * 对应到内部的实现类为FairSync(公平)和NonfairSync(非公平)。
         *
         * @return isFair
         */
        public boolean isFair() {
            return semaphore.isFair();
        }

        /**
         * 当前的Semaphore还有多少个可用的许可证。
         *
         * @return 当前的Semaphore还有多少个可用的许可证。
         */
        public int availablePermits() {
            return semaphore.availablePermits();
        }

        /**
         * 排干Semaphore的所有许可证,以后的线程将无法获取到许可证,
         * 已经获取到许可证的线程将不受影响。
         *
         * @return 排干Semaphore的所有许可证
         */
        public int drainPermits() {
            return semaphore.drainPermits();
        }

        /**
         * 当前是否有线程由于要获取Semaphore许可证而进入阻塞?(该值为预估值。)
         *
         * @return 当前是否有线程由于要获取Semaphore许可证而进入阻塞?(该值为预估值。)
         */
        boolean hasQueuedThreads() {
            return semaphore.hasQueuedThreads();
        }

        /**
         * 如果有线程由于获取Semaphore许可证而进入阻塞,那么它们的个数是多少呢?(该值为预估值。)
         *
         * @return 如果有线程由于获取Semaphore许可证而进入阻塞,那么它们的个数是多少呢?(该值为预估值。)
         */
        int getQueueLength() {
            return semaphore.getQueueLength();
        }
    }
}


重写之后 安全的小漏斗
package com.threads.thread.jdk.semaphore;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import static java.lang.Thread.currentThread;

/**
 * @author zhangxuecheng4441
 * @date 2021/5/17/017 12:00
 */
@Slf4j
public class MySemaphore extends Semaphore {

    private static final long serialVersionUID = 3184061773358565644L;
    /**
     * 定义线程安全的、存放Thread类型的队列 todo 此处可以是其他线程安全的集合
     */
    private final ConcurrentLinkedQueue<Thread> queue = new ConcurrentLinkedQueue<>();

    int parallelCount;

    public MySemaphore(int permits) {
        super(permits);
    }

    public MySemaphore(int permits, boolean fair) {
        super(permits, fair);
    }

    @Override
    public void acquire() throws InterruptedException {
        super.acquire();
        // 线程成功获取许可证,将其放入队列中
        this.queue.add(currentThread());
    }

    @Override
    public void acquireUninterruptibly() {
        super.acquireUninterruptibly();
        // 线程成功获取许可证,将其放入队列中
        this.queue.add(currentThread());
    }

    @Override
    public boolean tryAcquire() {
        final boolean acquired = super.tryAcquire();
        if (acquired) {
            // 线程成功获取许可证,将其放入队列中
            this.queue.add(currentThread());
        }
        return acquired;
    }

    @Override
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        final boolean acquired = super.tryAcquire(timeout, unit);
        if (acquired) {
            // 线程成功获取许可证,将其放入队列中
            this.queue.add(currentThread());
        }
        return acquired;
    }

    @Override
    public void release() {
        final Thread currentThread = currentThread();
        // 当队列中不存在该线程时,调用release方法将会被忽略
        if (!this.queue.contains(currentThread)) {
            return;
        }
        super.release();
        // 成功释放,并且将当前线程从队列中剔除
        this.queue.remove(currentThread);
    }

    @Override
    public void acquire(int permits) throws InterruptedException {
        super.acquire(permits);
        // 线程成功获取许可证,将其放入队列中
        this.queue.add(currentThread());
    }

    @Override
    public void acquireUninterruptibly(int permits) {
        super.acquireUninterruptibly(permits);
        // 线程成功获取许可证,将其放入队列中
        this.queue.add(currentThread());
    }

    @Override
    public boolean tryAcquire(int permits) {
        boolean acquired = super.tryAcquire(permits);
        if (acquired) {
            // 线程成功获取许可证,将其放入队列中
            this.queue.add(currentThread());
        }
        return acquired;
    }

    @Override
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
        boolean acquired = super.tryAcquire(permits, timeout, unit);
        if (acquired) {
            // 线程成功获取许可证,将其放入队列中
            this.queue.add(currentThread());
        }
        return acquired;
    }

    @Override
    public void release(int permits) {
        final Thread currentThread = currentThread();
        // 当队列中不存在该线程时,调用release方法将会被忽略
        if (!this.queue.contains(currentThread)) {
            return;
        }
        super.release(permits);
        // 成功释放,并且将当前线程从队列中剔除
        this.queue.remove(currentThread);
    }


}




处理任务的方法
package com.threads.thread.jdk.semaphore;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
 * @author zhangxuecheng4441
 * @date 2021/5/17/017 20:10
 */
@Slf4j
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MySemaphoreHandler<T, R> {
    private static final long serialVersionUID = 3184061773358565644L;
    Semaphore semaphore;
    /**
     * 需要执行的函数
     */
    Function<T, R> executeTaskFunction;
    /**
     * 线程堵塞降级方法
     */
    Function<T, R> dropMethodFunction;
    /**
     * 一次任务凭证数量
     */
    int permits;
    /**
     * 超时时间
     */
    long timeout;
    /**
     * 超时时间单位
     */
    TimeUnit unit;

    /**
     * 执行任务 如果任务失败失败
     * 则执行降级方法
     */
    public R handle(T parameter) {
        R result = null;
        try {
            //运行任务 其他任务拿不到凭证则
            boolean isGetStamp = semaphore.tryAcquire(permits, timeout, unit);
            //获取凭证 则可以执行任务
            if (isGetStamp) {
                //执行任务
                result = executeTaskFunction.apply(parameter);
            } else {
                result = dropMethodFunction.apply(parameter);
                //执行服务降级方案
                return result;
            }
        } catch (Exception e) {
            log.error("execute error", e);
            //其他任务发生异常时 return 不会执行下面的提交任务凭证
            return null;
        }
        //todo 因为semaphore是任务共享的 其他线程发送异常后
        //todo 也会执行endTask(release) 会出现非执行任务线程 提交任务结束的bug
        //todo 使用重写的 { MySemaphore } 可以直接避免问题
        //拆分开 try-catch-finally 其他任务发生异常时 return 不会执行下面的提交任务凭证
        try {
            log.info("No.{} finish task", parameter);
        } finally {
            //任务声明结束 务必放在finally中 否则在排查问题上会有很多问题
            semaphore.release(permits);
        }
        return result;
    }

    /**
     * 强行执行所有任务
     * 任务无法获得凭证 堵塞线程至能获得为止
     */
    public R forceHandle(T parameter) {
        R result = null;
        try {
            semaphore.acquire();
            result = executeTaskFunction.apply(parameter);
        } catch (Exception e) {
            log.error("execute error", e);
            //其他任务发生异常时 return 不会执行下面的提交任务凭证
            return null;
        }
        //todo 因为semaphore是任务共享的 其他线程发送异常后
        //todo 也会执行endTask(release) 会出现非执行任务线程 提交任务结束的bug
        //todo 使用重写的 { MySemaphore } 可以直接避免问题
        //拆分开 try-catch-finally 其他任务发生异常时 return 不会执行下面的提交任务凭证
        try {
            log.info("No.{} finish task", parameter);
        } finally {
            //任务声明结束 务必放在finally中 否则在排查问题上会有很多问题
            semaphore.release(permits);
        }
        return result;
    }

}


执行自定义的小漏斗的demo
package com.threads.thread.jdk.semaphore;

import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * @author zhangxuecheng4441
 * @date 2021/5/17/017 21:45
 */
@Slf4j
public class MySemaphoreExample {
    /**
     * execute task count in the same time
     */
    final static int PARALLEL_COUNT = 3;
    /**
     * 一次任务获取凭证数量
     */
    final static int ONCE_PERMITS = 1;
    /**
     * 一共十个任务
     */
    final static IntStream ALL_TASK = IntStream.range(0, 10);
    /**
     * 任务获取超时时间10s
     */
    final static int OUT_TIME = 10;

    public static void main(String[] args) {
        MySemaphoreHandler<Integer, Integer> semaphoreHandler = new MySemaphoreHandler<>(new MySemaphore(PARALLEL_COUNT),
                MySemaphoreExample::executeSomeTaskFunction, MySemaphoreExample::dropMethodFunction,
                ONCE_PERMITS, OUT_TIME, TimeUnit.SECONDS);
        //执行任务
        ALL_TASK.forEach(i ->
                new Thread(//() -> semaphoreHandler.handle(i)
                        () -> semaphoreHandler.forceHandle(i)
                        , "thread-" + i).start()

        );
    }

    /**
     * 主要任务执行逻辑
     *
     * @param taskNumber
     * @return
     */
    public static Integer executeSomeTaskFunction(Integer taskNumber) {
        try {
            int taskSpendTime = new Random().nextInt(20);
            log.info("execute NO.[{}] task need spend {} s", taskNumber, taskSpendTime);
            TimeUnit.SECONDS.sleep(taskSpendTime);
            return taskSpendTime;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 降级方法
     *
     * @param taskNumber
     * @return
     */
    private static Integer dropMethodFunction(Integer taskNumber) {
        try {
            int taskSpendTime = new Random().nextInt(20);
            log.info("execute dropMethodFunction NO.[{}] task need spend {} s", taskNumber, taskSpendTime);
            TimeUnit.SECONDS.sleep(taskSpendTime);
            return taskSpendTime;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

}

感谢:《java并发变成详解:深入理解并发核心库》汪文军

上一篇下一篇

猜你喜欢

热点阅读