Concurrent Java 01 - 线程安全性

2019-02-02  本文已影响0人  阿武_Accat

线程安全三个必要性

确保三个必要性

原子性

Atomic 包提供了一批AtomicXXX类型,用于确保对象的获取和操作步骤为原子性操作。


atomic包提供类 将Read,Load,Use, Assign, Store, Write 绑定在一起
package com.accat.concurrency.example.atomic;

import com.accat.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@ThreadSafe
public class AtomicExample1 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count.get());
    }

    private static void add() {
        count.incrementAndGet();
        // count.getAndIncrement();
    }
}

这里关键是add()中的count.incrementAndGet(),追踪这个方法
count.incrementAndGet
-> unsafe.getAndAddInt

public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

这里使用了乐观锁的概念,不断地去比较var2, 和 var5,如果相同则切换值var5 + var4

-> getIntVolatile + compareAndSwapInt
追踪链到这个方法,compareAndSwapInt -- CAS 代表 比较切换值同时进行,属于java底层的代码。

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

compareAndSet

package com.accat.concurrency.example.atomic;

import com.accat.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicReference;

@Slf4j
@ThreadSafe
public class AtomicExample4 {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        count.compareAndSet(0, 2); // 2
        count.compareAndSet(0, 1); // no
        count.compareAndSet(1, 3); // no
        count.compareAndSet(2, 4); // 4
        count.compareAndSet(3, 5); // no
        log.info("count:{}", count.get());
    }
}

compareAndSet直接设置值,由于AtomicReference<V>是个泛型类,所以设置的值为V类型。

AtomicIntegerFieldUpdater

package com.accat.concurrency.example.atomic;

import com.accat.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

@Slf4j
@ThreadSafe
public class AtomicExample5 {

    private static AtomicIntegerFieldUpdater<AtomicExample5> updater =
            AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");

    @Getter
    public volatile int count = 100;

    public static void main(String[] args) {

        AtomicExample5 example5 = new AtomicExample5();

        if (updater.compareAndSet(example5, 100, 120)) {
            log.info("update success 1, {}", example5.getCount());
        }

        if (updater.compareAndSet(example5, 100, 120)) {
            log.info("update success 2, {}", example5.getCount());
        } else {
            log.info("update failed, {}", example5.getCount());
        }
    }
}

更新某个对象中的某字段值

AtomicStampedReference

标准乐观锁

/**
     * Atomically sets the value of both the reference and stamp
     * to the given update values if the
     * current reference is {@code ==} to the expected reference
     * and the current stamp is equal to the expected stamp.
     *
     * @param expectedReference the expected value of the reference
     * @param newReference the new value for the reference
     * @param expectedStamp the expected value of the stamp
     * @param newStamp the new value for the stamp
     * @return {@code true} if successful
     */
    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }

AtomicStampedReference类可以说是乐观锁的标准实现,
①该类为改变的类变量维护一个版本号,每次相信该类变量版本为最新,
②如果是最新则设置值,
③如果不是则放弃操作后返回false

让多线程代码只执行一次

package com.mmall.concurrency.example.atomic;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@ThreadSafe
public class AtomicExample6 {

    private static AtomicBoolean isHappened = new AtomicBoolean(false);

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    test();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("isHappened:{}", isHappened.get());
    }

    private static void test() {
        if (isHappened.compareAndSet(false, true)) {
            log.info("execute");
        }
    }
}

test()中的代码只执行了一次,如果要确保多线程执行同一方法时,确保方法只被执行一次则可以参考上述代码。

上述atomic包提供的类是存在缺陷的,因为它只提供对单一类成员变量的原子性操作。
AtomicInteger只提供对单一Integer的原子性操作。
如果我有两个类型AtomicInteger aAtomicInteger b, 而操作 a + b 的过程中依然存在缝隙。

锁的分类

synchronized

synchronized更为底层,JVM层面的实现
Lock更为上层,是Java代码实现


synchronized作用范围
package com.accat.concurrency.example.sync;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class SynchronizedExample1 {

    // 修饰一个代码块
    public void test1(int j) throws InterruptedException {
        synchronized (this) {
            for (int i = 0; i < 10; i++) {
                log.info("test1 {} - {}", j, i);
                Thread.sleep(200);
            }
        }
    }

    // 修饰一个方法
    public synchronized void test2(int j) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            log.info("test2 {} - {}", j, i);
            Thread.sleep(200);
        }
    }

    public static void main(String[] args) {
        SynchronizedExample1 example1 = new SynchronizedExample1();
        SynchronizedExample1 example2 = new SynchronizedExample1();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            try {
                example1.test2(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executorService.execute(() -> {
            try {
                example2.test2(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}
package com.accat.concurrency.example.sync;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class SynchronizedExample2 {

    // 修饰一个类
    public static void test1(int j) throws InterruptedException {
        synchronized (SynchronizedExample2.class) {
            for (int i = 0; i < 10; i++) {
                log.info("test1 {} - {}", j, i);
                Thread.sleep(200);
            }
        }
    }

    // 修饰一个静态方法
    public static synchronized void test2(int j) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            log.info("test2 {} - {}", j, i);
            Thread.sleep(200);
        }
    }

    public static void main(String[] args) {
        SynchronizedExample2 example1 = new SynchronizedExample2();
        SynchronizedExample2 example2 = new SynchronizedExample2();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            try {
                example1.test1(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executorService.execute(() -> {
            try {
                example2.test1(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}
不同关键字的应用场景

可见性

在Read前加上Load指令,在Write后加入Store指令 Volatile写 Volatile读

Volatile关键字保证读取或者写入工作内存时都事先与主内存中的数据进行同步。

package com.accat.concurrency.example.count;

import com.accat.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
@NotThreadSafe
public class CountExample4 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static volatile int count = 0;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        count++;
        // 1、count读取  多个线程读取最新的结果
        // 2、+1
        // 3、count写入  多个线程把+1的结果同时写回主内存
    }
}

volatile并不具备线程原子性,不能保证线程安全。
它只能保证读取的值为主内存当前值,写入值后对其他线程立马可见,影响的是判断值的过程。但是当操作时,可能多个操作(+1)执行后同时写入主内存中。


volatile适应场景一:状态通知

关于volatile底层细节

上一篇下一篇

猜你喜欢

热点阅读