4、Java并发编程入门与高并发面试-线程安全性
慕课网 Jimin老师 Java并发编程入门与高并发面试 学习笔记
Java并发编程入门与高并发面试
线程安全性定义:
当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的
◆原子性:提供了互斥访问,同一时刻只能有一一个线程来对它进行操作
◆可见性: 一个线程对主内存的修改可以及时的被其他线程观察到
◆有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果- -般杂乱无序
原子性-Atomic包
◆AtomicXXX : CAS. Unsafe.compareAndSwapInt
package com.huhao.concurrency.example.count;
import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 代码模拟并发测试
* Atomic包演示
*/
@Slf4j
@ ThreadSafe
public class CountExample2 {
//请求总数
public static int clientTotal = 5000;
//同时并发执行的线程数
public static int threadTotal = 200;
public static AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int index = 0; index < clientTotal; index++) {
exec.execute(() -> {
try {
//线程请求,如果线程数已经到了,可能会阻塞,等有线程释放了再执行
semaphore.acquire();
add();
//add执行完后,释放当前线程
semaphore.release();
} catch (InterruptedException e) {
log.error("exception", e);
e.printStackTrace();
}
countDownLatch.countDown();
});
}
//保证线程减到0
countDownLatch.await();
//关闭线程池
exec.shutdown();
log.error("count:{}", count);//count:4952
}
private static void add() {
count.incrementAndGet();
}
}
原因详解:
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
/**
* eg:2+1=3
* @param var1 :当前count对象
* @param var2:当前的值 2
* @param var4 :加的值 1
*
* var5是底层方法获取到的值。如果没有别的线程处理更改的时候,正常应该var5=2,
* compareAndSwapInt 来判断,直到var2与var5相同时,才会var5+var4=2+1,返回3
* @return the updated value
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
AtomicLong、LongAdder
由于getAndAddInt方法,一直循环比较,在线程数少的时候,比较的成功率比较高,在多的时候,失败的次数会增多,导致一直循环,效率较低
并发量大的时候,使用LongAdder效率会高一点。如果小的话,也可以用AtomicInteger
AtomicReference 、AtomicReferenceFieldUpdater
package com.huhao.concurrency.example.atomic;
import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
/**
* 代码模拟并发测试
* AtomicReference包演示
*/
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicReference {
private static AtomicReference<Integer> count = new AtomicReference<>(0);
public static void main(String[] args) {
count.compareAndSet(0, 2);//count=2
count.compareAndSet(0, 1);//count=2,不是0,则不执行
count.compareAndSet(1, 3);//count=2,不是0,则不执行
count.compareAndSet(2, 4);//count=4
count.compareAndSet(3, 5);//count=4,不是0,则不执行
log.info("count:{}", count.get());//count:4
}
}
package com.huhao.concurrency.example.atomic;
import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
/**
* 代码模拟并发测试
* AtomicReferenceFieldUpdater包演示
*/
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicExampleAtomicReference {
//AtomicExampleAtomicExampleAtomicReference的count字段,必须用volatile修饰
private static AtomicIntegerFieldUpdater<AtomicExampleAtomicExampleAtomicReference>
updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExampleAtomicExampleAtomicReference.class, "count");
@Getter
public volatile int count = 100;
public static void main(String[] args) {
AtomicExampleAtomicExampleAtomicReference example5 = new AtomicExampleAtomicExampleAtomicReference();
if (updater.compareAndSet(example5, 100, 120)) {
log.info("update success 1,{}", example5.getCount()); // update success 1,120
if (updater.compareAndSet(example5, 100, 120)) {
log.info("update success 2,{}", example5.getCount());
} else {
log.info("update failed,{}", example5.getCount()); // update failed,120
}
}
}
}
AtomicStampReference : CAS的ABA问题
aba问题:cas操作时候,其他线程将变量值A,从A改到B,又改回A。本线程在比较的时候,发现A变量没有变,于是就讲A值进行了交换操作,其实该值以及被其他线程更新过,与最初设计初衷不符。
解决:在变量更新时候,将变量版本号加1
AtomicBoolean
只执行一次
import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 代码模拟并发测试
* AtomicBoolean包演示
*/
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicBoolean {
//请求总数
public static int clientTotal = 5000;
//同时并发执行的线程数
public static int threadTotal = 200;
public static AtomicBoolean isHappened = new AtomicBoolean();
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int index = 0; index < clientTotal; index++) {
exec.execute(() -> {
try {
//线程请求,如果线程数已经到了,可能会阻塞,等有线程释放了再执行
semaphore.acquire();
test();
//add执行完后,释放当前线程
semaphore.release();
} catch (InterruptedException e) {
log.error("exception", e);
e.printStackTrace();
}
countDownLatch.countDown();
});
}
//保证线程减到0
countDownLatch.await();
//关闭线程池
exec.shutdown();
log.error("count:{}", isHappened.get());
}
private static void test() {
if (isHappened.compareAndSet(false, true)) {
//5000次线程,只会执行一次
log.info("exec");
}
}
}
原子性-synchronized
◆修饰代码块:大括号括起来的代码,作用于调用的对象
◆修饰方法:整个方法,作用于调用的对象
◆修饰静态方法:整个静态方法,作用于所有对象
◆修饰类:括号括起来的部分,作用于所有对象
package com.huhao.concurrency.example.sync;
import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Synchronized测试修饰代码块和方法
*/
@Slf4j
@ThreadSafe
public class SynchronizedExample {
/**
* 一般的循环,多线程测下看看对不对
*/
private void notSyncBlock() {
for (int i = 0; i < 10; i++) {
log.info("notSyncBlock - {}", i);
}
}
/**
* 修饰代码块
* 大括号括起来的代码,作用于调用的对象
*/
private void synchronizedBlock() {
synchronized (this) {
for (int i = 0; i < 4; i++) {
log.info("test1 - {}", i);
}
}
}
/**
* 修饰代码块
* 大括号括起来的代码,作用于调用的对象
*/
private void synchronizedBlock(int j) {
synchronized (this) {
for (int i = 0; i < 4; i++) {
log.info("synchronizedBlock {} - {}", j, i);
}
}
}
/**
* 修饰方法
* 整个方法,作用于调用的对象
*/
private synchronized void synchronizedMethod() {
for (int i = 0; i < 10; i++) {
log.info("test2 - {}", i);
}
}
//请求总数
public static int clientTotal = 4;
//同时并发执行的线程数
public static int threadTotal = 3;
public static void main(String[] args) throws InterruptedException {
SynchronizedExample synchronizedExample = new SynchronizedExample();
SynchronizedExample synchronizedExample2 = new SynchronizedExample();
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
/**
* 线程同步执行测试是不是同步一个个的执行
*/
for (int index = 0; index < clientTotal; index++) {
exec.execute(() -> {
try {
//线程请求,如果线程数已经到了,可能会阻塞,等有线程释放了再执行
semaphore.acquire();
/**
* 没有加sync的,
* notSyncBlock - 0
* notSyncBlock - 0
* notSyncBlock - 0
* notSyncBlock - 1
* notSyncBlock - 1
* notSyncBlock - 1
* notSyncBlock - 2
* notSyncBlock - 2
* notSyncBlock - 2
* notSyncBlock - 3
*/
// synchronizedExample.notSyncBlock();
/**
* 加了sync的,同步执行
* test1 - 0
* test1 - 1
* test1 - 2
* test1 - 3
* test1 - 0
* test1 - 1
* test1 - 2
* test1 - 3
* test1 - 0
* test1 - 1
* test1 - 2
* test1 - 3
* test1 - 0
*/
// synchronizedExample.synchronizedBlock();
/**
* example和example2会同时调用,但是本身是顺序执行的
* 同一个对象,是同步执行
* 不同对象间,不干扰
*/
// synchronizedExample.synchronizedBlock(0);
// synchronizedExample2.synchronizedBlock(1);
// synchronizedExample2.synchronizedBlock(2);
//add执行完后,释放当前线程
semaphore.release();
} catch (InterruptedException e) {
log.error("exception", e);
e.printStackTrace();
}
countDownLatch.countDown();
});
}
//保证线程减到0
countDownLatch.await();
//关闭线程池
exec.shutdown();
// synchronizedExample.synchronizedMethod();
}
}
package com.huhao.concurrency.example.sync;
import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
/**
* Synchronized 测试修饰静态方法和类
*/
@Slf4j
@ThreadSafe
public class SynchronizedExample2 {
/**
* 一般的循环,多线程测下看看对不对
*/
public void notSyncBlock() {
for (int i = 0; i < 10; i++) {
log.info("notSyncBlock - {}", i);
}
}
/**
* 修饰类
*/
public static void synchronizedBlock() {
synchronized (SynchronizedExample2.class) {
for (int i = 0; i < 4; i++) {
log.info("test1 - {}", i);
}
}
}
/**
* 修饰静态方法
*/
public static synchronized void synchronizedMethod() {
for (int i = 0; i < 10; i++) {
log.info("test2 - {}", i);
}
}
//请求总数
public static int clientTotal = 4;
//同时并发执行的线程数
public static int threadTotal = 3;
public static void main(String[] args) throws InterruptedException {
}
}
对比:
◆synchronized :不可中断锁,适合竞争不激烈,可读性好
◆Lock :可中断锁,多样化同步,竞争激烈时能维持常态
◆Atomic :竞争激烈时能维持常态,比Lock性能好;只能同步一个值
可见性
导致共享变量在线程间不可见的原因
◆线程交叉执行
◆重排序结合线程交叉执行
◆共享变量更新后的值没有在工作内存与主存间及时更新
可见性- synchronized
JMM关于synchronized的两条规定:
◆线程解锁前 ,必须把共享变量的最新值刷新到主内存
◆线程加锁时 ,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值(注意,
加锁与解锁是同一把锁)
可见性- volatile
通过加入内存屏障和禁止重排序优化来实现
◆对volatile变量写操作时 ,会在写操作后加入-条store屏障指令,将本地内存中的共享变量值刷新到主内存
◆对volatile变量读操作时 ,会在读操作前加入-条load屏障指令,从主内存中读取共享变量