Semaphore 的作用、应用场景和实战

2021-04-13  本文已影响0人  wuchao226

CyclicBarrier 的作用、应用场景和实战
CountDownLatch 的作用、应用场景和实战

概述

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

过程如下图所示:

Semaphore 常用方法

构造方法:

public Semaphore(int permits)
public Semaphore(int permits, boolean fair)

常用方法:

public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength() 
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)
protected Collection<Thread> getQueuedThreads()

Semaphore 使用场景

可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求, 要读取几万个文件的数据,因为都是 IO 密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有 10 个,这时我们必须控制只有 10 个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用 Semaphore 来做流量控制。

代码示例:

/**
 * 类说明:演示Semaphore用法,一个数据库连接池的实现
 */
public class DBPoolSemaphore {

    private final static int POOL_SIZE = 10;
    //两个指示器,分别表示池子还有可用连接和已用连接
    private final Semaphore useful, useless;
    //存放数据库连接的容器
    private static LinkedList<Connection> pool = new LinkedList<Connection>();

    //初始化池
    static {
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.addLast(SqlConnectImpl.fetchConnection());
        }
    }

    public DBPoolSemaphore() {
        this.useful = new Semaphore(10);
        this.useless = new Semaphore(0);
    }

    /*归还连接*/
    public void returnConnect(Connection connection) throws InterruptedException {
        if (connection != null) {
            System.out.println("当前有" + useful.getQueueLength() + "个线程等待数据库连接!!"
                    + "可用连接数:" + useful.availablePermits());
            useless.acquire();
            synchronized (pool) {
                pool.addLast(connection);
            }
            useful.release();
        }
    }

    /*从池子拿连接*/
    public Connection takeConnect() throws InterruptedException {
        useful.acquire();
        Connection connection;
        synchronized (pool) {
            connection = pool.removeFirst();
        }
        useless.release();
        return connection;
    }
}
/**
 * 类说明:测试数据库连接池
 */
public class AppTest {

    private static DBPoolSemaphore dbPool = new DBPoolSemaphore();

    private static class BusiThread extends Thread {
        @Override
        public void run() {
            Random r = new Random();//让每个线程持有连接的时间不一样
            long start = System.currentTimeMillis();
            try {
                Connection connect = dbPool.takeConnect();
                System.out.println("Thread_" + Thread.currentThread().getId()
                        + "_获取数据库连接共耗时【" + (System.currentTimeMillis() - start) + "】ms.");
                SleepTools.ms(100 + r.nextInt(100));//模拟业务操作,线程持有连接查询数据
                System.out.println("查询数据完成,归还连接!");
                dbPool.returnConnect(connect);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            Thread thread = new BusiThread();
            thread.start();
        }
    }
}

打印结果:

Thread_12_获取数据库连接共耗时【0】ms.
Thread_14_获取数据库连接共耗时【0】ms.
Thread_17_获取数据库连接共耗时【0】ms.
Thread_15_获取数据库连接共耗时【0】ms.
Thread_18_获取数据库连接共耗时【0】ms.
Thread_20_获取数据库连接共耗时【0】ms.
Thread_13_获取数据库连接共耗时【0】ms.
Thread_16_获取数据库连接共耗时【0】ms.
Thread_11_获取数据库连接共耗时【0】ms.
Thread_19_获取数据库连接共耗时【0】ms.
查询数据完成,归还连接!
当前有5个线程等待数据库连接!!可用连接数:0
Thread_21_获取数据库连接共耗时【121】ms.
查询数据完成,归还连接!
当前有4个线程等待数据库连接!!可用连接数:0
Thread_22_获取数据库连接共耗时【133】ms.
查询数据完成,归还连接!
当前有3个线程等待数据库连接!!可用连接数:0
查询数据完成,归还连接!
当前有2个线程等待数据库连接!!可用连接数:0
Thread_23_获取数据库连接共耗时【145】ms.
Thread_24_获取数据库连接共耗时【144】ms.
查询数据完成,归还连接!
查询数据完成,归还连接!
当前有1个线程等待数据库连接!!可用连接数:0
当前有1个线程等待数据库连接!!可用连接数:0
Thread_25_获取数据库连接共耗时【158】ms.
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:1
查询数据完成,归还连接!
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:2
当前有0个线程等待数据库连接!!可用连接数:2
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:4
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:5
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:6
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:7
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:8
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:9

从打印结果可以看出,一次只有 10 个线程执行 acquire(),只有线程进行 release() 方法后才会有别的线程执行 acquire()。

注意: Semaphore 只是对资源并发访问的线程数进行监控,并不会保证线程安全。

上一篇 下一篇

猜你喜欢

热点阅读