JUC

2020-07-21  本文已影响0人  Summer2077

1.开启多线程的方式:

同步代码块:就是被加上锁的那一段代码

2.sleep和wait之间的区别:

  1. 来自不同的类:

    ​ sleep来自Thread

    ​ wait来自Object

  2. 释放锁的问题:

    ​ sleep不会释放锁

    ​ wait会释放锁

  3. 使用的范围

    ​ wait必须使用在同步代码块中

    ​ sleep可以使用在任何地方

  4. 捕获异常

    ​ wait不必捕获异常

    ​ sleep必须捕获异常

3.Lock和synchronized:

synchronized:

卖ticket的例子sale

public class synchronizedTest {

    private int ticket = 1000;
    private boolean flag = true;

    public synchronized void  sale(){
        if (ticket>=0){
            System.out.println(Thread.currentThread().getName()+"===>买走了第"+ticket--+"张");
        }else {
            flag=false;
            System.out.println("票卖完了");
        }
    }


    public static void main(String[] args) {
        synchronizedTest synchronizedTest = new synchronizedTest();
        new Thread(()->{
            while (synchronizedTest.flag){
                synchronizedTest.sale();
            }
        },"A").start();
        new Thread(()->{
            while (synchronizedTest.flag){
                synchronizedTest.sale();
            }
        },"B").start();
        new Thread(()->{
            while (synchronizedTest.flag){
                synchronizedTest.sale();
            }
        },"C").start();
    }
}

Lock:

常用的实现类为ReentrantLock()

其实所有的锁都叫可重入锁。

public class LockTest {
    private int ticket = 1000;
    ReentrantLock lock = new ReentrantLock();
    public void sale(){
        lock.lock();
        try {
           if (ticket>0){
               System.out.println(Thread.currentThread().getName()+"===>买走了第"+ticket--+"张");
           }
        } finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) {
        LockTest lockTest = new LockTest();

        new Thread(()->{
            for (int i = 0; i < 30; i++) {
                lockTest.sale();
            }
        }).start();

        new Thread(()->{
            for (int i = 0; i < 30; i++) {
                lockTest.sale();
            }
        }).start();

        new Thread(()->{
            for (int i = 0; i < 30; i++) {
                lockTest.sale();
            }
        }).start();
    }
}

Synchronized 和 Lock 的区别:

  1. Synchronized 是内置的java关键字,lock只是一个类
  2. synchronized是基于JVM层面的,Lock是基于JDK层面的
  3. Synchronized无法获取锁的状态,Lock可获取锁的状态
  4. synchronized是会自动上锁的,而lock需要自己手动释放锁,不然会死锁
  5. synchronized可重入锁,不可以中断,lock可重入锁,可中断
  6. synchronized适合少量代码同步问题,Lock适合大量同步代码同步的问题。
  7. lock锁可以更加细粒度的去控制整个程序。

4.生产者和消费者

synchronized版:

  1. 判断等待
  2. 业务
  3. 通知
public class PCSynchronized {

    private int number = 0;

    public synchronized void add() throws InterruptedException {
            if (number!=0){
                wait();
            }
            System.out.println(Thread.currentThread().getName()+"加"+number++);
            this.notifyAll();
    }

    public synchronized void subtract() throws InterruptedException {
            if (number!=1){
                wait();
            }
         System.out.println(Thread.currentThread().getName()+"减"+number--);
            this.notifyAll();
    }

    public static void main(String[] args) {
        PCSynchronized pcSynchronized = new PCSynchronized();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    pcSynchronized.add();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    pcSynchronized.subtract();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
        
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    pcSynchronized.subtract();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();
    }
}

正常步骤:等待通知,执行业务,通知其他线程

如果只是执行AB线程是没有问题的,但是如果我加入了c线程就会出现虚假唤醒。

所以为什么会虚假唤醒?线程被唤醒,同时对应判断的值也发生了改变,但是我们使用的是if,if只会判断一次,再线程次被唤醒if是无法二次判断的。

解决虚假唤醒的方法:

吧if换成while()

Lock:

package Point03PC;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PCLock {

    private int number = 5;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public  void add() throws InterruptedException {
        lock.lock();
        try {
            while (number==5){
               condition.await();
            }
            System.out.println(Thread.currentThread().getName()+"加"+number++);
            condition.signalAll();
        } finally {
        lock.unlock();
        }
    }

    public void subtract() throws InterruptedException {
        lock.lock();
        try {
            while (number==0){
                condition.await();
            }
            System.out.println(Thread.currentThread().getName()+"减"+number--);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
    
    public static void main(String[] args) {
        PCLock pcLock = new PCLock();

        new Thread(()->{
            try {
                for (int i = 0; i < 40; i++) {
                    pcLock.add();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"A").start();

        new Thread(()->{
            try {
                for (int i = 0; i < 40; i++) {
                    pcLock.add();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"B").start();

        new Thread(()->{
            try {
                for (int i = 0; i < 40; i++) {
                    pcLock.subtract();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"C").start();
    }
}

lock锁的实现类为ReentrantLock

其中的Condition类我是用来控制线程的堵塞等待和唤醒。

5.8锁现象:

  1. 标准情况下谁先获得锁谁就先执行。

  2. 没有锁的方法是不会被锁住的。

  3. 两个对象,两个同步方法中,不同对象的锁是不会互通的。一个对象的锁是不会影响到另外一个对象。

  4. 静态的方法全局唯一, 类一加载就有了!锁的是Class

总结:

​ new this 具体的一个手机
​ static Class 唯一的一个模板

6、集合类不安全

ConcurrentModificationException(并发修改异常)

list不安全:

List<String> list = new ArrayList<>();

解决方案:

  1. List<String> list = new Vector<>();
    

底层是数组加上synchronized锁

  1. List<String> list = Collections.synchronizedList(new ArrayList<>());
    

通过这个Collections工具类将线程不安全的ArrayList变成了线程安全的。

  1. List<String> list = new CopyOnWriteArrayList<>();
    

CopyOnWrite写入时复制的计算机思想
低层是使用lock锁实现的。

set不安全:

解决方法:

  1. Set<String> safeSet = Collections.synchronizedSet(set);
    
  2. CopyOnWriteArraySet<String> safeSet = new CopyOnWriteArraySet<>();
    

Map 不安全:

  Map<String, Object> map = new HashMap<>();

解决方案:

  1.   Map<String, Object> map1 = new HashMap<>();
      Map<String, Object> map = Collections.synchronizedMap(map1);
    
  2.   Map<String, Object> map1 = new HashMap<>();
      Map<String, Object> map = new ConcurrentHashMap<String, Object>(map1);
    

7.Callable:

我们之前使用Callable是使用ExecutorService。

这里探究的是如何使用Thread来执行Callable。

思考过程:

  1. Thread的构造方法里面只能放Runable

  2. 一个类实现什么接口,那么这个类就是对应的接口!!!如实现了HttpServlet的类就是servlet!!!

  3. 我们想要使用Thread来启动Callable我们就必须在Runable的实现类中去寻找!!

    4. TIM截图20200325201257.png
  1. 在Runable的实现类中找到了FutureTask,这个实现类的构造方法放入的是Callable
TIM截图20200325201637.png
  1. public class CallableTest implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println(1024);
            return 1024;
        }
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CallableTest callableTest = new CallableTest();
            FutureTask futureTask = new FutureTask<Integer>(callableTest);
            new Thread(futureTask).start();
            Integer res = (Integer) futureTask.get();//获取结果
            System.out.println(res);
        }
    }
    
7. TIM截图20200325203336.png

注意点:

  1. Callable接口类似于Runnable,因为它们都是为其实例可能由另一个线程执行的类设计的。

  2. Runnable不返回结果,也不能抛出被检查的异常。

8、常用的辅助类(必会)

辅助类来检查线程是否都执行完成。

1.CountDownLatch(减法计数器)

public class CountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 0; i < 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"======>");
                countDownLatch.countDown();//-1
            },String.valueOf(i)).start();
        }
        System.out.println("hhh我不堵塞");
        countDownLatch.await();//堵塞,等待计数器到0
        System.out.println("结束");
    }
}
countDownLatch.countDown();
countDownLatch.await();
TIM截图20200325212112.png TIM截图20200325213803.png

2.CyclicBarrier(加分计数器)

public class CyclicBarrierTest {
    public static void main(String[] args){
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("召唤神龙");
        });
        for (int i = 1; i <= 7; i++) {
            new Thread(()->{
                System.out.println("集其了"+Thread.currentThread().getName()+"个龙珠======>");
                try {
                    cyclicBarrier.await();//堵塞等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}
TIM截图20200325214000.png

3.Semaphore

达到的效果就是限流,通一时间只有指定数量的线程可以运行

public class SemaphoreTest {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i < 6; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"抢到了车位");
                    TimeUnit.SECONDS.sleep(2);
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}

9.读写锁

读写锁和lock的ReentrantLock是一个东西,只是读写锁粒度更细。

读写锁和lock锁之间的关系:

TIM截图20200325220543.png

10、阻塞队列:

TIM截图20200326084454.png
方法 抛出异常 有返回值,不抛出异常 堵塞等待 超时等待
添加 add offer() put() offer(,)
移除 remove poll() take() poll(,)
检测队首元素 element peek
BlockingQueue<String> ArrayQueue = new ArrayBlockingQueue<String>(4);
BlockingQueue<String> LinkedQueue = new LinkedBlockingDeque<String>(4);

SychronousQueue

是一个容量为一的堵塞队列,进去一个出来一个。

对应的方法为put和take

  BlockingQueue<String> SynchronousQueue = new SynchronousQueue<String>();

11.线程池:

3大方法,7大参数,4大拒绝策略

线程池的好处:

  1. 3大方法
ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService = Executors.newFixedThreadPool(5);
ExecutorService executorService = Executors.newCachedThreadPool();
  1. 7大参数

通过源码分析发现三大方法的实现是用ThreadPoolExecutor来实现的。

所以本质就是ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler){
    .......
}
  1. corePoolSize 核心线程

  2. maximumPoolSize 最大线程

  3. keepAliveTime 处于活跃状态的时间

  4. unit 时间单位

  5. workQueue 堵塞队列,任务等待

  6. threadFactory 线程工厂,创建线程

  7. RejectedExecutionHandler 拒绝策略

四大拒绝策略:RejectedExecutionHandler

拒绝策略会在线程池满了的情况使用

  1. AbortPolicy(),不处理任务抛出异常
  2. CallerRunsPolicy(),从哪里来回哪里去
  3. DiscardOldestPolicy(),和最先执行的线程去争抢资源
  4. DiscardPolicy(),丢弃任务,不抛出异常

线程池大小的设置:(CPU密集型,io密集型)

1、CPU 密集型,几核,就是几,可以保持CPu的效率最高!
2、IO 密集型 > 判断你程序中十分耗IO的线程,就设置其线程数量的2倍。

四大函数式接口:

Function函数式接口

public interface Function<T, R> {
    /**
     * Applies this function to the given argument.
     *
     * @param t the function argument
     * @return the function result
     */
    R apply(T t);
}

断定型接口:有一个输入参数,返回值只能是 布尔值!

public interface Predicate<T> {
      /**
     * Evaluates this predicate on the given argument.
     *
     * @param t the input argument
     * @return {@code true} if the input argument matches the predicate,
     * otherwise {@code false}
     */
    boolean test(T t);
}

Consumer 消费型接口

public interface Consumer<T> {
     /**
     * Performs this operation on the given argument.
     *
     * @param t the input argument
     */
    void accept(T t);
}

Supplier 供给型接口

public interface Supplier<T> {

    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}

13.Stream流式计算

14.ForkJoin

分支合并,程序并行执行,增加运行速度。

将大问题分解成为一个一个独立的小问题,递归去执行

其中用到的思想就是分治法的思想,分而治之。

ForkJoin 特点:工作窃取

提前完成的线程会去窃取未完成线程的一个任务来执行。(双端队列)

例子 求和计算

package Point12_FrokJoin;
import java.util.concurrent.RecursiveTask;

public class ForkJoinTest extends RecursiveTask<Long> {
    /**
     * 求和计算
     */
    private Long start;
    private Long end;
    private Long temp = 10000L;

    public ForkJoinTest(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        Long sum = 0L;
        if ((end-start)<temp){
            for (Long i = start; i <= end; i++) {
                sum+=i;
            }
            //System.out.println(Thread.currentThread().getName());
            return sum;
        }else {
            Long middle =  (start + end)/2;
            ForkJoinTest forkJoinTest1 = new ForkJoinTest(start,middle);
            forkJoinTest1.fork();
            ForkJoinTest forkJoinTest2 = new ForkJoinTest(middle+1,end);
            forkJoinTest2.fork();
            return forkJoinTest1.join()+forkJoinTest2.join();
        }
    }
}
package Point12_FrokJoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        test3();
    }

    public static void test() throws ExecutionException, InterruptedException {
//        结果为:500000000500000000
//        时间为:4012
        Long start = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTest forkJoinTest = new ForkJoinTest(0L, 10_0000_0000L);
        ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinTest);
        Long res = submit.get();
        System.out.println("结果为:"+res);
        Long end = System.currentTimeMillis();
        System.out.println("时间为:"+(end-start));
    }

    public static void test2() throws ExecutionException, InterruptedException {
//        结果为:500000000500000000
//        时间为:2997
        Long start = System.currentTimeMillis();
        Long sum = 0L;
        for (int i = 0; i <= 10_0000_0000; i++) {
            sum+=i;
        }
        System.out.println("结果为:"+sum);
        Long end = System.currentTimeMillis();
        System.out.println("时间为:"+(end-start));
    }

    public static void test3() throws ExecutionException, InterruptedException {
//        结果为:500000000500000000
//        时间为:165
        Long start = System.currentTimeMillis();
        Long res = LongStream.rangeClosed(0L,10_0000_0000L).parallel().reduce(0,Long::sum);
        System.out.println("结果为:"+res);
        Long end = System.currentTimeMillis();
        System.out.println("时间为:"+(end-start));
    }
}

15、异步回调

Future 设计的初衷: 对将来的某个事件的结果进行建模

上一篇 下一篇

猜你喜欢

热点阅读