javaJAVA互联网科技

4-多线程-concurrent包

2018-02-28  本文已影响16人  宠辱不惊的咸鱼

概述

ReentrantLock

// 简单用法
public static ReentrantLock lock = new ReentrantLock();
    try {
        lock.lock();
        i++;
    } finally {
        lock.unlock(); // 保证异常情况下也释放;synchronized由JVM释放
    }

Condition

public static ReentrantLock lock = new ReentrantLock();
public static Condition condition = lock.newCondition();

Semaphore

概述

Semaphore semaphore = new Semaphore(5);
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
        NonfairSync(int permits) {
            super(permits);
        }
            Sync(int permits) {
                setState(permits);
            }
                AbstractQueuedSynchronizer:
                protected final void setState(int newState) {
                    state = newState;
                }

// 公平锁
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

acquire

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 || compareAndSetState(available, remaining))
                        return remaining;
                }
            }

release

public void release() {
    sync.releaseShared(1);
}
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

场景

CountDownLatch

概述

CountDownLatch countDownLatch = new CountDownLatch(10);

机制

CyclicBarrier

public CyclicBarrier(int parties) {
    this(parties, null);
}

// parties:栅栏放开前需要调用的线程数
// count:当前剩余需要调用的线程数
// barrierCommand:调用线程数满足后,栅栏放开前执行的命令
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

// 用法,每个线程调用await,耗完parties即可往下走
CyclicBarrier.await
    final ReentrantLock lock = this.lock; // this就是CyclicBarrier
    lock.lock();                          // 先拿锁
    int index = --count;
    if (index == 0) {                     // 判断index,为0表示线程数已达到
        boolean ranAction = false;
        try {
            final Runnable command = barrierCommand;
            if (command != null)          // 可能是空的,看构造函数了
                command.run();
            ranAction = true;
            nextGeneration();             // signalAll,重置count,重置generation
            return 0;                     // await结束
        } finally {
            if (!ranAction)               // command.run异常时,打破栅栏,释放线程
                breakBarrier();           // signalAll,重置count,设置broken属性
            }
        }
        
    trip.await                            // 自我阻塞

private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();  // trip:lock生成的Condition
    // set up next generation
    count = parties;
    generation = new Generation();
}

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

LockSupport

/**
    * Makes available the permit for the given thread, if it
    * was not already available. If the thread was blocked on
    * {@code park} then it will unblock.  Otherwise, its next call
    * to {@code park} is guaranteed not to block. This operation
    * is not guaranteed to have any effect at all if the given
    * thread has not been started.
    *
    * @param thread the thread to unpark, or {@code null}, in which case
    *        this operation has no effect
*/
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
// 从上面注释可以看出,unpack可以释放park状态线程,或者将执行park的线程;不过对于还没开始执行的线程,unpark并不保证效果

/**
    * Disables the current thread for thread scheduling purposes unless the
    * permit is available.
    *
    * <p>If the permit is available then it is consumed and the call
    * returns immediately; otherwise the current thread becomes disabled
    * for thread scheduling purposes and lies dormant until one of three
    * things happens:
    *
    * <ul>
    *
    * <li>Some other thread invokes {@link #unpark unpark} with the
    * current thread as the target; or
    *
    * <li>Some other thread {@linkplain Thread#interrupt interrupts}
    * the current thread; or
    *
    * <li>The call spuriously (that is, for no reason) returns.
    * </ul>
    *
    * <p>This method does <em>not</em> report which of these caused the
    * method to return. Callers should re-check the conditions which caused
    * the thread to park in the first place. Callers may also determine,
    * for example, the interrupt status of the thread upon return.
*/
    public static void park() {
        UNSAFE.park(false, 0L);
    }
上一篇 下一篇

猜你喜欢

热点阅读