java线程之Semaphore类基本使用

2018-03-28  本文已影响0人  dimdark

Semaphore 类的主要作用就是 控制线程并发的数量

1. 内部类 Sync FairSync NonfairSyn

Sync 内部类:

abstract static class Sync extends AbstractQueuedSynchronizer {
        /**
         * 构造函数
         */
        Sync(int permits) {
            setState(permits);
        }

        /** 
         * 返回许可证的个数
         */
        final int getPermits() {
            return getState();
        }

        /**
         * 减少reductions个许可证
         */
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) { // 发生溢出
                    throw new Error("Permit count underflow");
                }
                if (compareAndSetState(current, next)) { // 调用父类AQS的原子操作方法来更新state字段
                    return;
                }
            }
        }

        /**
         * 消耗完所有的许可证, 返回所消耗的许可证的个数
         */
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0)) {
                    return current;
                }
            }
        }

        /**
         * 非公平模式下尝试获取许可证的辅助方法
         */
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        /**
         * 重写AQS的tryReleaseShared方法来指出如何释放"锁"
         * sync的公平版本和非公平版本均用到这个方法
         */
        protected final boolean tryReleaseShared(int releases) {
             for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) { // 发生溢出
                    throw new Error("Maximum permit count exceeded");
                }
                if (compareAndSetState(current, next)) {
                    return true;
                }
             }
        }

    }

FairSync 内部类:

FairSync内部类
NonfairSync 内部类: NonfairSync内部类
2. Semaphore 类的构造方法
Semaphore类的构造方法
3. Semaphore 类的常用方法
acquire系列方法
release系列方法
其他常用方法
4. 例子

使用 Semaphore 实现 多生产者/多消费者模式

/**
 * @author dimdark
 */
public class CommonUtil {

    /**
     * 生产者数量
     */
    public static final int PRODUCER_COUNT = 10;

    /**
     * 消费者数量
     */
    public static final int CONSUMER_COUNT = 20;

    /**
     * 生产者与消费者在临界资源所用到的锁及相应的condition
     */
    public static final Lock lock = new ReentrantLock();
    public static final Condition consumeCondition = lock.newCondition();
    public static final Condition produceCondition = lock.newCondition();

}
/**
 * 容器, 用来存放食物
 * @author dimdark
 */
public class Container {

    public static final String[] food = new String[5]; // 只能存放5份食物

    /**
     * 判断容器是否已经没有食物
     */
    public static boolean isFoodEmpty() {
        return IntStream.range(0, food.length).noneMatch(i -> food[i] != null);
    }

    /**
     * 判断容器是否已经装满食物
     */
    public static boolean isFoodFull() {
        return IntStream.range(0, food.length).allMatch(i -> food[i] != null);
    }

    /**
     * 往容器里添加食物
     * 注意: 该方法只能被生产者调用以确保容器此时至少有空间可以放下食物
     */
    public static void putFood() {
        for (int i = 0; i < food.length; ++i) {
            if (food[i] == null) {
                food[i] = "food-" + i;
                return;
            }
        }
    }

    /**
     * 向容器中取出食物
     * 注意: 该方法只能被消费者调用以确保容器此时至少有一份食物
     */
    public static void getFood() {
        for (int i = 0; i < food.length; ++i) {
            if (food[i] != null) {
                food[i] = null;
                return;
            }
        }
    }

}
/**
 * @author dimdark
 */
public class Producer extends Thread {

    private static final Semaphore semaphore;
    private static final Lock lock;
    private static final Condition consumeCondition;
    private static final Condition produceCondition;

    static {
        semaphore = new Semaphore(CommonUtil.PRODUCER_COUNT);
        lock = CommonUtil.lock;
        consumeCondition = CommonUtil.consumeCondition;
        produceCondition = CommonUtil.produceCondition;
    }

    public void produce() {
        try {
            semaphore.acquire();
            lock.lock();
            while (Container.isFoodFull()) {
                produceCondition.await();
            }
            Container.putFood();
            System.out.println("producer " + Thread.currentThread().getName() + " produce food");
            consumeCondition.signalAll();
        } catch(InterruptedException e) {
            System.err.println("producer " + Thread.currentThread().getName() + " happens some error!");
        } finally {
            lock.unlock();
            semaphore.release();
        }
    }

    @Override
    public void run() {
        while (true) {
            produce();
        }
    }

}
/**
 * @author dimdark
 */
public class Consumer extends Thread {

    private static final Semaphore semaphore;
    private static final Lock lock;
    private static final Condition consumeCondition;
    private static final Condition produceCondition;

    static {
        semaphore = new Semaphore(CommonUtil.CONSUMER_COUNT);
        lock = CommonUtil.lock;
        consumeCondition = CommonUtil.consumeCondition;
        produceCondition = CommonUtil.produceCondition;
    }

    public void consume() {
        try {
            semaphore.acquire();
            lock.lock();
            while (Container.isFoodEmpty()) {
                consumeCondition.await();
            }
            Container.getFood();
            System.out.println("consumer " + Thread.currentThread().getName() + " consume food");
            produceCondition.signalAll();
        } catch(InterruptedException e) {
            System.err.println("consumer " + Thread.currentThread().getName() + " happens some error!");
        } finally {
            lock.unlock();
            semaphore.release();
        }
    }

    @Override
    public void run() {
        while (true) {
            consume();
        }
    }

}
/**
 * @author dimdark
 */
public class Main {

    public static void main(String[] args) {
        Thread[] producers = new Thread[CommonUtil.PRODUCER_COUNT];
        Thread[] consumers = new Thread[CommonUtil.CONSUMER_COUNT];
        for (Thread consumer : consumers) {
            consumer = new Consumer();
            consumer.start();
        }
        for (Thread producer : producers) {
            producer = new Producer();
            producer.start();
        }
    }

}
上一篇下一篇

猜你喜欢

热点阅读