多线程开发编程

2018-08-03  本文已影响0人  换个昵称好难

带着目的性去学习源码

并行与并发
并行线程
public void  Thread.interrupt();
public boolean Thread.isInterrupted();
public static boolean Thread.interrupted();  // 判断是否中断并清除当前中断状态

使用interrupt同样可以结束线程但是效果更加的强劲。如果线程在sleep期间被中断则会运行期异常InterruptedException如果这时捕获到异常则需开始处理。

  public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread() {
            @Override
            public void run() {
                while (true) {
                    if (Thread.currentThread().isInterrupted()) {
                        System.out.println("thread break");
                        break;
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("interrupted when sleep");
                        //再次中断自己,表示退出。置上标志位
                        Thread.currentThread().interrupt();
                    }
                    Thread.yield();
                }
            }
        };
        thread.start();
        Thread.sleep(2000);
        thread.interrupt();
    }
并发包

1 锁
1.1 重入锁ReetrantLock
对于同一个线程可以多次进入同一个锁而不会产生死锁的问题。但是同时lock几次也要释放几次不然可能造成死锁

Condition
Semaphore
读写锁ReadWriteLock
CountDownLatch

1.2 线程池

public void createPool(){
        BlockingDeque<Object> taskQueue = new LinkedBlockingDeque<>(50);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,100,10L,TimeUnit.SECORD,taskQueue, new TaskRejectedExecutionHandler());
}
//  饱和策略 
class TaskRejectedExecutionHandler implements RejectedExecutionHandler{
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor){ ... }
}

线程池参数

    corePoolSize:线程池核心数量
    workQueue:任务队列,被提交但是尚未执行。
              `ArrayBlockingQueue`:基于数组结构的有界阻塞队列FIFO;
              `LinkedBlockingQueue`: 基于链表结构的无界阻塞队列FIFO,若线程增加的速度和处理的速度相差过大则队列会一直保持增长直至内存消耗过尽,如静态工厂方法Executors.newFixedThreadPool() ;
              `SynchronizedQueue`:不存储元素的阻塞队列每个插入操作都必须等待一个移除操作,它不会涉及任务的真实保存如果没有线程了则尝试创建线程如果已经达到最大容量则拒绝,所以它的maximumPoolSize必须设置大一点。Executors.newCachedThreadPool() ;
              `PriorityBlockingQueue`:具有优先级别的无界阻塞队列
    maximumPoolSize:最大线程数
    keepAliveTime:超过corePoolSize的线程或者`allowCoreThreadTimeOut`为true的主线程使用这个作为超时时间。
    threadFactory:设置线程池中线程的参数比如name辨识启动线程,daemo守护主线程一旦主线程退出则线程池停止。
    handler:拒绝策略
    核心线程 & 最大线程数:

线程池创建和关闭

线程池的创建不能使用Executors去创建而是通过ThreadPoolExecutor的方式。Executors的各个方面的弊端如下:
1.`newFixedThreadPool `,提供固定数量的线程池,使用的队列是无界阻塞LinkedBlockingQueue<>()默认Integer_MAX当提交频繁队列可能迅速膨胀从而消耗资源。
2.`newCacheThreadPool` ,new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); 它的corePoolSize大小为0,maximumPoolSize无限大当有线程提交直接进入SynchronousQueue队列,直接提交给线程池创建新的线程执行任务。当有大量任务被提交时线程池就会不断的创建新的线程处理那么线程池的资源就会被消耗。
3.`newSingleThreadExecutor`  ,单一把线程池corePoolSize和miximumPoolSize大小设置为1。
4. newScheduledThreadPool` ,

线程池的关闭

线程池实现步骤

  1. 当前线程数小于 corePoolSize创建新的线程去处理任务。
  2. 到达核心线程corePoolSize则加入阻塞队列 BlockingQueue<Runable>
  3. 阻塞队列满了则创建新的线程处理任务。
  4. 运行线程大于maximumPoolSize则实行拒绝策略。

线程池的选择策略

corePoolSize = Ncpu * Ucup(100%) * (1+ w/c ) ; 
        w/c == 阻塞时间 / 计算时间

IO密集型线程池:如果存在IO则阻塞时间大于计算时间w/c>1,但是考虑到系统内存有限每次开启一个线程都需要内存空间,保守取1就是2Ncpu。
计算密集型的线程池:假设没有等待,则W/C = 0即 Nthreads = Ncpu.

线程池底层原理
**** 你的思维习惯正在限制你的成长

 if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
          1.如果运行的线程少于corePoolSize,尝试开启一个新线程去运行command,command作为这个线程的第一个任务.
          2.如果任务成功放入队列,我们仍需要一个双重校验去确认是否应该新建一个线程(因为可能存在有些线程在我们上次检查后死了) 或者 从我们进入这个方法后,pool被关闭了所以我们需要再次检查state,如果线程池停止了需要回滚入队列,如果池中没有线程了,新开启 一个线程
          3.如果无法将任务入队列(可能队列满了),需要新开区一个线程(自己:往maxPoolSize发展)如果失败了,说明线程池shutdown 或者 饱和了,所以我们拒绝任务
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);

1.3 JDK并发容器

public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                if (p.casNext(null, newNode)) {  
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
            }
            else if (p == q)
                p = (t != (t = tail)) ? t : head;
            else
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
锁的优化

使用锁的建议

JVM对锁的优化

另一种"锁" -- ThreadLocal

无锁
无锁是一种乐观策略它会假设对资源的访问时没有冲突的,既然没有冲突就不需要等待所以所有的线程都可以在不停顿的状态下持续执行。如果遇到冲突就会Compare And Swap比较然后交换。CAS主要包含三个参数CAS(V,E,N),V表示要更新的变量,E表示预期值,N表示新值。
以AtomicInteger为例

public final int incrementAndGet(){
         for( ; ; ){
                int current = get()'
                int next = current+1;
                // 在将新值next写入的时候当前值要等于刚刚取得的current
                if(compareAndSet(current,next))   return next;
          }
}

死锁

并行模型与算法

生产者与消费者

上一篇 下一篇

猜你喜欢

热点阅读