
Frequently Tested Points on Java Multithreading

2017-12-11  cmazxiaoma

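Preface

Java multithreading comes up in interviews all the time. An interviewer may ask about: the two ways to implement multithreading and how they differ; the four conditions under which deadlock occurs and how to avoid it; the difference between deadlock and livelock; the common thread pools and how they differ; how to understand bounded and unbounded queues; the multithreaded producer-consumer model; how to design a thread pool and roughly how one is implemented; the source code of, and differences between, ReentrantLock, synchronized, and ReadWriteLock; analysis of concrete business scenarios; and so on.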


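The producer-consumer model

The producer-consumer model is in fact rather similar to the observer pattern. For this model we should be clear about the following 4 points:

Implementation with wait() and notify()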
public class Producter implements Runnable {

    private Storage resource;

    public Producter(Storage resource) {
        super();
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing.
                Thread.currentThread().interrupt();
                return;
            }

            resource.produce();
        }
    }
}
public class Consumer implements Runnable {

    private Storage resource;

    public Consumer(Storage resource) {
        super();
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming.
                Thread.currentThread().interrupt();
                return;
            }
            resource.cosume();
        }
    }
}

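public class Storage {

    private static final int MAX_SIZE = 20;

    private int count = 0;

    public synchronized void cosume() {
        // A loop, not an if: a woken thread must re-check the condition.
        while (count == 0) {
            try {
                System.out.println("[Consumer] Stock is empty; cannot consume for now!");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        count--;
        System.out.println("[Consumer] " + Thread.currentThread().getName() + " consumed a product, stock: " + count);
        // notifyAll() rather than notify(): producers and consumers wait on the
        // same monitor, so notify() might wake only a thread of the same kind.
        this.notifyAll();
    }

    public synchronized void produce() {
        while (count >= MAX_SIZE) {
            try {
                System.out.println("[Producer] Stock is full; cannot produce for now!");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        count++;
        System.out.println("[Producer] " + Thread.currentThread().getName() + " produced a product, stock: " + count);
        this.notifyAll();
    }
}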

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProducterCosumerXDemo {

    public static void main(String[] args) {
        Storage storage = new Storage();
        ExecutorService service = Executors.newCachedThreadPool();

        for (int i = 0; i < 5; i++) {
            service.submit(new Producter(storage));
        }

        for (int i = 0; i < 5; i++) {
            service.submit(new Consumer(storage));
        }

        service.shutdown();
    }
}

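Here we create 5 producers and 5 consumers. Producers produce faster than consumers consume (a 100 ms pause versus 500 ms), so the output clearly shows the producers filling the stock to its maximum of 20 first. After that they cannot produce any more and have to wait for the consumers to consume.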
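The demo above runs forever, which makes its invariants hard to check. The following is a minimal sketch of the same wait/notify idea in a finite form: the class name `BoundedStorageSketch`, the `maxSeen` field, and the `run` helper are all hypothetical names introduced for illustration, and the sketch uses `notifyAll()` so that every waiting thread re-checks its own condition. It shows that the stock never exceeds the limit and ends at zero when producers and consumers perform the same number of operations.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, finite variant of the Storage idea; names are illustrative only.
class BoundedStorageSketch {
    private static final int MAX_SIZE = 20;
    private int count = 0;
    private int maxSeen = 0; // highest stock level observed so far

    public synchronized void produce() throws InterruptedException {
        while (count >= MAX_SIZE) {
            wait(); // full: release the monitor until a consumer makes room
        }
        count++;
        maxSeen = Math.max(maxSeen, count);
        notifyAll(); // wake all waiters; each one re-checks its own condition
    }

    public synchronized void consume() throws InterruptedException {
        while (count == 0) {
            wait(); // empty: release the monitor until a producer adds stock
        }
        count--;
        notifyAll();
    }

    public synchronized int getCount() { return count; }

    public synchronized int getMaxSeen() { return maxSeen; }

    // Starts `workers` producers and `workers` consumers, each performing
    // `perWorker` operations, and waits for all of them to finish.
    static BoundedStorageSketch run(int workers, int perWorker) throws InterruptedException {
        BoundedStorageSketch storage = new BoundedStorageSketch();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            threads.add(new Thread(() -> {
                try {
                    for (int n = 0; n < perWorker; n++) {
                        storage.produce();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
            threads.add(new Thread(() -> {
                try {
                    for (int n = 0; n < perWorker; n++) {
                        storage.consume();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return storage;
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedStorageSketch storage = run(5, 100);
        System.out.println("final stock = " + storage.getCount()
                + ", peak stock = " + storage.getMaxSeen());
    }
}
```

The `while` loops around `wait()` matter as much as the notification: a thread that wakes up may find that another thread has already changed the stock, so it has to test the condition again before proceeding.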

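Implementation with BlockingQueue

BlockingQueue is a blocking queue, and another point interviewers like to ask about. BlockingQueue is thread-safe and handles the waiting and waking internally, so we do not call wait() and notify() ourselves (the JDK implementations such as LinkedBlockingQueue and ArrayBlockingQueue use ReentrantLock and Condition for this). In a multithreaded environment we can use a BlockingQueue to share data while getting both efficiency and thread safety.

If the producer produces far faster than the consumer consumes, items pile up until the BlockingQueue reaches its maximum capacity, and the producer is then blocked. When is the producer unblocked? Only once the consumer starts consuming the backlog, so that the number of queued items drops below the BlockingQueue's maximum capacity.

If the stock is 0, the consumer is blocked automatically. Only when the producer produces an item is the consumer unblocked.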
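The blocking rules just described can be probed without hanging a thread by using the timed variants of the queue methods. The following is a small sketch, with the hypothetical class name `BlockingQueueCapacitySketch` and helper names chosen for illustration: `offer` with a timeout stands in for a `put` that would block on a full queue, and `poll` with a timeout stands in for a `take` that would block on an empty one.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class; only the BlockingQueue calls are JDK API.
class BlockingQueueCapacitySketch {

    // Fills a queue to capacity, then reports whether one more element is accepted.
    static boolean offerWhenFull(int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.put(i); // does not block: there is still room
        }
        // put() would block here; the timed offer gives up and returns false.
        return queue.offer(capacity, 50, TimeUnit.MILLISECONDS);
    }

    // Fills a queue, removes one element, then reports whether an offer is accepted.
    static boolean offerAfterTake(int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.put(i);
        }
        queue.take(); // frees one slot, which is what unblocks a waiting producer
        return queue.offer(capacity, 50, TimeUnit.MILLISECONDS);
    }

    // Polls an empty queue; take() would block, the timed poll returns null.
    static Integer pollWhenEmpty() throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
        return queue.poll(50, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("offer on full queue:  " + offerWhenFull(10));  // false
        System.out.println("offer after one take: " + offerAfterTake(10)); // true
        System.out.println("poll on empty queue:  " + pollWhenEmpty());    // null
    }
}
```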

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Storage {

    private BlockingQueue<Product> queues = new LinkedBlockingQueue<Product>(10);

    public void push(Product p) throws InterruptedException {
        queues.put(p);
    }

    public Product pop() throws InterruptedException {
        return queues.take();
    }
}
public class Product {
    private int id;

    public static int MAX = 20;

    public Product(int id) {
        super();
        this.id = id;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }
}
public class Producer implements Runnable {

    private String name;
    private Storage storage = null;

    public Producer(String name, Storage storage) {
        this.name = name;
        this.storage = storage;
    }

    @Override
    public void run() {
        int i = 0;
        try {
            while (true) {
                System.out.println(name + " has produced an item with id " + i);
                System.out.println("===========================");

                Product product = new Product(i++);

                storage.push(product);

                Thread.sleep(100);
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class Consumer implements Runnable {

    private String name;
    private Storage storage = null;

    public Consumer(String name, Storage s) {
        this.name = name;
        this.storage = s;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Product product = storage.pop();

                System.out.println(name + " has consumed an item with id " + product.getId());
                System.out.println("===========================");
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProducterConsumerDemo {

    public static void main(String[] args) {
        Storage storage = new Storage();

        ExecutorService service = Executors.newCachedThreadPool();
        Producer p0 = new Producer("腾讯", storage);
        Consumer c0 = new Consumer("cmazxiaoma", storage);
        
        service.submit(p0);
        service.submit(c0);

        service.shutdown();

    }
}

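We can clearly see that, before the line reporting that 腾讯 has produced the item with id 13, the producer produces freely and the consumer consumes freely, with the producer far outpacing the consumer. After that line, the backlog has reached the BlockingQueue's maximum capacity of 10. The producer is now blocked and cannot produce again; only when the consumer consumes a product is the producer woken up. From then on the output becomes very regular: each time 腾讯 produces an item, cmazxiaoma consumes one.

This example shows how a BlockingQueue balances the processing capacity of producers and consumers, and in doing so improves the overall data-processing throughput.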


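Deadlock and livelock


Causes of deadlock and the four necessary conditions

A deadlock requires four conditions to hold at once: mutual exclusion, hold and wait, no preemption, and circular wait. The following demo satisfies all four by having two threads take the same two locks in opposite order.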

public class DeadLockDemo {
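    // Dedicated lock objects: String literals are interned and may be shared
    // with unrelated code, so they make poor monitors.
    public static final Object obj1 = new Object();
    public static final Object obj2 = new Object();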

    public static void main(String[] args) {
        Thread a = new Thread(new LockThread1());
        Thread b = new Thread(new LockThread2());
        a.start();
        b.start();
    }
}

class LockThread1 implements Runnable {

    @Override
    public void run() {

        System.out.println("lockThread1 running");
        while (true) {
            synchronized (DeadLockDemo.obj1) {
                System.out.println("lockThread1 lock obj1");
                try {
                    // Hold obj1 long enough for the other thread to take obj2.
                    Thread.sleep(3000);
                    synchronized (DeadLockDemo.obj2) {
                        System.out.println("lockThread1 lock obj2");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

class LockThread2 implements Runnable {

    @Override
    public void run() {

        System.out.println("lockThread2 running");
        while (true) {
            synchronized (DeadLockDemo.obj2) {
                System.out.println("lockThread2 lock obj2");
                try {
                    // Hold obj2 long enough for the other thread to take obj1.
                    Thread.sleep(3000);
                    synchronized (DeadLockDemo.obj1) {
                        System.out.println("lockThread2 lock obj1");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Clearly, a deadlock has occurred, exactly as intended.
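One common way to avoid this kind of deadlock is to make every thread acquire the locks in the same order, which removes the circular wait. The following is a minimal sketch of that idea, not a rewrite of the demo: the class `LockOrderingSketch`, the lock names, and the `runBoth` helper are hypothetical and exist only for illustration.

```java
// Hypothetical sketch: both threads take LOCK_A first and LOCK_B second.
class LockOrderingSketch {
    private static final Object LOCK_A = new Object();
    private static final Object LOCK_B = new Object();

    // Acquires the two locks in one fixed, global order.
    static void doWork(String name) {
        synchronized (LOCK_A) {
            pause(100); // widen the window in which opposite ordering would deadlock
            synchronized (LOCK_B) {
                System.out.println(name + " holds both locks");
            }
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs two threads and reports whether both finished within the timeout.
    static boolean runBoth() throws InterruptedException {
        Thread first = new Thread(() -> doWork("thread-1"));
        Thread second = new Thread(() -> doWork("thread-2"));
        first.start();
        second.start();
        first.join(5000);
        second.join(5000);
        return !first.isAlive() && !second.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("both threads finished: " + runBoth());
    }
}
```

With a fixed order, whichever thread gets `LOCK_A` first can always go on to take `LOCK_B`, because the other thread is still waiting for `LOCK_A` and holds nothing.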


Thread pools

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

Reading the comments tells us what each parameter means:
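How corePoolSize, workQueue, maximumPoolSize, and handler interact is easiest to see in a small experiment. The sketch below is an illustration under assumed values (1 core thread, at most 2 threads, a queue with 1 slot); the class name `PoolParametersSketch` and the `observe` helper are hypothetical. It submits four tasks that all block on a latch and records the pool size after each of the first three, plus whether the fourth was rejected.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical experiment; the parameter values are chosen only for illustration.
class PoolParametersSketch {

    // Returns {size after task 1, size after task 2, size after task 3, 1 if task 4 was rejected}.
    static int[] observe() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                                   // corePoolSize
                2,                                   // maximumPoolSize
                30L, TimeUnit.SECONDS,               // keepAliveTime for the extra thread
                new ArrayBlockingQueue<Runnable>(1), // bounded work queue with one slot
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the experiment is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] result = new int[4];
        try {
            pool.execute(blocker);          // below corePoolSize: a new core thread runs it
            result[0] = pool.getPoolSize();
            pool.execute(blocker);          // core thread busy: the task waits in the queue
            result[1] = pool.getPoolSize();
            pool.execute(blocker);          // queue full: a second thread is created
            result[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);      // queue full and pool at maximum: rejected
            } catch (RejectedExecutionException e) {
                result[3] = 1;
            }
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(observe())); // [1, 1, 2, 1]
    }
}
```

The order is the point: core threads first, then the queue, then extra threads up to maximumPoolSize, and only then the handler.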

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
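To see the difference between these handlers in practice, the following sketch saturates a tiny pool and then submits one more task under a chosen policy. It is an illustration under assumed settings (one thread, a one-slot queue); the class `RejectionPolicySketch` and the `submitToSaturatedPool` helper are hypothetical names. The helper returns the name of the thread that ran the extra task, or null if the task never ran.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical experiment comparing the built-in rejection policies.
class RejectionPolicySketch {

    static String submitToSaturatedPool(RejectedExecutionHandler handler)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // occupies the only thread
            pool.execute(blocker); // fills the only queue slot
            // The pool is saturated, so this task goes to the handler.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return ranOn.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // CallerRunsPolicy: the submitting thread runs the task itself.
        System.out.println("CallerRuns:    "
                + submitToSaturatedPool(new ThreadPoolExecutor.CallerRunsPolicy()));
        // DiscardPolicy: the task is dropped silently, so nothing is recorded.
        System.out.println("Discard:       "
                + submitToSaturatedPool(new ThreadPoolExecutor.DiscardPolicy()));
        // DiscardOldestPolicy: the queued task is dropped and the new one takes its slot.
        System.out.println("DiscardOldest: "
                + submitToSaturatedPool(new ThreadPoolExecutor.DiscardOldestPolicy()));
    }
}
```

With AbortPolicy the third `execute` call would instead throw RejectedExecutionException to the caller, which is why it is left out of `main` here.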
    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

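How to understand unbounded and bounded queues

ArrayBlockingQueue is a bounded queue: its capacity is fixed when it is created. LinkedBlockingQueue is optionally bounded: created without a capacity argument it defaults to Integer.MAX_VALUE and behaves as an unbounded queue. SynchronousQueue is neither: it holds no elements at all, and each put must wait for a matching take, which is why newCachedThreadPool pairs it with a maximumPoolSize of Integer.MAX_VALUE.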

    // Fields from java.util.concurrent.LinkedBlockingQueue
    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    // Fields from java.util.concurrent.ArrayBlockingQueue
    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    /**
     * Shared state for currently active iterators, or null if there
     * are known not to be any.  Allows queue operations to update
     * iterator state.
     */
    transient Itrs itrs = null;

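Whether the work queue is unbounded or bounded has a real effect on how ThreadPoolExecutor behaves. With an unbounded queue, the queue never fills, so the pool never grows beyond corePoolSize, maximumPoolSize has no effect, and waiting tasks pile up in memory instead of being rejected. With a bounded queue, once the queue is full the pool adds threads up to maximumPoolSize and then hands any further tasks to the rejection handler.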
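The effect on pool size can be measured directly. The sketch below is an illustration under assumed values (corePoolSize 2, maximumPoolSize 4, ten blocking tasks); the class `QueueBoundSketch` and the `poolSizeWith` helper are hypothetical names. It submits the same load to two otherwise identical pools and reports how many threads each one created.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical experiment: same pool settings, different work queues.
class QueueBoundSketch {

    // Submits `tasks` blocking tasks and returns the pool size reached.
    static int poolSizeWith(BlockingQueue<Runnable> queue, int tasks)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, queue,
                new ThreadPoolExecutor.DiscardPolicy()); // drop overflow quietly
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep every worker busy while we measure
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(blocker);
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Unbounded: the queue absorbs everything, so only the 2 core threads exist.
        System.out.println("unbounded queue: pool size "
                + poolSizeWith(new LinkedBlockingQueue<Runnable>(), 10));
        // Bounded (2 slots): the queue fills, so the pool grows to its maximum of 4.
        System.out.println("bounded queue:   pool size "
                + poolSizeWith(new ArrayBlockingQueue<Runnable>(2), 10));
    }
}
```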


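ReentrantLock, synchronized, ReadWriteLock

The Lock interface and ReentrantLock were both introduced in JDK 1.5, as part of java.util.concurrent.locks. In my last second-round interview, at a robotics company, this is exactly where I failed. The interviewer asked me to talk through their source-level implementation and the scenarios each one is suited to.

Usage scenarios: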
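As one illustration of where each lock fits (a sketch under assumptions, not a definitive list): synchronized is the simplest choice for short critical sections; ReentrantLock adds features such as a timed `tryLock`, which lets a thread give up instead of waiting forever; and a ReadWriteLock suits data that is read often and written rarely, since many readers can hold the read lock at once. The class `LockScenariosSketch` and all of its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of two scenarios; only the lock classes are JDK API.
class LockScenariosSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Map<String, String> cache = new HashMap<>();

    // ReentrantLock scenario: run the action only if the lock is obtained in time.
    boolean runIfLockFree(Runnable action, long timeoutMillis) throws InterruptedException {
        if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false; // give up instead of blocking indefinitely
        }
        try {
            action.run();
            return true;
        } finally {
            lock.unlock(); // always release in finally
        }
    }

    // ReadWriteLock scenario: many readers may hold the read lock together.
    String read(String key) {
        rwLock.readLock().lock();
        try {
            return cache.get(key);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    // Writers are exclusive: no reader or other writer may hold the lock.
    void write(String key, String value) {
        rwLock.writeLock().lock();
        try {
            cache.put(key, value);
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    // Holds the lock on another thread and reports whether a timed tryLock succeeds.
    static boolean tryWhileHeldElsewhere() throws InterruptedException {
        LockScenariosSketch sketch = new LockScenariosSketch();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            sketch.lock.lock();
            try {
                held.countDown();
                done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                sketch.lock.unlock();
            }
        });
        holder.start();
        held.await(); // wait until the other thread owns the lock
        boolean acquired = sketch.runIfLockFree(() -> { }, 50);
        done.countDown();
        holder.join();
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        LockScenariosSketch sketch = new LockScenariosSketch();
        sketch.write("k", "v");
        System.out.println("read: " + sketch.read("k"));                            // v
        System.out.println("tryLock while held: " + tryWhileHeldElsewhere());       // false
        System.out.println("tryLock when free: " + sketch.runIfLockFree(() -> { }, 50)); // true
    }
}
```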


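Closing words

Diligence is the path up the mountain of books; hard work is the boat across the boundless sea of learning.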
