SSM社区架构社区SSH社区

Java并发编程

2018-07-08  本文已影响3人  wch853

相关概念

并发

并发是指多个线程同时处于执行的某个状态,交替地使用系统资源来处理任务。使用并发具有以下优势:

CPU高速缓存

CPU执行速度很快,而从内存读取数据和向内存写入数据的过程跟CPU执行指令的速度比起来要慢的多。CPU高速缓存的出现是为了缓解CPU和内存之间速度不匹配的问题(CPU - cache - memory)。
当程序运行时,会将需要的数据从主存中复制一份到CPU的高速缓存中,当CPU执行指令时直接在缓存中存取数据,在之后的某个时间点,再将缓存中的数据刷新到主存中。

CPU高速缓存
CPU高速缓存的意义

缓存容量远远小于主存,缓存无法命中CPU所需数据的情况在所难免,缓存的意义如下

缓存一致性协议(MESI协议)

在多线程运行环境下,存在多份高速缓存,如果多个线程对多份缓存中的同一变量都做了修改,就有可能出现数据不一致的情况。为了保证多个CPU cache之间缓存共享数据的一致,采用MESI协议,其核心思想是:当CPU写数据时,如果发现操作的变量是共享变量,即在其他CPU中也存在该变量的副本,会发出信号通知其他CPU将该变量的缓存行置为无效状态,因此当其他CPU需要读取这个变量时,发现自己缓存中缓存该变量的缓存行是无效的,那么它就会从内存重新读取。

Java内存模型

Java内存模型规定了所有变量都存储在主内存中,每个线程都有自己的工作内存(CPU寄存器和高速缓存的抽象),工作内存中保存了线程使用到的变量的副本拷贝,线程对变量的读写都必须在工作内存中进行,在之后的某个时间点再将工作内存中的变量同步到主内存中。不同线程无法直接访问对方工作内存的变量,必须通过主内存来进行通信,如果没有同步规则,就容易发生错误。


Java内存模型抽象结构
同步操作
同步操作

如果要把一个变量从主内存中复制到工作内存,就需要按顺寻地执行read和load操作,如果把变量从工作内存中同步回主内存中,就要按顺序地执行store和write操作。Java内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行。也就是

线程安全性

当多个线程访问某个类时,不管运行时环境采用何种调度方式或将这些进程如何交替执行,并且不需要任何额外的同步或协同,这个类都能表现出正确的行为。

原子性

CAS(Compare And Swap)

线程安全的实现方式之一是加锁,但是在多线程并发执行的情况下,加锁和释放会导致较多的上下文切换、导致死锁问题等。
CAS是一种不通过加锁,使用非阻塞方式实现线程安全的方式,发生冲突导致失败时就重试,直到成功。CAS有三个操作参数:内存地址、期望值、修改值,其核心操作是读取某内存地址中的值与期望值进行比较,当他们相同时才将该内存位置的值修改为修改值,若不相同,说明内存中的值已被其他线程修改,则将读取的内存中的值赋给期望值,重新进行读取、比较操作。比较和置换是作为单个原子操作实现的。

/**
 * @param obj 当前对象
 * @param valueOffset 期望当前值
 * @param newValue 新值
 */
public final Object getAndSetObject(Object obj, long valueOffset, Object newValue) {
    // 内存返回值
    Object valueInMemo;
    do {
        valueInMemo = this.getObjectVolatile(obj, valueOffset);
    } while(!this.compareAndSwapObject(obj, valueOffset, valueInMemo, newValue));
    return valueInMemo;
}
CAS的ABA问题

从内存中读取和比较之间,内存中的值可能被多次修改,最终回到原来的值,这与设计思路是不相符的,为了解决这个问题,内存中的值可以引入版本号处理,只要内存变量被修改过,其版本号就会递增,比较时版本号与读取时不一致,即认为失败。

CAS的性能问题

当并发执行的线程数据很多,可能会导致重试多次。

synchronize

基于JVM,不可中断锁,适合竞争不激烈的情况,可读性好。当一个线程访问同步代码块时,它首先是需要得到锁才能执行同步代码,当退出或者抛出异常时必须要释放锁。

Lock

依赖特殊CPU指令,可中断锁,多样化同步,竞争激烈时能维持常态。

可见性

导致共享变量在线程间不可见的原因
synchronized
volatile

volatile是通过内存屏障和静止重排序优化来实现可见性的,不能保证线程安全,可以利用其可见性作为不同线程间的同步标记。

有序性

Java内存模型中,允许编译器和处理器对指令进行重排序,重排序过程不会影响到单线程程序的执行,却可能会影响到多线程并发执行的正确性。

happens-before(先行发生原则)

如果两个操作的执行次序无法从happens-before原则推导出来,那么久不能保证有序性,虚拟机可以对它们进行重排序。

安全发布对象

安全发布方法

单例模式

懒汉模式

懒汉模式是在第一次使用时创建单例实例。创建单例时,在获取单例的静态方法上加锁会造成性能开销;使用双重检测机制在指令重排序的情况下会造成线程不安全。因此使用懒汉模式创建单例应使用volatile关键字+双重检测机制。

public class LazySingleton {

    /**
     * 私有构造函数
     */
    private LazySingleton() {
    }

    /**
     * 单例对象
     * volatile + 双重检测机制 -> 禁止重排序
     */
    private volatile static LazySingleton instance = null;

    /**
     * instance = new LazySingleton();
     * 1.分配对象内存空间
     * 2.初始化对象
     * 3.设置instance指向刚分配的内存
     *
     * JVM和CPU优化,发生了指令重排序 1-3-2,线程A执行完3,线程B执行第一个判空,直接返回
     * 通过volatile关键字禁止重排序
     *
     * @return
     */
    public static LazySingleton getInstance() {
        if (null == instance) {
            synchronized (LazySingleton.class) {
                if (null == instance) {
                    // 双重检测
                    instance = new LazySingleton();
                }
            }
        }
        return instance;
    }
}
饿汉模式

饿汉模式是在类装载时创建单例实例。

public class HungrySingleton {

    /**
     * 私有构造函数
     */
    private HungrySingleton() {
    }

    /**
     * 单例对象
     */
    private static HungrySingleton instance = new HungrySingleton();

    public static HungrySingleton getInstance() {
        return instance;
    }
}
使用枚举的单例模式
public class EnumSingleton {

    private EnumSingleton() {
    }

    private static EnumSingleton getInstance() {
        return Singleton.INSTANCE.getInstance();
    }

    private enum Singleton {
        INSTANCE;

        private EnumSingleton instance;

        /**
         * JVM保证该方法只调用一次
         */
        Singleton() {
            instance = new EnumSingleton();
        }

        public EnumSingleton getInstance() {
            return instance;
        }
    }
}

线程安全策略

不可变对象

将对象设计为不可变对象实际上是对并发造成的线程安全问题的规避,不可变对象需要满足以下条件:

final关键字
unmodifiable与Immutable

分别由java.util.collections和com.google.collect包提供只读视图,对集合修改操作的实现均以抛出异常代替。

线程封闭

把对象封装到一个线程里,只有这一个线程能访问到这个对象。

常见线程不安全类

常用同步容器

采用synchronize实现线程安全的常用同步容器。

安全共享对象策略

并发容器

J.U.C

J.U.C是指java.util.concurrent包。AQS是指java.util.concurrent.locks.AbstractQueuedSynchronizer。

CountDownLatch

CountDownLatch

CountDownLatch可以看作是一个计数器,其内部维护着一个count计数,对这个计数器的操作都是原子操作,同时只能有一个线程去操作这个计数器,CountDownLatch通过构造函数传入一个初始计数值,调用者可以通过调用CountDownLatch对象的countDown()方法来使计数减1,如果调用CountDownLatch对象的await()方法,那么线程就会一直阻塞,直到countDown()方法将计数减到0,才可以继续执行。await()方法允许设置超时参数。

ExecutorService executorService = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(requestCount);
for (int i = 0; i < requestCount; i++) {
    final int threadNum = i;
    executorService.execute(() -> {
        // execute task...
        countDownLatch.countDown();
    });
}
countDownLatch.await();

Semaphore

Semaphore是计数信号量,用于限制获取某种资源的线程数量。在共享资源使用前和使用完成后,分别调用Semaphore的acquire()和release()方法获取和释放许可。

ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadCount);
for (int i = 0; i < requestCount; i++) {
    final int threadNum = i;
    executorService.execute(() -> {
        // 获取许可
        semaphore.acquire();
        // 一次获取多个许可
        // semaphore.acquire(10);
        // 尝试获取许可,获取不到就放弃
        // semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS);
        // execute task...
        // 释放许可
        semaphore.release();
    });
}

CyclicBarrier

CyclicBarrier允许一组线程相互等待,直到每个线程都到达屏障点之后才能执行后续操作,线程通过调用CyclicBarrier对象的await()方法通知自己已到达屏障,然后被阻塞。CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。CountDownLatch的计数器只能使用一次,其描述的是一个或多个线程等待其他线程的关系;而CyclicBarrier的计数器可以使用reset()方法重置,其描述的是多个线程相互等待的关系。

ExecutorService executorService = Executors.newCachedThreadPool();
final CyclicBarrier cyclicBarrier = new CyclicBarrier(reachBarrierCount);
for (int i = 0; i < 10; i++) {
    final int threadNum = i;
    executorService.execute(()->{
        log.info("{} ready", threadNum);
        cyclicBarrier.await();
        log.info("{} continue", threadNum);
    });
}

在申明CyclicBarrier时可以指定Runnable,在要求数量的线程都到达屏障时优先执行。

CyclicBarrier cyclicBarrier = new CyclicBarrier(reachBarrierCount, new Runnable() {
    @Override
    public void run() {
        // all reach barrier do something before continue
    }
});

Lock

synchronized是JVM层面的实现,Lock是JDK层面的实现,需要保证锁手工释放。

final Lock lock = new ReentrantLock();
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadCount);
CountDownLatch countDownLatch = new CountDownLatch(requestCount);
for (int i = 0; i < requestCount; i++) {
    executorService.execute(() -> {
        try {
            semaphore.acquire();
            lock.lock();
            try {
                count++;
            } finally {
                lock.unlock();
            }
            semaphore.release();
        } catch (Exception e) {
            log.info("Exception: {}", e);
        }
        countDownLatch.countDown();
    });
}
countDownLatch.await();

ReentrantLock是一种独占锁,即同一时间只有一个线程能获得锁,但是读操作与读操作之间是不存在互斥关系的。因此引入ReentrantReadWriteLock,它的特性是:一个资源可以被多个读操作访问,或者一个写操作访问,但两者不能同时进行。然而当读取很多,写入很少的情况时,使用 ReentrantReadWriteLock 可能会使写入线程遭遇饥饿问题。
StampedLock控制锁有三种模式(写,读,乐观读),一个StampedLock状态是由版本和模式两个部分组成,锁获取方法返回一个数字作为票据stamp,它用相应的锁状态表示并控制访问,数字0表示没有写锁被授权访问。在读锁上分为悲观锁和乐观锁。所谓的乐观读模式,也就是若读的操作很多,写的操作很少的情况下,你可以乐观地认为,写入与读取同时发生几率很少,因此不悲观地使用完全的读取锁定,程序可以查看读取资料之后,是否遭到写入执行的变更,再采取后续的措施(重新读取变更信息,或者抛出异常)。

final StampedLock lock = new StampedLock();
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadCount);
CountDownLatch countDownLatch = new CountDownLatch(requestCount);
for (int i = 0; i < requestCount; i++) {
    executorService.execute(() -> {
        try {
            semaphore.acquire();
            long stamp = lock.lock();
            try {
                count++;
            } finally {
                lock.unlock(stamp);
            }
            semaphore.release();
        } catch (Exception e) {
            log.info("Exception: {}", e);
        }
        countDownLatch.countDown();
    });
}
countDownLatch.await();
锁的选择

FutureTask

Thread和Runnable创建的线程在执行完成后无法获取执行结果,但是自Java 1.5开始就提供了Callable和Future接口在任务执行完毕后获取结果。

/**
 * 实现Callable接口的call()方法可以获取线程返回结果
 */
public interface Callable<V> {

    V call() throws Exception;
}

/**
 * Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否取消、查询是否完成、获取结果。
 * 必要时可以通过get()方法获取执行结果,该方法会阻塞直到任务返回结果。
 */
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable和Future接口,所以FutureTask既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

// Callable + Future
Future<String> future = executorService.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
        log.info("do something in callable");
        Thread.sleep(3000);
        return "Done";
    }
});
String result = future.get();

// Callable + FutureTask
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
    @Override
    public String call() throws Exception {
        log.info("do something in callable");
        Thread.sleep(3000);
        return "Done";
    }
});
new Thread(futureTask).start();
String result = futureTask.get();

Fork/Join框架

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:


工作窃取算法

对于一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,这些子任务分别放到不同的队列里,每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。已经完成任务的线程就会去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源;比如创建多个线程和多个双端队列。

@Slf4j
@AllArgsConstructor
public class ForkJoinTaskExample extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 2;

    private int start;

    private int end;

    @Override
    protected Integer compute() {
        int sum = 0;

        // 任务足够小就计算
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // 执行子任务,调用compute()方法继续拆分,直到任务足够小
            leftTask.fork();
            rightTask.fork();

            // 分别获取子任务计算结果
            Integer leftResult = leftTask.join();
            Integer rightResult = rightTask.join();
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
        Future<Integer> result = forkJoinPool.submit(task);

        try {
            log.info("result: {}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

ForkJoinTask需要实现compute()方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务并返回结果。如果不足够小,就必须分割成子任务,每个子任务在调用fork()方法时,又会进入compute()方法查看当前子任务是否需要继续分割。使用join()方法会等待子任务执行完并得到其结果。

线程池

线程池是指在初始化一个多线程应用程序过程中创建一个线程集合,在需要执行新的任务时重用这些线程而不是新建一个线程。线程池中的每个线程都在等待或执行任务,一旦任务已经完成,线程就会回到线程池中等待下一次任务。

线程池的优点
ThreadPoolExecutor
属性
方法
J.U.C提供的线程池
线程池配置

多线程并发最佳实践

死锁

如果一组进程中的每一个进程都在等待仅由该组进程中的其他进程才能引发的事件,那么该组进程是死锁的。

死锁的条件

(1)互斥条件,在一段时间内,某资源只能被一个进程占用。
(2)请求和保持条件,某进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源已被其他进程占有,此时请求进程被阻塞,但对自己已持有的资源保持不放。
(3)不可抢占条件,进程已获得的资源在未使用完之前不能被抢占,只能在进程使用完时由自己释放。
(4)循环等待条件,在发生死锁时,必然存在一个进程—资源的循环链

多线程编程最佳实践

高并发问题策略

扩容

线程占用内存的大小取决于其工作内存中变量的多少,随着并发量增加,可能就需要增加内存或服务器。

缓存

缓存特征
命中率的影响因素
缓存问题

消息队列

特性
消息队列解决的问题

应用拆分

单个服务器的处理是有上限的,可以将一个庞大的应用按照某种规则拆分成多个应用,分开部署。

应用拆分原则
微服务

微服务是通过将功能分解到各个离散的服务中以实现对解决方案的解耦,提供更加灵活的服务支持。微服务把一个大型的单个应用程序和服务拆分为数个甚至数十个的支持微服务,它可扩展单个组件而不是整个的应用程序堆栈,从而满足服务等级协议。微服务围绕业务领域组件来创建应用,这些应用可独立地进行开发、管理和迭代。在分散的组件中使用云架构和平台式部署、管理和服务功能,使产品交付变得更加简单。其本质是用一些功能比较明确、业务比较精练的服务去解决更大、更实际的问题。

应用限流

应用限流就是通过对并发访问/请求进行限制,从而达到保护系统的目的。

限流算法

服务降级与服务熔断

降级

降级指请求处理不了或出错时给一个默认的返回。

熔断

熔断指系统出现过载,为了防止造成整个系统故障而采取的措施。

服务降级与服务熔断的比较

切库、分库分表

当单个库的数据量过大或者单个库服务器由于读写瓶颈造成服务器压力过大时,需要考虑切库、分库分表.

数据库切库
数据库分库分表

当一个表数据量很大,大到即使做了sql和索引优化之后,基本操作的速度还是影响使用就必须考虑分表了。分表后单表的并发能力提高了,写操作效率也会提高;其次是查询时间变短了,数据分布在不同的文件里,磁盘的IO性能也提高了,磁盘读写锁影响的数据量变小,插入数据库需要重新建立的索引变少。

上一篇 下一篇

猜你喜欢

热点阅读