J.U.C 之AQS

2018-11-03  本文已影响0人  磊_5d71
图片.png

CountDownLatch

package com.alan.concurrency.example.aqs;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


/**
 * Demonstrates {@link CountDownLatch}: the main thread blocks until every
 * submitted task has counted the latch down, and only then logs "finish".
 */
@Slf4j
public class CountDownLatchExample1 {

    /** Number of tasks submitted; also the initial count of the latch. */
    private static final int TASK_COUNT = 200;

    /**
     * Submits the tasks to a cached thread pool, waits on the latch for all of
     * them, logs "finish" and then shuts the pool down.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        CountDownLatch latch = new CountDownLatch(TASK_COUNT);

        for (int i = 0; i < TASK_COUNT; i++) {
            final int id = i;
            pool.execute(() -> runAndCountDown(id, latch));
        }

        // await() returns only once countDown() has been called TASK_COUNT times,
        // so "finish" is logged after every task has ended.
        latch.await();
        log.info("finish");
        pool.shutdown();
    }

    /**
     * Runs one task and counts the latch down afterwards, whether or not the
     * task was interrupted.
     *
     * @param id    number passed to {@link #test(int)}
     * @param latch latch to count down when the task ends
     */
    private static void runAndCountDown(int id, CountDownLatch latch) {
        try {
            test(id);
        } catch (InterruptedException e) {
            log.error("InterruptedException", e);
        } finally {
            latch.countDown();
        }
    }

    /**
     * Simulated unit of work: sleeps 100 ms, then logs the task number.
     *
     * @param threadNum number to log
     * @throws InterruptedException if the sleep is interrupted
     */
    public static void test(int threadNum) throws InterruptedException {
        Thread.sleep(100);
        log.info("{}", threadNum);
    }

}
package com.alan.concurrency.example.aqs;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


/**
 * Demonstrates the timed form of {@link CountDownLatch#await(long, TimeUnit)}:
 * the main thread waits on the latch for a bounded time and then carries on
 * whether or not every task has finished.
 */
@Slf4j
public class CountDownLatchExample2 {

    /** Number of tasks submitted; also the initial count of the latch. */
    private static final int TASK_COUNT = 200;

    /**
     * Submits the tasks, waits on the latch for at most 10 ms, logs "finish"
     * and shuts the pool down.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        final ExecutorService workers = Executors.newCachedThreadPool();
        final CountDownLatch done = new CountDownLatch(TASK_COUNT);

        int submitted = 0;
        while (submitted < TASK_COUNT) {
            final int taskNumber = submitted++;
            workers.execute(() -> {
                try {
                    test(taskNumber);
                } catch (InterruptedException e) {
                    log.error("InterruptedException", e);
                } finally {
                    // Always count down so the latch can reach zero.
                    done.countDown();
                }
            });
        }

        // Bounded wait: each task sleeps 100 ms but the wait is capped at 10 ms, so the
        // latch is not expected to reach zero here. The boolean result is deliberately
        // ignored; "finish" is logged either way.
        done.await(10, TimeUnit.MILLISECONDS);
        log.info("finish");
        // shutdown() does not wait: it stops new submissions while tasks already
        // submitted keep running to completion.
        workers.shutdown();
    }

    /**
     * Simulated unit of work: sleeps 100 ms, then logs the task number.
     *
     * @param threadNum number to log
     * @throws InterruptedException if the sleep is interrupted
     */
    public static void test(int threadNum) throws InterruptedException {
        Thread.sleep(100);
        log.info("{}", threadNum);
    }

}

Semaphore 同步组件-信号量

package com.alan.concurrency.example.aqs;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


/**
 * Demonstrates {@link Semaphore} as a concurrency limiter: 200 tasks are
 * submitted, but each must hold one of 20 permits while it runs.
 */
@Slf4j
public class SemaphoreExample1 {

    /** Number of tasks submitted to the pool. */
    private final static int threadCount = 200;

    /** Semaphore with 20 permits. */
    private final static Semaphore semaphore = new Semaphore(20);

    /**
     * Submits all tasks to a cached thread pool and then requests pool shutdown.
     *
     * @param args unused
     * @throws InterruptedException declared for compatibility; nothing in this method blocks
     */
    public static void main(String[] args) throws InterruptedException {

        ExecutorService exec = Executors.newCachedThreadPool();

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire();  // take one permit; blocks while none is free
                    try {
                        test(threadNum);
                    } finally {
                        // Release in finally so the permit is returned even when test()
                        // throws; it is only released once acquire() has succeeded.
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    log.error("InterruptedException", e);
                    // Restore the interrupt status that the exception cleared.
                    Thread.currentThread().interrupt();
                }
            });
        }

        exec.shutdown();

    }

    /**
     * Simulated unit of work: logs the task number, then sleeps one second.
     *
     * @param threadNum number to log
     * @throws InterruptedException if the sleep is interrupted
     */
    public static void test(int threadNum) throws InterruptedException {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }

}
package com.alan.concurrency.example.aqs;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


/**
 * Demonstrates acquiring several {@link Semaphore} permits at once: each task
 * takes all 20 permits, so only one task can be inside {@link #test(int)} at a time.
 */
@Slf4j
public class SemaphoreExample1 {

    /** Number of tasks submitted to the pool. */
    private final static int threadCount = 200;

    /** Semaphore with 20 permits. */
    private final static Semaphore semaphore = new Semaphore(20);

    /**
     * Submits all tasks to a cached thread pool and then requests pool shutdown.
     *
     * @param args unused
     * @throws InterruptedException declared for compatibility; nothing in this method blocks
     */
    public static void main(String[] args) throws InterruptedException {

        ExecutorService exec = Executors.newCachedThreadPool();

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(20);  // take every permit; blocks until all 20 are free
                    try {
                        test(threadNum);
                    } finally {
                        // Release in finally: leaking these 20 permits would block
                        // every remaining task forever.
                        semaphore.release(20);
                    }
                } catch (InterruptedException e) {
                    log.error("InterruptedException", e);
                    // Restore the interrupt status that the exception cleared.
                    Thread.currentThread().interrupt();
                }
            });
        }

        exec.shutdown();

    }

    /**
     * Simulated unit of work: logs the task number, then sleeps one second.
     *
     * @param threadNum number to log
     * @throws InterruptedException if the sleep is interrupted
     */
    public static void test(int threadNum) throws InterruptedException {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }

}

CyclicBarrier

package com.alan.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;



@Slf4j
public class CyclicBarrierExample1 {


    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor= Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {

            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(()->{
                try {
                    race(threadNum);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }


    private static  void race(int threadNum) throws Exception{
        Thread.sleep(1000);
        log.info("{} is ready",threadNum);
        barrier.await();
        log.info("{} continue",threadNum);

    }

}

ReentrantLock 与锁

锁的细粒度和灵活度:很明显 ReentrantLock 优于 synchronized

package com.alan.concurrency.example.lock;


import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
@ThreadSafe
public class LockExample2 {


    //请求数1000
    public static int clientTotal = 5000;
    //同时并发执行的线程数
    public static int threadTotal = 200;

    public static int count = 0;


    //通过Lock接口实现
    private static Lock lock = new ReentrantLock();


    private  static void add(){

        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {

        //定义线程池ExecutorService接口
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定义信号量,传入并发线程数 final修饰不允许重新赋值
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定义计数器闭锁。传入请求总数
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {
            //通过匿名内部类方式
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //semaphore控制并发数量
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (InterruptedException e) {
                        log.error("exception",e);
                    }
                    //每次执行计数器减掉一个
                    countDownLatch.countDown();
                }

            });

        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count);
    }
}
package com.alan.concurrency.example.lock;


import com.alan.concurrency.annoations.ThreadSafe;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * A {@link TreeMap} wrapper guarded by a {@link ReentrantReadWriteLock}:
 * lookups take the read lock, updates take the write lock.
 *
 * <p>NOTE(review): {@code Data} resolves to the imported {@code lombok.Data}
 * type, presumably a placeholder for a real value class. Confirm.
 */
@Slf4j
public class LockExample3 {

    /** Backing map; accessed only while holding one of the locks below. */
    private final Map<String, Data> map = new TreeMap<>();

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Read half of {@link #lock}. */
    private final Lock readLock = lock.readLock();

    /** Write half of {@link #lock}. */
    private final Lock writeLock = lock.writeLock();

    /**
     * Looks up a value under the read lock.
     *
     * @param key key to look up
     * @return the mapped value, or {@code null} if the key is absent
     */
    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns a snapshot of the keys, copied under the read lock.
     *
     * <p>The copy is deliberate: {@code map.keySet()} is a live view of the map,
     * and returning it would let callers iterate the map after the lock has been
     * released, racing with {@link #put(String, Data)}.
     *
     * @return a new sorted set holding the keys present at the time of the call
     */
    public Set<String> getAllKeys() {
        readLock.lock();
        try {
            return new TreeSet<>(map.keySet());
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Stores a value under the write lock.
     *
     * @param key   key to store under
     * @param value value to store
     * @return the value previously mapped to {@code key}, or {@code null} if there was none
     */
    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }
}

package com.alan.concurrency.example.lock;

import java.util.concurrent.locks.StampedLock;

/**
 * Usage sketch for {@link StampedLock}: exclusive writes, an optimistic read
 * with a pessimistic fallback, and upgrading a read lock to a write lock.
 */
public class LockExample4 {

    /** A mutable 2-D point whose coordinates are guarded by a {@link StampedLock}. */
    class Point {
        private final StampedLock sl = new StampedLock();
        private double x;
        private double y;

        /**
         * Shifts the point by the given deltas while holding the write lock.
         *
         * @param deltaX amount added to x
         * @param deltaY amount added to y
         */
        void move(double deltaX, double deltaY) {
            final long writeStamp = sl.writeLock();
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                sl.unlockWrite(writeStamp);
            }
        }

        /**
         * Computes the distance from (0, 0). It first reads optimistically and
         * only takes the read lock if that read could not be validated.
         *
         * @return the Euclidean distance of the point from the origin
         */
        double distanceFromOrigin() {
            final long optimisticStamp = sl.tryOptimisticRead();
            // Copy both fields into locals before validating the stamp.
            double snapX = x;
            double snapY = y;
            if (sl.validate(optimisticStamp)) {
                // No write lock was taken since the stamp was issued: the copies are consistent.
                return Math.sqrt(snapX * snapX + snapY * snapY);
            }
            // Validation failed, so re-read both fields under a real read lock.
            final long readStamp = sl.readLock();
            try {
                snapX = x;
                snapY = y;
            } finally {
                sl.unlockRead(readStamp);
            }
            return Math.sqrt(snapX * snapX + snapY * snapY);
        }

        /**
         * Moves the point to the given coordinates, but only if it is currently
         * at the origin. Starts with a read lock and upgrades it to a write lock.
         *
         * @param newX x coordinate to set
         * @param newY y coordinate to set
         */
        void moveIfAtOrigin(double newX, double newY) {
            // Could instead start with an optimistic read rather than read mode.
            long heldStamp = sl.readLock();
            try {
                while (x == 0.0 && y == 0.0) {
                    final long upgraded = sl.tryConvertToWriteLock(heldStamp);
                    if (upgraded == 0L) {
                        // Upgrade refused: drop the read lock, block for the write lock,
                        // then loop to re-check the condition under it.
                        sl.unlockRead(heldStamp);
                        heldStamp = sl.writeLock();
                        continue;
                    }
                    // Upgrade succeeded: adopt the new stamp and update the state.
                    heldStamp = upgraded;
                    x = newX;
                    y = newY;
                    break;
                }
            } finally {
                // Releases whichever mode (read or write) the current stamp represents.
                sl.unlock(heldStamp);
            }
        }
    }
}
package com.alan.concurrency.example.lock;

import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.StampedLock;

/**
 * Counts up to {@link #clientTotal} from many pool threads, guarding the shared
 * counter with the write lock of a {@link StampedLock}.
 */
@Slf4j
@ThreadSafe
public class LockExample5 {

    /** Total number of requests (tasks) to run. */
    public static int clientTotal = 5000;

    /** Number of permits in the semaphore, i.e. how many tasks may run at once. */
    public static int threadTotal = 200;

    /** Shared counter; modified only inside {@link #add()} while holding the write lock. */
    public static int count = 0;

    private final static StampedLock lock = new StampedLock();

    /**
     * Runs {@code clientTotal} increments on a cached thread pool, waits for all
     * of them on a latch and logs the final count.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while waiting on the latch
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    try {
                        add();
                    } finally {
                        // Return the permit even if add() fails unexpectedly.
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    log.error("exception", e);
                    // Restore the interrupt status that the exception cleared.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    // Count down on every path, otherwise main would block
                    // forever on await().
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    /** Increments the counter while holding the StampedLock in write mode. */
    private static void add() {
        long stamp = lock.writeLock();
        try {
            count++;
        } finally {
            // The stamp comes from writeLock(), so release it with the write-mode unlock.
            lock.unlockWrite(stamp);
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读