Java并发编程-JDK并发包

2018-09-07  本文已影响29人  agile4j

参考资料:《Java高并发程序设计》


1.同步控制

1.扩展了synchronized功能的:重入锁

1.简介

import java.util.concurrent.locks.ReentrantLock;

public class Test {
    // 声明锁
    private static ReentrantLock lock = new ReentrantLock();
    private static int i = 0;

    private static final Runnable runnable = () -> {
        for (int j = 0; j < 100000; j++) {
            // 加锁
            lock.lock();
            try {
                i++;
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    };

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(runnable);
        Thread t2 = new Thread(runnable);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}
lock.lock();
lock.lock();
try {
    i++;
} finally {
    lock.unlock();
    lock.unlock();
}

2.高级功能

1.中断

import java.util.concurrent.locks.ReentrantLock;
import lombok.AllArgsConstructor;

public class Test {
    private static final ReentrantLock lock1 = new ReentrantLock();
    private static final ReentrantLock lock2 = new ReentrantLock();

    @AllArgsConstructor
    private static class IntLock implements Runnable {
        private int lock;

        @Override
        public void run() {
            try {
                if (lock == 1) {
                    lock1.lockInterruptibly();
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        System.out.println("interrupted in sleep");
                    }
                    lock2.lockInterruptibly();
                } else {
                    lock2.lockInterruptibly();
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        System.out.println("interrupted in sleep");
                    }
                    lock1.lockInterruptibly();
                }
            } catch (InterruptedException e) {
                System.out.println("interrupted in business");
            } finally {
                if (lock1.isHeldByCurrentThread()) {
                    lock1.unlock();
                }
                if (lock2.isHeldByCurrentThread()) {
                    lock2.unlock();
                }
                System.out.println(Thread.currentThread().getId() + ":线程退出");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new IntLock(1));
        Thread t2 = new Thread(new IntLock(2));
        t1.start();
        t2.start();
        Thread.sleep(1000);
        t2.interrupt();
    }
}

// interrupted in business
// 12:线程退出
// 11:线程退出

2.锁申请等待限时

public class Test {
    public static class TimeLock implements Runnable {

        public static ReentrantLock lock = new ReentrantLock();

        @Override
        public void run() {
            try {
                if (lock.tryLock(5, TimeUnit.SECONDS)) {
                    Thread.sleep(6000);
                } else {
                    System.out.println("get lock failed");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
        }
    }

    public static void main(String[] args) {
        TimeLock lock = new TimeLock();
        Thread t1 = new Thread(lock);
        Thread t2 = new Thread(lock);
        t1.start();
        t2.start();
    }
}

// get lock failed
public class Test {
    public static class TryLock implements Runnable {

        public static ReentrantLock lock1 = new ReentrantLock();
        public static ReentrantLock lock2 = new ReentrantLock();
        int lock;

        public TryLock(int lock) {
            this.lock = lock;
        }

        public void tryDoubleLock(ReentrantLock firstLock,
                                  ReentrantLock secondLock,
                                  String param, Consumer<String> consumer) {
            while (true) {
                if (firstLock.tryLock()) {
                    try {
                        if (secondLock.tryLock()) {
                            try {
                                consumer.accept(param);
                                return;
                            } finally {
                                secondLock.unlock();
                            }
                        }
                    } finally {
                        firstLock.unlock();
                    }
                }
            }
        }

        @Override
        public void run() {
            String param = ":My Job done";
            Consumer<String> consumer = str ->
                    System.out.println(Thread.currentThread().getId() + str);
            if (lock == 1) {
                tryDoubleLock(lock1, lock2, param, consumer);
            } else {
                tryDoubleLock(lock2, lock1, param, consumer);
            }
        }
    }

    public static void main(String[] args) {
        Thread t1 = new Thread(new TryLock(1));
        Thread t2 = new Thread(new TryLock(2));
        t1.start();
        t2.start();
    }
}

// 11:My Job done
// 10:My Job done

3.公平锁

public ReentrantLock(boolean fair)
public class Test {
    public static class FairLock implements Runnable {

        public static ReentrantLock fairLock = new ReentrantLock(true);

        @Override
        public void run() {
            while (true) {
                try {
                    fairLock.lock();
                    System.out.println(Thread.currentThread().getName() + "获得锁");
                } finally {
                    fairLock.unlock();
                }
            }
        }
    }

    public static void main(String[] args) {
        FairLock r1 = new FairLock();
        Thread t1 = new Thread(r1, "Thread_t1");
        Thread t2 = new Thread(r1, "Thread_t2");
        t1.start();
        t2.start();
    }
}

// 程序运行一段时间(稳定后)的部分输出:
// Thread_t1获得锁
// Thread_t2获得锁
// Thread_t1获得锁
// Thread_t2获得锁
// Thread_t1获得锁
// Thread_t2获得锁
// Thread_t1获得锁
// Thread_t2获得锁

3.总结

  1. lock(): 获得锁,如果锁已经被占用,则等待。
  2. lockInterruptibly(): 获得锁,但优先响应中断。
  3. tryLock(): 尝试获得锁,如果成功返回true,失败返回false。该方法不等待,立即返回。
  4. tryLock(long time, TimeUnit unit): 在给定的时间内尝试获得锁。
  5. unlock(): 释放锁。
  1. 原子状态。原子状态使用CAS操作(CompareAndSwap)来存储当前锁的状态,判断锁是否已经被别的线程持有。
  2. 等待队列。所有没有请求到锁的线程,会进入等待队列进行等待。待有线程释放锁后,系统就能从等待队列中唤醒一个线程,继续工作。
  3. 阻塞原语park()和unpark()。这两个方法用来挂起和恢复线程。没有得到锁的线程将会被挂起。有关park()和unpark()可参考线程阻塞工具类LockSupport。

2.重入锁的好搭档:Condition条件

1.简介

2.方法

public class Test {

    public static class ReenterLockCondition implements Runnable {

        public static ReentrantLock lock = new ReentrantLock();
        public static Condition condition = lock.newCondition();

        @Override
        public void run() {
            try {
                lock.lock(); // 申请并获得锁
                condition.await();  // 释放锁
                                    // 收到signal信号后重新申请锁、获得锁
                System.out.println("thread is going on");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock(); // 释放锁
            }
        }
    }

    public static void main(String[] args) throws Exception {
        new Thread(new ReenterLockCondition()).start();
        Thread.sleep(2000);
        ReenterLockCondition.lock.lock(); // 获得锁
        ReenterLockCondition.condition.signal(); // signal通知
        ReenterLockCondition.lock.unlock(); // 释放锁,谦让给被唤醒的线程
    }
}

// thread is going on

3.允许多个线程同时访问:信号量(Semaphore)

1.简介

public Semaphore(int permits) // 参数为信号量的准入数
public Semaphore(int permits, boolean fair) // 第二个参数可以指定是否公平

2.方法

public class Test {

    public static class SemaphoreDemo implements Runnable {

        final Semaphore semaphore = new Semaphore(5);

        @Override
        public void run() {
            try {
                semaphore.acquire();
                Thread.sleep(2000);
                System.out.println(System.currentTimeMillis() + "-" +
                        Thread.currentThread().getId() + ":done!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(20);
        final SemaphoreDemo demo = new SemaphoreDemo();
        for (int i = 0; i < 20; i++) {
            exec.submit(demo);
        }
    }
}

// 从输出可以观察出系统以5个线程一组为单位,依次进行输出

4.读写分离锁:ReadWriteLock

-
非阻塞 阻塞
阻塞 阻塞
public class Test {

    private static Lock lock = new ReentrantLock();
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static Lock readLock = readWriteLock.readLock();
    private static Lock writeLock = readWriteLock.writeLock();
    private static CountDownLatch latch = new CountDownLatch(20);
    private int value;

    public Object handleRead(Lock lock) throws InterruptedException {
        try {
            lock.lock(); // 模拟读操作
            Thread.sleep(1000); // 读操作的耗时越多,读写锁的优势就越明显
            return value;
        } finally {
            latch.countDown();
            lock.unlock();
        }
    }

    public void handleWrite(Lock lock, int index) throws InterruptedException {
        try {
            lock.lock(); // 模拟写操作
            Thread.sleep(1000);
            value = index;
        } finally {
            latch.countDown();
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        final Test demo = new Test();
        Runnable readRunnable = () -> {
            try {
                demo.handleRead(readLock);
                //demo.handleRead(lock); // 使用重入锁而不是读写锁
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        Runnable writeRunnable = () -> {
            try {
                demo.handleWrite(writeLock, new Random().nextInt());
                //demo.handleRead(lock); // 使用重入锁而不是读写锁
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        long startTime = System.currentTimeMillis();
        for (int i = 0; i <= 18; i++) {
            new Thread(readRunnable).start();
        }
        for (int i = 19; i < 20; i++) {
            new Thread(writeRunnable).start();
        }
        latch.await();
        System.out.println("used time: " + (System.currentTimeMillis() - startTime));
    }
}

// 使用读写锁
// used time: 2022
// 使用重入锁
// used time: 20009

5.倒计数器:CountDownLatch

// 构造函数接收一个整数作为参数,即当前这个倒计时器的计数个数
public CountDownLatch(int count)

// 计数减一
public void countDown()

// 使线程等待,直到倒计数完成
public void await()

6.循环栅栏:CyclicBarrier

// parties:计数总数,也就是参与的线程总数
// barrierAction:当计数器完成一次计数后,系统要执行的动作
public CyclicBarrier(int parties, Runnable barrierAction)

// 等待一次计数完成
// 注意抛出的两种异常(见下一节的代码)
public int await() throws InterruptedException, BrokenBarrierException
public class Test {

    private static final int SOLDIER_NUM = 10;
    private volatile static boolean allAssembledFlag = false;
    private static final Runnable barrierRun = () -> {
        if (allAssembledFlag) {
            System.out.println("司令:[士兵" + SOLDIER_NUM + "个,任务完成!]");
        } else {
            System.out.println("司令:[士兵" + SOLDIER_NUM + "个,集合完毕!]");
        }
    };

    public static class Soldier implements Runnable {
        private final CyclicBarrier cyclic;
        private String soldierName;

        Soldier(CyclicBarrier cyclic, String soldierName) {
            this.cyclic = cyclic;
            this.soldierName = soldierName;
        }

        void doWork() {
            try {
                Thread.sleep(Math.abs(new Random().nextInt() % 10000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(soldierName + ":任务完成");
        }

        @Override
        public void run() {
            try {
                // 等待所有士兵到齐
                cyclic.await();
                allAssembledFlag = true;
                doWork();
                // 等待所有士兵完成工作
                cyclic.await();
            } catch (InterruptedException e) {
                System.out.println("线程等待时被中断——需要执行响应外部紧急事件的逻辑");
            } catch (BrokenBarrierException e) {
                System.out.println("当前的CyclicBarrier已经被损坏," +
                        "系统可能已经无法等待所有线程到齐——需要执行使等待线程就地解散的逻辑");
                // 例如当前循环栅栏的计数总数为10,若有一个线程被中断,
                // 那我们就会得到1个InterruptedException和9个BrokenBarrierException
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Thread[] allSoldier = new Thread[SOLDIER_NUM];
        CyclicBarrier cyclic = new CyclicBarrier(SOLDIER_NUM, barrierRun);
        // 设置屏障点,主要是为了执行这个方法
        System.out.println("集合队伍!");
        for (int i = 0; i < SOLDIER_NUM; ++i) {
            System.out.println("士兵 " + i + " 报道!");
            allSoldier[i] = new Thread(new Soldier(cyclic, "士兵 " + i));
            allSoldier[i].start();
        }
    }

}

// 集合队伍!
// 士兵 0 报道!
// 士兵 1 报道!
// 士兵 2 报道!
// 士兵 3 报道!
// 士兵 4 报道!
// 士兵 5 报道!
// 士兵 6 报道!
// 士兵 7 报道!
// 士兵 8 报道!
// 士兵 9 报道!
// 司令:[士兵10个,集合完毕!]
// 士兵 3:任务完成
// 士兵 5:任务完成
// 士兵 9:任务完成
// 士兵 7:任务完成
// 士兵 4:任务完成
// 士兵 1:任务完成
// 士兵 8:任务完成
// 士兵 0:任务完成
// 士兵 6:任务完成
// 士兵 2:任务完成
// 司令:[士兵10个,任务完成!]

7.线程阻塞工具类:LockSupport

  1. 和Thread.suspend()相比,它弥补了由于resume()在前发生,导致线程无法继续执行的情况。
  2. 和Object.wait()相比,它不需要先获得某个对象的锁,也不会抛出InterruptedException异常。
public class Test {

    private static final Object u = new Object();

    public static class ChangeObjectThread extends Thread {
        public ChangeObjectThread(String name) {
            super.setName(name);
        }

        @Override
        public void run() {
            synchronized (u) {
                try {
                    System.out.println("in " + getName());
                    Thread.sleep(1000);
                    LockSupport.park();
                    System.out.println("out " + getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ChangeObjectThread t1 = new ChangeObjectThread("t1");
        ChangeObjectThread t2 = new ChangeObjectThread("t2");
        t1.start();
        t2.start();
        LockSupport.unpark(t1);
        LockSupport.unpark(t2);
        t1.join();
        t2.join();
    }

}

// in t1
// out t1
// in t2
// out t2
public class Test {

    private static final Object u = new Object();

    public static class ChangeObjectThread extends Thread {
        public ChangeObjectThread(String name) {
            super.setName(name);
        }

        @Override
        public void run() {
            synchronized (u) {
                System.out.println("in " + getName());
                LockSupport.park();
                if (Thread.interrupted()) {
                    System.out.println(getName() + " 被中断了");
                }
                System.out.println("out " + getName());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ChangeObjectThread t1 = new ChangeObjectThread("t1");
        ChangeObjectThread t2 = new ChangeObjectThread("t2");
        t1.start();
        Thread.sleep(100);
        t2.start();
        t1.interrupt();
        LockSupport.unpark(t2);
        t1.join();
        t2.join();
    }

}

// in t1
// t1 被中断了
// out t1
// in t2
// out t2

end

上一篇下一篇

猜你喜欢

热点阅读