10 并发容器(ConcurrentHashMap)

2019-10-10  本文已影响0人  攻城狮哦哦也

1 预备知识

1.1 为什么高并发中比较少用HashMap和HashTable

HashMap
Hashmap多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。
HashTable
HashTable使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低。

1.2 Hash

概述
Hash,一般翻译做散列、杂凑,或音译为哈希,是把任意长度的输入(又叫做预映射pre-image)通过散列算法变换成固定长度的输出,该输出就是散列值。这种转换是一种压缩映射,也就是,散列值的空间通常远小于输入的空间,不同的输入可能会散列成相同的输出,所以不可能从散列值来确定唯一的输入值。简单的说就是一种将任意长度的消息压缩到某一固定长度的的函数。
Hash算法可以将一个数据转换为一个标志,这个标志和源数据的每一个字节都有十分紧密的关系。Hash算法还具有一个特点,就是很难找到逆向规律。

Hash算法是一个广义的算法,也可以认为是一种思想,使用Hash算法可以提高存储空间的利用率,可以提高数据的查询效率,也可以做数字签名来保障数据传递的安全性。所以Hash算法被广泛地应用在互联网应用中。 [1]

Hash算法也被称为散列算法,Hash算法虽然被称为算法,但实际上它更像是一种思想。Hash算法没有一个固定的公式,只要符合散列思想的算法都可以被称为是Hash算法。 [2]
Hash冲突

Hi=RH1(key) i=1,2,…,k

当哈希地址Hi=RH1(key)发生冲突时,再计算Hi=RH2(key)……,直到冲突不再产生。这种方法不易产生聚集,但增加了计算时间。

注:
ConcurrentHashMap在发生hash冲突时采用了链地址法。
md4,md5,sha-hash算法也属于hash算法,又称摘要算法。

1.3 位运算

image.png
二进制运算规则,每一位的结果:对应位上的数字(1/0) * 2(对应位下标作为指数)
将所有位数求和,则就是该二进制的十进制结果
由上面的表格可以看出,数字类型在数字渐渐变大时,是由低位慢慢向高位扩展的。
Java实际保存int型时 正数 第31位 =0 负数:第31位=1
常用的位运算
public class Permission {
    
    // 是否允许查询,二进制第1位,0表示否,1表示是  
    public static final int ALLOW_SELECT = 1 << 0; // 0001  = 1
      
    // 是否允许新增,二进制第2位,0表示否,1表示是  
    public static final int ALLOW_INSERT = 1 << 1; // 0010  = 2
      
    // 是否允许修改,二进制第3位,0表示否,1表示是  
    public static final int ALLOW_UPDATE = 1 << 2; // 0100  =4
      
    // 是否允许删除,二进制第4位,0表示否,1表示是  
    public static final int ALLOW_DELETE = 1 << 3; // 1000  = 8
    // 存储目前的权限状态  
    private int flag; 
    //设置用户的权限
    public void setPer(int per) {
        flag = per;
    }
    //增加用户的权限(1个或者多个)
    public void enable(int per) {
        flag = flag|per;
    }
  //删除用户的权限(1个或者多个)
    public void disable(int per) {
        flag = flag&~per;
    }
    //判断用户的权限
    public boolean isAllow(int per) {
        return ((flag&per)== per);
    }
    //判断用户没有的权限
    public boolean isNotAllow(int per) {
        return ((flag&per)==0);
    }
  
    public static void main(String[] args) {
        int flag = 15;
        Permission permission = new Permission();
        permission.setPer(flag);
        permission.disable(ALLOW_DELETE|ALLOW_INSERT);
        System.out.println("select = "+permission.isAllow(ALLOW_SELECT));
        System.out.println("update = "+permission.isAllow(ALLOW_UPDATE));
        System.out.println("insert = "+permission.isAllow(ALLOW_INSERT));
        System.out.println("delete = "+permission.isAllow(ALLOW_DELETE));
        
        
    }
}

2 ConcurrentHashMap 在JDK1.7中原理和实现

2.1 数据结构

image.png

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment实际继承自可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,每个Segment里包含一个HashEntry数组,我们称之为table,每个HashEntry是一个链表结构的元素。

2.2 初始化

初始化有三个参数

2.3 在get和put操作中,是如何快速定位元素放在哪个位置的?

对于某个元素而言,一定是放在某个segment元素的某个table元素中的,所以在定位上,
定位segment:取得key的hashcode值进行一次再散列(通过Wang/Jenkins算法),拿到再散列值后,以再散列值的高位进行取模得到当前元素在哪个segment上。

image.png
image.png
定位table:同样是取得key的再散列值以后,用再散列值的全部和table的长度进行取模,得到当前元素在table的哪个元素上。
image.png

2.4 get()方法

定位segment和定位table后,依次扫描这个table元素下的的链表,要么找到元素,要么返回null。
在高并发下的情况下如何保证取得的元素是最新的?
答:用于存储键值对数据的HashEntry,在设计上它的成员变量value等都是volatile类型的,这样就保证别的线程对value值的修改,get方法可以马上看到。

image.png

2.5 put()方法

2.6 扩容操作

Segment 不扩容,扩容下面的table数组,每次都是将数组翻倍

image.png
带来的好处
假设原来table长度为4,那么元素在table中的分布是这样的:
Hash值 15 23 34 56 77
在table中下标 3 = 15%4 3 = 23 % 4 2 = 34%4 0 = 56%4 1 = 77 % 4

扩容后table长度变为8,那么元素在table中的分布变成:

Hash值 56 34 77 15,32
在table中下标 0 1 2 3 4 5 6 7

可以看见 hash值为34和56的下标保持不变,而15,23,77的下标都是在原来下标的基础上+4即可,可以快速定位和减少重排次数。

2.7 size()方法

size的时候进行两次不加锁的统计,两次一致直接返回结果,不一致,重新加锁再次统计

2.8 弱一致性

get方法和containsKey方法都是通过对链表遍历判断是否存在key相同的节点以及获得该节点的value。但由于遍历过程中其他线程可能对链表结构做了调整,因此get和containsKey返回的可能是过时的数据,这一点是ConcurrentHashMap在弱一致性上的体现。

3 ConcurrentHashMap 在JDK1.8中原理和实现

3.1 数据结构

image.png
与1.7相比的重大变化

3.2 初始化

只是给成员变量赋值,put时进行实际数组的填充

3.3 在get和put操作中,是如何快速定位元素放在哪个位置的?

image.png
image.png

3.4 get()方法

image.png

3.5 put()方法

数组的实际初始化


image.png
image.png
image.png
image.png

3.6 扩容操作

transfer()方法进行实际的扩容操作,table大小也是翻倍的形式,有一个并发扩容的机制。

3.7 size()方法

估计的大概数量,不是精确数量

3.8 一致性

弱一致

4 更多的并发容器

ConcurrentSkipListMap 和 ConcurrentSkipListSet

4.1 跳表(SkipList)

image.png

SkipList,以空间换时间,在原链表的基础上形成多层索引,但是某个节点在插入时,是否成为索引,随机决定,所以跳表又称为概率数据结构。

4.2 写时复制容器

CopyOnWriteArrayList

CopyOnWriteArraySet
写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以写时复制容器也是一种读写分离的思想,读和写不同的容器。如果读的时候有多个线程正在向容器添加数据,读还是会读到旧的数据,因为写的时候不会锁住旧的,只能保证最终一致性。
适用读多写少的并发场景,常见应用:白名单/黑名单, 商品类目的访问和更新场景。
存在内存占用问题。

5 阻塞队列

5.1 生产者消费者模式

1)当队列满的时候,插入元素的线程被阻塞,直到队列不满。
2)队列为空的时候,获取元素的线程被阻塞,直到队列不空。

生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。生产者和消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

常用方法

方法 抛出异常 返回值 一直阻塞 超时退出
插入方法 add offer put Offer(time)
移除方法 remove poll take Poll(time)
检查方法 element peek N/A N/A

5.2 常用阻塞队列

ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
按照先进先出原则,要求设定初始大小
LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
按照先进先出原则,可以不设定初始大小,Integer.Max_Value
ArrayBlockingQueue和LinkedBlockingQueue不同:

PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
默认情况下,按照自然顺序,要么实现compareTo()方法,指定构造参数Comparator
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
支持延时获取的元素的阻塞队列,元素必须要实现Delayed接口。适用场景:实现自己的缓存系统,订单到期,限时支付等等。
延时队列示例:取出到期订单

/**
 *
 *类说明:订单的实体类
 */
public class Order {
    private final String orderNo;//订单的编号
    private final double orderMoney;//订单的金额
    
    public Order(String orderNo, double orderMoney) {
        super();
        this.orderNo = orderNo;
        this.orderMoney = orderMoney;
    }

    public String getOrderNo() {
        return orderNo;
    }

    public double getOrderMoney() {
        return orderMoney;
    }

}
/**
 *
 *类说明:存放到队列的元素
 */
public class ItemVo<T> implements Delayed{
    
    private long activeTime;//到期时间,单位毫秒
    private T date;
    
    //activeTime是个过期时长
    public ItemVo(long activeTime, T date) {
        super();
        this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime, 
                TimeUnit.MILLISECONDS)+System.nanoTime();//将传入的时长转换为超时的时刻
        this.date = date;
    }
    
    public long getActiveTime() {
        return activeTime;
    }

    public T getDate() {
        return date;
    }

    //按照剩余时间排序
    @Override
    public int compareTo(Delayed o) {
        long d = getDelay(TimeUnit.NANOSECONDS)-o.getDelay(TimeUnit.NANOSECONDS);
        return (d==0)?0:((d>0)?1:-1);
    }

    //返回元素的剩余时间
    @Override
    public long getDelay(TimeUnit unit) {
        long d = unit.convert(this.activeTime-System.nanoTime(),
                TimeUnit.NANOSECONDS);
        return d;
    }
    
}
/**
 *
 *类说明:将订单放入队列
 */
public class PutOrder implements Runnable {
    
    private DelayQueue<ItemVo<Order>> queue;
    
    public PutOrder(DelayQueue<ItemVo<Order>> queue) {
        super();
        this.queue = queue;
    }

    @Override
    public void run() {
        
        //5秒到期
        Order ordeTb = new Order("Tb12345",366);
        ItemVo<Order> itemTb = new ItemVo<Order>(5000,ordeTb);
        queue.offer(itemTb);
        System.out.println("订单5秒后到期:"+ordeTb.getOrderNo());
        
        //8秒到期
        Order ordeJd = new Order("Jd54321",366);
        ItemVo<Order> itemJd = new ItemVo<Order>(8000,ordeJd);
        queue.offer(itemJd);
        System.out.println("订单8秒后到期:"+ordeJd.getOrderNo());     
        
    }   
}

/**
 *
 *类说明:取出到期订单的功能
 */
public class FetchOrder implements Runnable {
    
    private DelayQueue<ItemVo<Order>> queue;
    
    public FetchOrder(DelayQueue<ItemVo<Order>> queue) {
        super();
        this.queue = queue;
    }   

    @Override
    public void run() {
        while(true) {
            try {
                ItemVo<Order> item = queue.take();
                Order order = (Order)item.getDate();
                System.out.println("get from queue:"+order.getOrderNo());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }   
}
/**
 *
 *类说明:测试延时订单
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        
        DelayQueue<ItemVo<Order>> queue = new DelayQueue<>();
        
        new Thread(new PutOrder(queue)).start();
        new Thread(new FetchOrder(queue)).start();

        //每隔500毫秒,打印个数字
        for(int i=1;i<15;i++){
            Thread.sleep(500);
            System.out.println(i*500);
        }
    }
}

订单5秒后到期:Tb12345
订单8秒后到期:Jd54321
500
1000
1500
2000
2500
3000
3500
4000
4500
get from queue:Tb12345
5000
5500
6000
6500
7000
get from queue:Jd54321

SynchronousQueue:一个不存储元素的阻塞队列。
每一个put操作都要等待一个take操作
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
transfer(),必须要消费者消费了以后方法才会返回,tryTransfer()无论消费者是否接收,方法都立即返回。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
可以从队列的头和尾都可以插入和移除元素,实现工作密取,方法名带了First对头部操作,带了last从尾部操作,另外:add=addLast; remove=removeFirst; take=takeFirst

阻塞队列的实现原理(待补充)
比如,ArrayBlockingQueue就是基于Lock和Condition实现的。

上一篇 下一篇

猜你喜欢

热点阅读