15.多线程总结(二)
原子类
什么是原子类
原子曾经被当作不可分割的最小单位,所以原子类可以认为他的操作都是不可分割的,具有原子性,为什么需要原子类呢?对于多线程访问同一个变量,需要加锁,而加锁是比较消耗性能的,JDK1.5之后,新增的原子类提供了一种用法简单,性能高效,线程安全的更新一个变量的方式,这些类位于JUC下的atomic包,发展到JDK1.8,该包下共有17个类,包括了原子更新基本类型,原子更新数组,原子更新属性,原子更新引用几种
1.8新增的:DoubleAccumulator,DoubleAdder,LongAccumulator,LongAdder,Striped64
原子更新基本类型
发展至JDk1.8,基本类型原子类有以下几个:
AtomicBoolean AtomicLong AtomicInteger,DoubleAccumulator,DoubleAdder,LongAccumulator,LongAdder
大致可分为3类:
AtomicBoolean、AtomicInteger、AtomicLong 元老级的原子更新,方法几乎一模一样,DoubleAdder、 LongAdder 对Double、Long的原子更新性能进行优化提升,DoubleAccumulator、LongAccumulator 支持自定 义运算
原子更新数组类型
AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
原子更新属性
原子的更新某个类的某个字段时,就需要使用原子更新字段类,atomic包提供了4个类进行原子字段更新,AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicStampedReference,AtomicReferenceFieldUpdater
使用上述类的时候,必须遵循以下原则:
1.字段必须是volatile类型的,在线程之间共享变量时保证立即可见
2.字段的描述类型是与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。
3.对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
4.只能是实例变量,不能是类变量,也就是说不能加static关键字。
5.只能是可修改变量,不能使final变量,因为final的语义就是不可修改。
6.对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字段,不能修改其包装
类型(Integer/Long)。
7.如果要修改包装类型就需要使用AtomicReferenceFieldUpdater。
原子更新引用
AtomicReference:用于对引用的原子更新
AtomicMarkableReference:带版本戳的原子引用类型,版本戳为boolean类型
AtomicStampedReference:带版本戳的原子引用类型,版本戳为int类型。
容器
同步容器与并发容器
1.同步容器
Vector、HashTable -- JDK提供的同步容器类 Collections.synchronizedXXX 本质是对相应的容器进行包装
2.同步容器类的缺点
在单独使用里面的方法的时候,可以保证线程安全,但是,复合操作需要额外加锁来保证线程安全 使用 Iterator迭代容器或使用使用for-each遍历容器,在迭代过程中修改容器会抛出 ConcurrentModificationException异常。想要避免出现ConcurrentModificationException,就必须在迭代过程 持有容器的锁。但是若容器较大,则迭代的时间也会较长。那么需要访问该容器的其他线程将会长时间等待。 从而会极大降低性能。 若不希望在迭代期间对容器加锁,可以使用"克隆"容器的方式。使用线程封闭,由于 其他线程不会对容器进行修改,可以避免ConcurrentModificationException。但是在创建副本的时候,存在较 大性能开销。 toString,hashCode,equalse,containsAll,removeAll,retainAll等方法都会隐式的Iterate, 也即可能抛出ConcurrentModificationException。
单线程下发生并发修改异常
/******************单线程下发生并发修改异常**********************/
import java.util.Vector;
public class TestVector {
public static void main(String[] args) {
Vector<String> v = new Vector<>();
for (int i = 0; i < 100; i++) {
//在单独使用里面的方法的时候,可以保证线程安全(add方法时同步的)
v.add("num"+i);
}
//使用 Iterator迭代容器或使用使用for-each遍历容器,在迭代过程中修改容
//器会抛出 ConcurrentModificationException异常
for (String s : v) {
if ("num2".equals(s)){
v.remove(s);//java.util.ConcurrentModificationException
}
System.out.println(s);
}
}
}
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
单线程下使用迭代器不会发生并发修改异常
public class TestVector {
/*****************单线程下使用迭代器不会发生并发修改异常**********************/
public static void main(String[] args) {
Vector<String> v = new Vector<>();
for (int i = 0; i < 100; i++) {
v.add("num"+i);
}
Iterator<String> iterator = v.iterator();
while(iterator.hasNext()){
String next = iterator.next();
if ("num2".equals(next)){
iterator.remove();
}
System.out.println(next);
}
}
}
多线程下使用迭代器发生异常
public class TestVector {
public static void main(String[] args) {
Vector<String> v = new Vector<>();
for (int i = 0; i < 100; i++) {
v.add("num"+i);
}
Iterator<String> iterator = v.iterator();
for (int i = 0; i < 4; i++) {
new Thread(new Runnable() {
@Override
public void run() {
while(iterator.hasNext()){
//NoSuchElementException看源码是从next方法抛出的,我们来分析一下
//本身next方法是同步的,是安全的,但是,复合操作下就不是了,
//假设两个线程,线程1走到while(iterator.hasNext())时,是第99个,判断还有下一个
//但是由于时间片切换,此时线程1在这个位置休眠了,然后线程2
//也走到了这个位置,由于游标还没有移动,所以iterator.hasNext()
//还是成立的,所以他也进来了,然后线程2继续,走完了iterator.next()
//此时其实游标已经到最后一个了,然后线程1醒过来,由于他
//已经判断过iterator.hasNext(),所以他也会执行iterator.next()
//问题就在这个时候发生了, if (i >= elementCount)满足条件,触发了异常
String next = iterator.next();
if ("num2".equals(next)){
iterator.remove();
}
}
}
}).start();
}
}
}
//Vector的next方法
public E next() {
synchronized (Vector.this) {
checkForComodification();
int i = cursor;
if (i >= elementCount)
throw new NoSuchElementException();
cursor = i + 1;
return elementData(lastRet = i);
}
}
Exception in thread "Thread-0" java.util.NoSuchElementException
at java.util.Vector$Itr.next(Vector.java:1166)
at TestVector$1.run(TestVector.java:16)
at java.lang.Thread.run(Thread.java:748)
解决上边问题的办法就是加锁,将hasNext和next方法锁在一起
public class TestVector {
public static void main(String[] args) {
Vector<String> v = new Vector<>();
for (int i = 0; i < 100; i++) {
v.add("num"+i);
}
Iterator<String> iterator = v.iterator();
for (int i = 0; i < 4; i++) {
new Thread(new Runnable() {
@Override
public void run() {
synchronized (iterator) {
while (iterator.hasNext()) {
String next = iterator.next();
if ("num2".equals(next)) {
iterator.remove();
}
}
}
}
}).start();
}
}
}
3.并发容器
CopyOnWrite、Concurrent、BlockingQueue 根据具体场景进行设计,尽量避免使用锁,提高容器的并发访问 性。
ConcurrentBlockingQueue:基于queue实现的FIFO的队列。队列为空,取操作会被阻塞
ConcurrentLinkedQueue,队列为空,取得时候就直接返回空
2.并发容器
CopyOnWriteArrayList
同步容器我们以Vector为例通过代码演示了他在使用中可能存在的问题,并发容器我们以CopyOnWriteArrayList为例来进行演示,在同样的场景下是否也会有这种问题?
单线程下遍历过程中删除,未发生并发修改异常
public class TestCopyOnWriteArrayList {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 1000; i++) {
list.add("item"+i);
}
list.forEach(e->{
if("item2".equals(e)){
list.remove(e);
}
});
}
}
单线程下使用迭代器遍历并删除元素
Exception in thread "main" java.lang.UnsupportedOperationException,CopyOnWriteArrayList不支持迭代器遍历
public class TestCopyOnWriteArrayList {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 1000; i++) {
list.add("item"+i);
}
Iterator<String> iterator = list.iterator();
while(iterator.hasNext()){
String next = iterator.next();
if ("item0".equals(next)){
System.out.println("item="+next);
iterator.remove();
}else{
System.out.println(next);
}
}
System.out.println("done");
}
}
多线程下遍历并移除指定元素,不会发生ConcurrentModifyException,我们来看看remove方法源码
import java.util.concurrent.CopyOnWriteArrayList;
public class TestCopyOnWriteArrayList {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 100; i++) {
list.add("item"+i);
}
for (int i = 0; i < 4; i++) {
new Thread(new Runnable() {
@Override
public void run() {
//根据index移除
for (int i1 = 0; i1 < list.size(); i1++) {
String s = list.get(i1);
System.out.println("bbbbbbbbbb=="+s);
if ("item2".equals(s)){
System.out.println("移除item2");
list.remove(i1);
}
}
}
}).start();
}
System.out.println("11111");
for (int i = 0; i < 4; i++) {
new Thread(new Runnable() {
@Override
public void run() {
//直接移除对象
for (String s : list) {
System.out.println("aaaaaaaaaaaa=="+s);
if ("item3".equals(s)){
System.out.println("移除item3");
list.remove(s);
}
}
}
}).start();
}
System.out.println("222222");
}
}
remove(index)
根据index进行元素移除的时候,有加锁,所以多线程不会同时进行移除,不会发生问题。拿到底层数组,把除被移除的元素之外的元素重新拷贝一份到新的数组种,所以每次移除都会导致数组的拷贝
public E remove(int index) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
lock.unlock();
}
}
remove(Object)
根据元素对象进行移除,会判断当前数组种是否存在这个元素,如果已经被其他线程移除了,那么就不会再次移除,返回false,否则才去移除元素,所以也不会发生异常
public boolean remove(Object o) {
Object[] snapshot = getArray();
int index = indexOf(o, snapshot, 0, snapshot.length);
return (index < 0) ? false : remove(o, snapshot, index);
}
为什么ArrayList 遍历的时候remove会发生并发修改异常,而Vector和CopyOnWriteArrayList不会,就是因为后两者都是枷锁的,ArrayList只有在单线程中才能保证安全
LinkedBlockingQueue
在并发编程中LinkedBlockingQueue使用非常频繁,因为它可以作为生产者和消费者的中间商,下面对它几个主要方法的源码进行探秘:
public static void main(String[] args) {
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
boolean add = queue.add(1);
System.out.println("add添加1"+add);
try {
queue.put(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean offer = queue.offer(3);
System.out.println("offer添加3"+offer);
Integer remove = queue.remove();
System.out.println("remove取出"+remove);
Integer poll = queue.poll();
System.out.println("poll取出"+poll);
try {
Integer take = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("take取出"+poll);
}
往队列中添加
#######add():调用offer方法实现的,区别是在队列满的时候会抛出异常
public boolean add(E e) {
if (offer(e))
return true;
else //容量已满,抛出异常
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
//获取到队列长度
final AtomicInteger count = this.count;
//判断长度是否已经达到容量值
if (count.get() == capacity)
//容量已满,返回false
return false;
int c = -1;
//创建一个node,保存当前元素和下一个元素
Node<E> node = new Node<E>(e);
//拿到put锁
final ReentrantLock putLock = this.putLock;
//锁住
putLock.lock();
try {
//再次判断当前容量是否没满
if (count.get() < capacity) {
//没满,就入队
enqueue(node);
//cas的方式自增1.因为新入队了一个元素,getAndIncrement返回值是
//自增前的值,所以添加第一个元素后,自增后是1,返回值是0
c = count.getAndIncrement();
//入队之后,如果当前队列至少还能容纳一个元素
if (c + 1 < capacity)
//至少还能容纳一个元素,通知其他线程添加
//notFull是一个Condition,它的作用是唤醒notFull.await()方法,而
//notFull.await()方法只有以put方式添加元素的时候有用到,因为只
//有通过put添加元素,在元素满的时候会进行阻塞,这里调用它的
//目的是,有可能此时有其他线程在通过put方法往队列中添加元素
//但是上一次它判断队列满了阻塞在那里了,然后本线程在天加的
//时候发现队列有空间了,就去随机唤醒一个阻塞中的线程
notFull.signal();
}
} finally {
//释放锁
putLock.unlock();
}
//c=0的情况只有队列为空,添加第一个元素之后,满足,也就是
//添加了一个元素,通知其他线程队列有元素了,可以取了
if (c == 0)
signalNotEmpty();
return c >= 0;
}
private void signalNotEmpty() {
//拿到take锁,
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//添加一个元素后,队列不再是空的,通知出去,notEmpty.await()在take()方法
//中有被调用,就是当队列为空的时候进行阻塞,这里已经向队列中
//添加了一个元素,不再是空的,所以唤醒阻塞的线程去取元素
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
可以看到这里调用了两个唤醒线程的操作notFull.signal(),只要队列还能容纳至少一个元素,就通知其他线程往里边加,notEmpty.signal()只要队列从空变成了添加了一个元素,就通知其他线程往外取
接下来进入enqueue(node),很简单,就是把这个元素加到了队列上
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
#######offer():同上,区别是队列满了直接返回false,不会抛出异常
#######put():队列满,阻塞
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
//拿到put锁
final ReentrantLock putLock = this.putLock;
//拿到当前长度
final AtomicInteger count = this.count;
//也是加锁,和lock的区别就是会抛出中断异常,因为它会阻塞
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
//满了,阻塞,这个时候如果调用了Thread.interrupt方法,就会抛出
//InterruptException
notFull.await();
}
//入队,和上边一样的
enqueue(node);
//自增1
c = count.getAndIncrement();
if (c + 1 < capacity)
//同样,唤醒其他线程添加元素
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
//唤醒其他线程取元素
signalNotEmpty();
}
从队列中取
#######remove():直接调用poll,区别是空的时候会抛异常
public E remove() {
E x = poll();
if (x != null)
return x;
else //队列为空,抛出异常
throw new NoSuchElementException();
}
public E poll() {
final AtomicInteger count = this.count;
//如果队列长度为空,返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
//获取到take锁
final ReentrantLock takeLock = this.takeLock;
//锁住
takeLock.lock();
try {
//再次判断队列中有没有元素
if (count.get() > 0) {
//从队列中取出
x = dequeue();
//队列数量减1
c = count.getAndDecrement();
//元素被取出一个之后,只要还有剩余的元素,就唤醒阻塞中的线程
//取元素,这里被唤醒的是take方法中的notEmpty.await()
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();//解锁
}
if (c == capacity)
//只有当队列是满的,此时取出了一个元素之后,会满足这里的条件,
//count.getAndDecrement()会将队列长度减1然后返回减1之前的值
//也就是capacity,此时队列已经不满了,通知其他线程往里边put(put方法调用如果队列满了会阻塞,这里就是唤醒那个线程)
signalNotFull();
return x;
}
#######poll():空的时候返回null,不抛异常,源码见上边
#######take():队列为空的时候,进入等待
public E take() throws InterruptedException {
E x;
int c = -1;
//队列长度
final AtomicInteger count = this.count;
//take锁
final ReentrantLock takeLock = this.takeLock;
//锁住,但是在await的情况下有可能抛出InterruptException
takeLock.lockInterruptibly();
try {
//如果队列为空,阻塞
while (count.get() == 0) {
notEmpty.await();
}
// 取出一个元素
x = dequeue();
//长度减1
c = count.getAndDecrement();
if (c > 1)
//取出一个之后,只要还有元素,就唤醒可能在等待中的线程取元素
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
//只有当队列是满的,此时取出了一个元素之后,会满足这里的条件,
//count.getAndDecrement()会将队列长度减1然后返回减1之前的值
//也就是capacity,此时队列已经不满了,通知其他线程往里边put(put方法调用如果队列满了会阻塞,这里就是唤醒那个线程)
signalNotFull();
return x;
}