编程开发收藏一些收藏

Java并发编程基础知识

2020-10-18  本文已影响0人  后来丶_a24d

目录

目录.png

概念

饥饿,死锁, 活锁

ForkJoinPool

// newWorkStealingPool线程池的实现用到了ForkJoinPool,用到了分而治之,递归计算的算法, 抢占式
ExecutorService exec = Executors.newWorkStealingPool();
public class TestForkJoinCalculator {
    private final ForkJoinPool pool;

    //执行任务RecursiveTask:有返回值  RecursiveAction:无返回值
    private static class SumTask extends RecursiveTask<Long> {
        private final long[] numbers;
        private final int from;
        private final int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        //此方法为ForkJoin的核心方法:对任务进行拆分  拆分的好坏决定了效率的高低
        @Override
        protected Long compute() {

            // 当需要计算的数字个数小于6时,直接采用for loop方式计算结果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            } else { // 否则,把任务一分为二,递归拆分(注意此处有递归)到底拆分成多少分 需要根据具体情况而定
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle + 1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public TestForkJoinCalculator() {
        // 也可以使用公用的线程池 ForkJoinPool.commonPool():
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    public long sumUp(long[] numbers) {
        Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();

        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        TestForkJoinCalculator calculator = new TestForkJoinCalculator();
        long result = calculator.sumUp(numbers);
        stopWatch.stop();
        System.out.println("耗时:" + stopWatch.getTotalTimeMillis() + "ms");

        System.out.println("结果为:" + result);
    }
}

  1. 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
  2. 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式
  3. 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成
  4. 线程默认: 如果没有指定,则默认为Runtime.getRuntime().availableProcessors() - 1. 或者设置启动参数:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
  1. 当使用ThreadPoolExecutor时,使用分治法(分治时任务会很多)会存在问题,因为ThreadPoolExecutor中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。ThreadPoolExecutor中的Thread无法选择优先执行子任务

基础

while true

线程等待其他线程

join,countDownLatch, CyclicBarrier

CompletableFuture

构造方法非线程安全

public class ThisEscape {
    public ThisEscape() {
        new Thread(new EscapeRunnable()).start();
        // ...其他代码
    }
    
    private class EscapeRunnable implements Runnable {
        @Override
        public void run() {
            // 在这里通过ThisEscape.this就可以引用外围类对象, 但是此时外围类对象可能还没有构造完成, 即发生了外围类的this引用的逃逸
        }
    }
}
改造:
public class ThisEscape {
    private Thread t;
    public ThisEscape() {
        t = new Thread(new EscapeRunnable());
        // ...其他代码
    }
    public void init() {
        t.start();
    }   
    private class EscapeRunnable implements Runnable {
        @Override
        public void run() {
            // 在这里通过ThisEscape.this就可以引用外围类对象, 此时可以保证外围类对象已经构造完成
        }
    }
}

并发编程缺点

  1. 在线程等待共享资源时会降低速度。
  2. 线程管理产生额外 CPU 开销。
  3. 糟糕的设计决策带来无法弥补的复杂性。

创建一个 Thread, jvm做的事情

  1. 程序计数器,指明要执行的下一个 JVM 字节码指令。
  2. 用于支持 Java 代码执行的栈
  3. 第二个则用于 native code(本机方法代码)执行的栈
  4. thread-local variables (线程本地变量)的存储区域
  5. 用于控制线程的状态管理变量

超线程

线程数量

wait, notify

volatile

字分裂

volatile可见性

volatile 重排与 Happen-Before 原则

happens before 担保原则( volatile (易变性)操作通常称为 memory barrier (通过Lock前缀指令生内存屏障)
one,two,three 变量赋值操作就可以被重排, xyz也是
happens before 担保原则确保 volatile 变量的读写指令不能跨过内存屏障进行重排
happens before 担保原则还有另一个作用:当线程向一个 volatile 变量写入时,在线程写入之前的其他所有变量(包括非 volatile 变量)也会刷新到主内存。当线程读取一个 volatile 变量时,它也会读取其他所有变量(包括非 volatile 变量)与 volatile 变量一起刷新到主内存

public void run() {
    one = 1;
    two = 2;
    three = 3;
    volaTile = 92;
    int x = four;
    int y = five;
    int z = six;
}

happens-before原则

  1. 单线程happen-before原则:在同一个线程中,前面的操作产生的结果必须对后面的操作可见,书写在前面的操作happen-before后面的操作(必须有数据依赖,无数据依赖则有可能指令重排)。
  2. 锁的happen-before原则:同一个锁的unlock操作happen-before此锁的lock操作
  3. volatile的happen-before原则: 对一个volatile变量的写操作happen-before对此变量的任意操作。
  4. happen-before的传递性原则: 如果A操作 happen-before B操作,B操作happen-before C操作,那么A操作happen-before C操作。
  5. 线程启动的happen-before原则:同一个线程的start方法happen-before此线程的其它方法
  6. 线程中断的happen-before原则:对线程interrupt方法的调用happen-before被中断线程的检测到中断发送的代码(interrupt 方法改变的状态必须对后续执行的检测方法可见)
  7. 线程终结的happen-before原则:线程中的所有操作都happen-before线程的终止检测。
  8. 对象创建的happen-before原则:一个对象的初始化完成先于他的finalize方法调用。

原子性

java并发中常见的锁

  1. 偏向锁,轻量级锁,重量级锁
  1. 乐观锁,悲观锁
  1. 公平锁,非公平锁
  1. 可重入锁
  1. 独享锁/共享锁
  1. 分段锁

多线程获取结果对比RxJava2使用

private Response asyncHandle(SearchRequest request){
    Response response = new HResponse();
    response.setLowRates(new ArrayList<>());

    List<Future<Response>> futures = new ArrayList<>();

    while (iteratorCity.hasNext()) {
        futures.add(threadPool.submit(new SearchRunner(countryID, request));
    }

    while(iter.hasNext()) {
        if(timeout(start)) {
            System.ou.println("....");
            break;
        }

        Future<Response> future = iter.next();
        try {
            Response resp = future.get(Config.getSingleTimeout(), TimeUnit.MILLISECONDS);
            if(resp != null) {
                response.getLowRates().addAll(resp.getLowRates());
                iter.remove();
            }
        } 
        // 省略catch
    }

    return response;

}
  1. 异步操作,中间执行的任务可以是异步网络操作,控制socket timeout之类的可以在这块处理。更优雅。
// 常见的示例,这是一个异步操作
Single.create(new Single.OnSubscribe<Integer>() {
    @Override
    public void call(SingleSubscriber<? super Integer> singleSubscriber) {
        // 这里被指定在IO线程
        singleSubscriber.onSuccess(addValue(1, 2));
    }
})
.subscribeOn(Schedulers.io())// 指定运行在IO线程
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {            }
    @Override
    public void onError(Throwable e) {    }
    @Override
    public void onNext(Integer o) {  
        // o = 3
    }
});
  1. zip
Single.zip(s1, s2, new Func2<Integer, Integer, String>() {
    @Override
    public String call(Integer o, Integer o2) {
        LogHelper.e("A:" + o + "=" + o2);
        return null;
    }}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        LogHelper.e("kk:"+s);
    }
});

参考文章

  1. java编程思想之并发编程
  2. java编程思想之并发底层原理
  3. 八个层面比较 Java 8, RxJava, Reactor
  4. 介绍 ForkJoinPool 的适用场景,实现原理
  5. 【死磕Java并发】-----Java内存模型之happens-before
  6. happen-before原则的理解
上一篇下一篇

猜你喜欢

热点阅读