第三章 JDK并发包

2018-03-06  本文已影响0人  icelovesummer

主要三部分:

  1. 同步控制工具
  2. 线程池
  3. 并发容器

1. 同步控制

1.1 重入锁(sychronized,wait和notify的替代品,增强版)

public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();

int lock;
public IntLock(int lock) {
    this.lock = lock;
}
@Override
public void run() {
    try {
        if(lock == 1) {
            lock1.lockInterruptibly();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lock2.lockInterruptibly();
        }else {
            lock2.lockInterruptibly();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lock1.lockInterruptibly();
        }
    }catch (InterruptedException e) {
        e.printStackTrace();
    }finally {
        if(lock1.isHeldByCurrentThread()) {
            lock1.unlock();
        }
        if(lock2.isHeldByCurrentThread()) {
            lock2.unlock();
        }
        System.out.println("线程退出"+Thread.currentThread().getId());
    }
    
}

public static void main(String[] args) {
    IntLock r1 = new IntLock(1);
    IntLock r2 = new IntLock(2);
    
    Thread t1 = new Thread(r1);
    Thread t2 = new Thread(r2);
    t1.start();t2.start();
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    t2.interrupt();
}
public class TimeLock implements Runnable{
    static ReentrantLock lock = new ReentrantLock();
    @Override
    public void run() {
        try {
            if(lock.tryLock(5, TimeUnit.SECONDS)) {
                Thread.sleep(6000);
            }else {
                System.out.println("申请锁失败,超时");
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally {
            if(lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
    
    public static void main(String[] args) {
        Thread t1 = new Thread(new TimeLock());
        Thread t2 = new Thread(new TimeLock());
        t1.start();t2.start();
    }
}
public ReentrantLock(boolean fair)

ReentrantLock的几个重要方法:

1.2 重入锁的好搭档:Condition

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

void await()//使当前线程等待,同时释放锁,可以响应中断
void awaitUninterruptibly()//不能被中断
long awaitNanos(long nanosTimeout)
boolean await(long time, TimeUnit unit)
boolean awaitUntil(Date deadline)
void signal()//唤醒一个等待的线程
void signalAll()//唤醒所有线程

1.3 允许多个线程同时访问:信号量(Semaphore)

//构造函数
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)//公平
//方法
public void acquire()//尝试获得准入许可,若无法获得,则等待,可以被中断
public void acquireUninterruptibly()//无法被中断
public boolean tryAcquire()//尝试获得准入许可,不等待
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()//释放许可

1.4 ReadWriteLock读写锁

static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();

//调用时传入相应的锁
public int handleRead(Lock lock) {
    try{
        lock.lock();
        //read something
        return 0;
    }finally {
        lock.unlock();
    }
}

public int handlerite(Lock lock) {
    try{
        lock.lock();
        //write something
        return 0;
    }finally {
        lock.unlock();
    }
}

1.5 倒计时器:CountDownLatch

//构造函数
public CountDownLatch(int count)
static CountDownLatch end = new CountDownLatch(10);

Runnable r = new Runnable() {
    public void run() {
        //do domething
        end.countDown();    //通知计数器减一
    }
};

public static void main(String[] args) throws InterruptedException {
    for (int i = 0; i < 10; i++) {
        //创建10个线程
    }
    end.await(); //主线程等待计数器归零
    //继续执行
}

1.6 循环栅栏:CyclicBarrier

1. 默认的构造方法是
CyclicBarrier(int parties)
其参数表示屏障拦截的线程数量,每个线程调用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
2. 带参构造方法
CyclicBarrier(int parties, Runnable barrierAction)
完成一次计数后,执行指定动作。

1.7 线程阻塞工具类:LockSupport

static park():静态方法,阻当前线程
static parkNanos()
static parkUntil()
static unpark(Thread thread)

1.8 线程间交换数据:Exchanger

public class ExchangerTest {
    private static final Exchanger<String> exgr = new Exchanger<String>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "银行流水A";// A录入银行流水数据
                    System.out.println("1号:" + exgr.exchange(A));
                } catch (InterruptedException e) {
                }
            }
        });

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "银行流水B";// A录入银行流水数据
                    System.out.println("2号:" + exgr.exchange(B));
                } catch (InterruptedException e) {
                }
            }
        });

        threadPool.shutdown();
    }
}
//输出:
//1号:银行流水B
//2号:银行流水A

2. 线程复用:线程池

2.1 什么是线程池?

线程池预先创建一些长期保持激活状态的线程对象,当需要使用线程时,从线程池中拿出一个线程对象直接使用,使用完毕后将该线程归还给线程池,从而免去线程对象创建和销毁的成本。


线程池

2.2 JDK线程池

JDK提供了一套Executor框架,本质就是一个线程池。


Executor框架结构图

【几种线程池】

  1. newFixedThreadPool:固定数量。当一个任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有空闲线程,便处理任务队列中的任务。
  2. newSingleThreadExecutor:数量为1。
  3. newCachedThreadPool:数量不确定。若有空闲线程,则优先使用空闲线程。若没有,则创建新的线程。任务完毕后,线程归还线程池。
  4. newSingleThreadScheduledExecutor:
  5. newScheduledThreadPool:与上面一个一样,返回一个ScheduledExecutorService对象,扩展了在给点时间执行某任务的功能。上面数量为1,下面数量可以指定。
//固定数量的线程池,同时只能处理5个任务,所以隔两秒打印5句。
public class ThreadPoolDemo {
    public static void main(String[] args) {
        Runnable r1 = ()->{
            System.out.println(System.currentTimeMillis()+":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
    
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            es.submit(r1);
        }
    }
}

newScheduledThreadPool这个方法可以根据时间需要对线程进程调度

schedule(Runnable command, long delay, TimeUnit unit)
scheduleAtFixedRate(Runnable command, long initDelay, long period, TimeUnit unit)
scheduleWithFixedRate(Runnable command, long initDelay, long period, TimeUnit unit)
public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) {
        Runnable r1 = ()->{
            try {
                Thread.sleep(3000);
                System.out.println(System.currentTimeMillis()/1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleAtFixedRate(r1, 0, 2, TimeUnit.SECONDS);
    }
}
线程池核心实现
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

//核心ThreadPoolExecutor的一个构造函数
public ThreadPoolExecutor(int corePoolSize,    //指定线程池线程数量
                          int maximumPoolSize,    //最大线程数量
                          long keepAliveTime,  //线程数量超过corePoolSize时,空闲线程存货时间
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  //任务队列,被提交但是尚未被执行的任务如何存放
                          ThreadFactory threadFactory,   //线程过程,用来创建线程,一般默认
                          RejectedExecutionHandler handler)  //拒绝策略,任务太多如何拒绝

【任务队列】

  1. 直接提交队列:SychronousQueue对象实现,这个队列没有容量。总是将新任务交给线程执行,如果没有空闲线程,则尝试创建,如线程数量达到最大,则执行拒绝策略。所以要配合很大的maximumPoolSize使用。
  2. 有界任务队列:ArrayBlockingQueue对象实现,构造时要加上队列最大容量。逻辑与下图一样。
  3. 无界任务队列:LinkedBlockingQueue对象实现,除非系统资源耗尽,否则入队总可以成功。
  4. 优先任务队列:PriorityBlockingQueue对象实现,队列中的任务不是按照先进先出提交,有一个优先级。
ThreadPoolExecutor任务调度逻辑

【为什么先让任务进入队列,而不是直接创建线程?】

【拒绝策略】

  1. AbortPolicy:直接抛出异常,阻止系统正常执行。
  2. CallerRunsPolicy:只要线程池未关闭,直接在调用者线程中执行任务。任务提交线程性能可能会急剧下降。
  3. DiscardOldestPolicy:丢弃最老任务,就是即将被执行的下个任务,并且重新提交当前任务。
  4. DiscardPolicy:丢弃无法处理的任务,不予任何处理。
//自定义线程factory和拒绝策略
ExecutorService es = new ThreadPoolExecutor(5, 5, 
                    0L, TimeUnit.SECONDS, 
                    new LinkedBlockingQueue<Runnable>(10),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setDaemon(true);
                            return null;
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            System.out.println(r.toString()+" is discard");
                        }
                    });

【扩展线程池】

public class ExtThreadPool {
    public static void main(String[] args) {
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MICROSECONDS, 
                    new LinkedBlockingQueue<Runnable>()) {
            
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行:"+((Mytask)r).name);
            }
            @Override
            protected void afterExecute(Runnable r,Throwable t) {
                System.out.println("执行完成:"+((Mytask)r).name);
            }
            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };
        
        for (int i = 0; i < 5; i++) {
            Mytask mytask = new Mytask("线程"+i);
            es.execute(mytask);
        }
        es.shutdown();//关闭线程池,如果还有线程在运行,则等待,相当于只发送了一个关闭信号。
    }
}
class Mytask implements Runnable{
    public String name;
    public Mytask(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        System.out.println("正在执行:Thread ID:"+Thread.currentThread().getId()+",taskName = "+name);
    }
}

【注意】

2.3 fork/join框架
执行逻辑
public class CountTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;
    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Long compute() {
        long sum = 0;
        if(end - start < THRESHOLD) {
            for(long i = start; i <= end; i++) {
                sum += i;
            }
        }else {
            long step = (start + end) / 100;
            ArrayList<CountTask> subTasks = new ArrayList<>();
            long pos = start;
            for(int i = 0; i < 100; i++) {
                long lastOne = pos + step;
                if(lastOne > end)lastOne = end;
                CountTask subTask = new CountTask(pos, lastOne);
                pos += step + 1;
                subTasks.add(subTask);
                subTask.fork();
            }
            for(CountTask task : subTasks) {
                sum += task.join();
            }
        }
        return sum;
    }
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0, 200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        System.out.println(result.get());
    }
}

3. JDK并发容器

3.1 几种并发容器简介

另外可以用Collections工具包装成线程安全。

//内部其实就是和HashMap,通过一个对象锁进行同步,效率堪忧。
Map map = Collections.synchronizedMap(new HashMap<>());

3.2 高效读取:不变模式下的CopyOnWriteArrayList

3.3 数据共享通道:BlockingQueue

例如:ArrayBlockingQueue,take()方法和put()方法,在空和满时会等待

final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();//take操作时,如果队列为空,则让当前线程等待在notEmpty上,新元素入队时,进行一次notEmpty的唤醒。
    } finally {
        lock.unlock();
    }
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)  //队列满,入队线程等待
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();  //插入元素,不空了,通知等待取元素的线程。
}

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();  //元素被挪走时,出现空位,通知等待入队的线程
    return x;
}
上一篇 下一篇

猜你喜欢

热点阅读