java并发编程实战一之基础篇
缓存一致性问题
计算机在执行程序时,每条指令都是在CPU中执行的,而执行指令过程中,势必涉及到数据的读取和写入。由于程序运行过程中的临时数据是存放在主存(物理内存)当中的,这时就存在一个问题,由于CPU执行速度很快,而从内存读取数据和向内存写入数据的过程跟CPU执行指令的速度比起来要慢的多,因此如果任何时候对数据的操作都要通过和内存的交互来进行,会大大降低指令执行的速度。因此在CPU里面就有了高速缓存。
也就是,当程序在运行过程中,会将运算需要的数据从主存复制一份到CPU的高速缓存当中,那么CPU进行计算时就可以直接从它的高速缓存读取数据和向其中写入数据,当运算结束之后,再将高速缓存中的数据刷新到主存当中。
在多核CPU中,每条线程可能运行于不同的CPU中,因此每个线程运行时有自己的高速缓存。被多个线程访问的共享变量在多个CPU中都存在缓存,这里那么就可能存在缓存不一致的问题
所以就出现了缓存一致性协议。最出名的就是Intel 的MESI协议,MESI协议保证了每个缓存中使用的共享变量的副本是一致的。它核心的思想是:当CPU写数据时,如果发现操作的变量是共享变量,即在其他CPU中也存在该变量的副本,会发出信号通知其他CPU将该变量的缓存行置为无效状态,因此当其他CPU需要读取这个变量时,发现自己缓存中缓存该变量的缓存行是无效的,那么它就会从内存重新读取。
缓存一致性
线程安全性
- 原子性
即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行
思考:?int long double读写操作的原子性
思考:?int i++的原子性
- 可见性
可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值
与缓存相关,某线程改变了数据,其他线程没有立即看到修改后的值
- 有序性
即程序执行的顺序按照代码的先后顺序执行
与指令重排序有关。一般来说,处理器为了提高程序运行效率,可能会对输入代码进行优化,它不保证程序中各个语句的执行先后顺序同代码中的顺序一致,但是它会保证程序最终执行结果和代码顺序执行的结果是一致的。
Java内存模型具备一些先天的“有序性”,即不需要通过任何手段就能够得到保证的有序性,这个通常也称为 happens-before 原则。如果两个操作的执行次序无法从happens-before原则推导出来,那么它们就不能保证它们的有序性,虚拟机可以随意地对它们进行重排序。
- 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作
- 锁定规则:一个unLock操作先行发生于后面对同一个锁额lock操作
- volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作
- 传递规则:如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C
- 线程启动规则:Thread对象的start()方法先行发生于此线程的每个一个动作
- 线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生
- 线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值手段检测到线程已经终止执行
- 对象终结规则:一个对象的初始化完成先行发生于他的finalize()方法的开始
当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。
对象的共享
加锁与volatile
加锁机制既可以确保可见性,又可以确保原子性,而volatile变量只能确保可见性。
发布与逸出
发布一个对象的意思是指,使对象能够在当前作用域之外的代码中使用。
发布内部状态可能会破坏封装性,并使得程序难以维持不变性条件。
当某个不应该发布的对象被发布时,这种情况就被称之为逸出。
当一个对象发布时,在该对象的非私有域中引用的所有对象同样会被发布。一般来说,如果一个已经发布的对象能够通过非私有的变量引用和方法调用到达其他的对象,那么这些对象也都会被发布。
不要在构造过程中使this引用逸出。
线程封闭
如果仅在单线程内访问数据,就不需要同步。
- Ad-hoc线程封闭(脆弱)
- 栈封闭
- ThreadLocal类
不变性
满足同步需求的另一种方法是使用不可变对象(Immutable Object)
不可变对象:
- 对象创建以后其状态就不能修改
- 对象的所有域都是final类型
- 对象是正确创建的
安全发布
- 在静态初始化函数中初始化一个对象引用
- 将对象的引用保存到volatile类型的域或者AtomicReferance对象中
- 将对象的引用保存到某个正确构造对象的final类型域中
- 将对象的引用保存到一个由锁保护的域中
线程安全库的容器类:
HashTable、synchronizedMap、ConcurrentMap
Vector、CopyOnWriteArrayList、CopyOnWriteArraySet、synchronizedList、synchronizedSet
BlockingQueue、ConcurrentLinkedQueue
事实不可变对象(Effectively Immutable Object):如果对象从技术上看是可变的,但其状态在发布后不会再改变,那么把这种对象称为事实不可变对象。
- 不可变对象可以通过任意机制发布
- 事实不可变对象必须通过安全方式发布
- 可变对象必须通过安全方式来发布,并且必须是线程安全的或者由某个锁保护起来
安全地共享对象
- 线程封闭
- 只读共享
- 线程安全共享
- 保护共享
对象的组合
设计线程安全的类
在设计线程安全类的过程中,需要包含以下三个基本要素:
- 找出构成对象状态的所有变量
- 找出约束状态变量的不变性条件
- 建立对象状态的并发访问管理策略
Java监视器模式:对于任何一种锁对象,自始至终都使用该锁对象,都可以用来保护对象的状态
@NotThreadSafe
public class MutablePoint {
public int x,y;
public MutablePoint() {
x = 0;
y = 0;
}
public MutablePoint(MutablePoint p) {
this.x = p.x;
this.y = p.y;
}
}
@ThreadSafe
public class MonitorVehicleTracker {
@GuardedBy("this")
private final Map<String,MutablePoint> locations;
public MonitorVehicleTracker(Map<String, MutablePoint> locations) {
this.locations = deepCopy(locations);
}
public synchronized Map<String,MutablePoint> getLocations(){
return deepCopy(locations);
}
public synchronized MutablePoint getLocation(String id){
MutablePoint loc = locations.get(id);
return loc == null ? null : new MutablePoint(loc);
}
public synchronized void setLocation(String id,int x,int y){
MutablePoint loc = locations.get(id);
if(loc == null){
throw new IllegalArgumentException("No such ID:" + id);
}
loc.x = x;
loc.y = y;
}
private static Map<String,MutablePoint> deepCopy(Map<String,MutablePoint> m){
Map<String,MutablePoint> result = new HashMap<>();
for(String id:m.keySet()){
result.put(id,new MutablePoint(m.get(id)));
}
return Collections.unmodifiableMap(result);
}
}
线程安全性的委托
如果一个类是由多个独立且线程安全的状态变量组成,并且在所有的操作中都不包含无效状态转换,那么可以将线程安全性委托给底层的状态变量。
public class VisualComponent {
private final List<KeyListener> keyListeners = new CopyOnWriteArrayList<>();
private final List<MouseListener> mouseListeners = new CopyOnWriteArrayList<>();
public void addKeyListener(KeyListener listener){
keyListeners.add(listener);
}
public void addMouseListener(MouseListener listener){
mouseListeners.add(listener);
}
public void removeKeyListener(KeyListener listener){
keyListeners.remove(listener);
}
public void removeMouseListener(MouseListener listener){
mouseListeners.remove(listener);
}
}
Java里的基础构建模块
同步容器类
Vector HashTable Collections.synchronizedXxx工厂方法
- 同步容器的线程安全问题
public static <T> T getLast(Vector<T> vector){
int lastIndex = vector.size() - 1;
return vector.get(lastIndex);
}
public static <T> T deleteLast(Vector<T> vector){
int lastIndex = vector.size() - 1;
return vector.remove(lastIndex);
}
在多线程中上述方法是不安全的,虽然Vector是安全的容器,但size()方法和get()或者remove()同时使用,存在“先检查再运行”操作,就会抛出异常(ArrayIndexOutOfBoundsException),所以需要在客户端加锁
public static <T> T getLast(Vector<T> vector) {
synchronized (vector) {
int lastIndex = vector.size() - 1;
return vector.get(lastIndex);
}
}
public static <T> T deleteLast(Vector<T> vector) {
synchronized (vector) {
int lastIndex = vector.size() - 1;
return vector.remove(lastIndex);
}
}
- 迭代器与ConcurrentModificationException
在设计同步容器的迭代器时并没有考虑并发修改的问题,它们表现出的行为是及时失败(fail-fast)。
List<Widget> widgeList = Collections.synchronizedList(new ArrayList<Widget>());
//可能抛出ConcurrentModificationException
for(Widget w:widgeList){
doSomeThing(w);
}
解决方法有两种:一是加锁,但可能会产生死锁;二是克隆,这里的性能开销也很大。
- 隐藏迭代器
容器的toString()、hashCode()、equals()、containsAll()、removeAll()、retainAll()以及把容器作为参数的构造方法,都会对容器进行迭代。这些操作都有可能抛出ConcurrentModificationException
并发容器
- ConcurrentHashMap 替代同步Map
使用分段锁
迭代器具有弱一致性(可以容忍并发修改,但并不能保证在迭代器被构造后将修改操作反映给容器,所以size()和isEmpty()的语义被略微减弱了。 - CopyOnWriteArrayList/CopyOnWriteArraySet
"写入时复制"容器,每次修改时,都会创建并重新发布一个新的容器副本
/**
* Replaces the element at the specified position in this list with the
* specified element.
*
* @throws IndexOutOfBoundsException {@inheritDoc}
*/
public E set(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
E oldValue = get(elements, index);
if (oldValue != element) {
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len);
newElements[index] = element;
setArray(newElements);
} else {
// Not quite a no-op; ensures volatile write semantics
setArray(elements);
}
return oldValue;
} finally {
lock.unlock();
}
}
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return <tt>true</tt> (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
容器规模较大时,底层复制需要一定的开销。仅当迭代操作远远多于修改操作时,才使用"写入时复制"容器。
- ConcurrentSkipListMap 替代同步的SortedMap
- ConcurrentSkipListSet 替代同步的SortedSet
- Queue ConcurrentLinkedQueue(先进先出) PriorityQueue(优先队列)
- BlockingQueue 阻塞队列 LinkedBlockingQueue/ArrayBlockingQueue(FIFO) PriorityBlockingQueue(优先队列) SynchronousQueue(维护一组线程,不维护存储空间,直接交付)
put()和take()是可阻塞的
支持生产者消费者模式
支持有界或者无界队列 - Deque BlockDeque 双端队列和工作密取
每个消费者都有自己的双端队列,消费完自己的任务,就去其他队列的末尾秘密的获取工作
阻塞方法中断方法
某方法抛出InterruptedException时,表示该方法是一个阻塞方法。
捕获异常,恢复中断
try {
processTask(fileQueue.take())
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
同步工具类
- CountDownLatch
是一个或多个线程等待一组事件发生
public class TestHarness {
public static long timeTasks(int nThreads,final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for(int i = 0;i < nThreads;i++){
// Runnable t = new Runnable() {
// @Override
// public void run() {
// try {
// startGate.await();
// try {
// task.run();
// } finally {
// endGate.countDown();
// }
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// };
Thread t = new Thread(){
@Override
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
public static void main(String[] args) throws InterruptedException {
Runnable a = new Runnable() {
@Override
public void run() {
int sum = 0;
for(int i = 0; i < 1000000; i++){
sum += i;
}
System.out.println(sum);
}
};
System.out.println( timeTasks(100,a) );
}
}
- FutureTask
异步获取执行的结果 - 信号量 Semaphore
控制同时访问某个特定资源的操作数量或者同时执行某个制定操作的数量
实现资源池
对容器施加边界 - 栅栏 CyclicBarrier Exchanger
简单的可伸缩性缓存
public class Momoizerl<A,V> implements Computable<A,V> {
private final ConcurrentMap<A,Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A,V> c;
public Momoizerl(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(final A arg) throws InterruptedException {
while (true){
Future<V> f = cache.get(arg);
if(f == null){
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = cache.putIfAbsent(arg,ft);
if(f == null){
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
基础小结
- 可变状态越少,越容易确保线程安全性
- 尽量将域声明为final类型
- 不可变对象一定是线程安全的
- 使用所来保护可变变量
- 当保护同一个不变性条件中的所有变量时,使用同一个锁
- 复合操作,使用锁
- 安全的适当的使用并发容器和同步工具