文章目录
- 1 概述
- 2 主要属性及内部类
- 3 构造函数
- 3.1 无参
- 3.2 带参
- 3.3 tableSizeFor()
- 4 get()-获取值
- 4.1 get()主方法
- 4.2 pread()方法
- 4.3 Node hash值
- 4.4 Node find()方法
- 4.4.1 单链表结点
- 4.4.2 ForwardingNode
- 4.4.3 TreeBin
- 4.4.4 TreeNode
- 5 put()-放入键值对
- 5.1 putVal()-放入键值对
- 5.2 initTable()-初始化哈希表
- 5.3 helpTransfer()-帮忙扩容
- 后记
1 概述
这里讲解的版本为jdk8,jdk7版本后面讲解。
关于ConcurrentHashMap如何减少hash冲突?如何提高并发?
在并发访问控制方面是否加锁?加了那些琐?为何选择这些锁?
下面我们一步步讲解ConcurrentHashMap相关知识,同时在最后会对这些问题进行总结。
2 主要属性及内部类
// 默认0(无参构造)
// 初始化时,为-1
// 扩容时,为-?
// 当初始化或扩容完成后,为下一次扩容的阈值大小(数组容量的3/4)
// jdk8位懒惰初始化,使用带参构造函数,此时table为空,sizeCtl为要初始化table的容量
private transient volatile int sizeCtl;
// 整个ConcurrentHashMap是Node[]
static class Node<K,V> implements Map.Entry<K,V> {...}
// hash表
transient volatile Node<K,V>[] table;
// 在扩容时创建的新的table
private transient volatile Node<K,V>[] nextTable;
// 扩容时如果某个table的bin迁移完成,用ForwardingNode作为旧table bin的头结点
static final class ForwardingNode<K,V> extends Node<K,V> {}
// 用在computer及computerIfAbsent时,用来占位,计算完成后替换为普通的Node
static final class ReservationNode<K,V> extends Node<K,V> {}
// 作为treeBin的头结点,存储root和first。root指向红黑树根结点,first指向双向链表的头结点
static final class TreeBin<K,V> extends Node<K,V> {}
// treeBin的结点,存储key,value,parent,left,right及red
static final class TreeNode<K,V> extends Node<K,V> {}
关于TreeBin及在构建时情况在下面4中详细说明。
3 构造函数
ConcurrentHashMap为懒惰初始化,在首次使用之前,并不会创建hash表,我们来看下构造函数。
3.1 无参
源代码:
public ConcurrentHashMap() {
}
啥也没做。
3.2 带参
我们看下参数最多的一个构造函数,源代码如下:
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
- 形参
- initialCapacity:初始化hash表(数组)容量
- loadFactor:负载因子,默认0.75f(3/4),判断是否扩容依据
- concurrencyLevel:预估并发修改线程数,默认1
- 判断如果负载因子loadFactor <=0.0f 或者初始容量<0或者concurrencyLevel <= 0,抛异常
- 判断如果初始容量<concurrencyLevel,初始容量置为concurrencyLevel。
- 计算hash数组的初始容量,tableSizeFor()用于确保容量为大于等于size的最小2的n次幂
3.3 tableSizeFor()
我们开看下该方法如何保证容量为大于等于size的最小2的n次幂,源代码如下:
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
- 参数c是正整数
- c为0的情况(除了做测试,谁会设置0呢❓)
- int为32位,c为正整数,最高位为0,假设此时c最高位1位置为 m , 0 < m ≤ 30 m,0\lt m\le30 m,0<m≤30
-
n
∣
=
n
>
>
>
1
n |= n >>> 1
n∣=n>>>1保证第
m
−
1
m-1
m−1位为1,
n
∣
=
n
>
>
>
2
n |= n >>> 2
n∣=n>>>2 保证
m
−
2
,
m
−
3
m-2,m-3
m−2,m−3位为1,以此类推,从最高位m到末尾都为1即值为
2
m
+
1
−
1
2^{m+1}-1
2m+1−1,此时
n
≥
c
n\ge c
n≥c。
- 经过 n ∣ = n > > > 16 n |= n >>> 16 n∣=n>>>16运算后,即使c最高位1在第31位,也能保证后续位都为1. 1 + 2 + 4 + 8 + 16 = 31 1+2+4+8+16=31 1+2+4+8+16=31
- (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1 运算后,一般n的大小不会大于最大容量 2 30 2^{30} 230,那么结果为 n + 1 n+1 n+1此时一定就是 2 m ≤ 最终结果 ≤ 2 m + 1 2^m\le 最终结果\le 2^{m+1} 2m≤最终结果≤2m+1
注:
- 为啥计算没使用四则运算呢?四则运算相对于我们比较容易理解,但是真正执行四则运算基于位运算,位运算效率高。如果情况允许的话,建议整数运算尽量使用位运算。比如正整数 n 除以 2 n除以2 n除以2,可以写成 n > > 1 n>>1 n>>1等
4 get()-获取值
4.1 get()主方法
get()方法没有加锁,源代码如下:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
主要执流程:
- 计算key的hash值
- spread()方法如果进行hash计算,减少冲突的呢?
- & HASH_BITS这不操作干嘛的?
- &&顺序判断3个条件是否满足
- 判断条件
- 第一个条件判断table是否不为空
- 第二个条件判断hash表长度是否大于0
- 第三个条件判断hash分配桶下标对应的头结点是不为空
- 判断头结点e 的hash值eh是否等于目标hash值h
- 判断e的key是否等于目标key或者equals()判断是否相等
- 命中目标,返回e对应的value
- 判断e的key是否等于目标key或者equals()判断是否相等
- 否则判断eh是否小于0
- 通过e.find()方法查找目标
- eh小于0有几种情况?e.find()执行那些操作呢?
- 不符合上述条件,说明该桶下标为单链表结构且头结点不是目标结点
- while循环单链表查找目标元素
- 判断条件
- 任一条件不满足,直接返回null
4.2 pread()方法
源码如下:
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
static final int HASH_BITS = 0x7fffffff;
ConcurrentHashMap通过hash值确定桶下标的代码为:(n - 1) & h
- n-1为hash表(数组)长度-1,即为最大索引
- h就是hash值
hash数组的长度一般情况下不会很大,即使为默认长度16的情况下,理论上只要内存足够,可以存储无限个元素
- ConcurrentHashMap底层为数组+链表(红黑树)
(n - 1) & h结果只会是hash对应二进制低位参与,这样增加hash冲突的概率。通过h ^ (h >>> 16)运算, 结果hash高16位保持不变,低16位为原高16位和原低16位异或运算的值。在进行(n - 1) & h运算时,原hash的高16位也参与运算,减少了hash冲突概率。
- 可以通过实验来验证,但是精确的理论(数学)证明暂时不知道。
& HASH_BITS运算,int为32位,保证最高位结果为0即最终的hash值为正数,负数hash有特定作用,下面说明。
4.3 Node hash值
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
hash值类型(分类):
- 非负数:表示该结点为单链表结点
- MOVED(-1):结点为FowardingNode,表明当前ConcurrentHashMap正在扩容,FowardingNode结点所在的桶下标数据迁移完成。
- TREEBIN(-2):结点为TreeBin结点,当前桶下标结构为红黑树
- RESERVED(-3):临时ReservationNode结点,用在computer及computerIfAbsent时,用来占位,计算完成后替换为普通的Node
4.4 Node find()方法
不同的Node结点类型重写了find()方法,目的就是查找(hash,key)对应的目标结点。
4.4.1 单链表结点
Node普通链表结点, find()通过遍历单链表查找目标,源代码如下:
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
4.4.2 ForwardingNode
结点为ForwardingNode,说明元素迁移到新的hash表中。ForwardingNode里面保存有新hash表地址,find()去新的hash表中查找目标结点,源代码如下:
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
问题:
- 为啥新的hash表中也可能存在ForwardingNode呢(新的hash表也在扩容)?
4.4.3 TreeBin
源代码如下:
final Node<K,V> find(int h, Object k) {
if (k != null) {
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
if (((s = lockState) & (WAITER|WRITER)) != 0) {
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next;
}
else if (U.compareAndSwapInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> r, p;
try {
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));
} finally {
Thread w;
if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
(READER|WAITER) && (w = waiter) != null)
LockSupport.unpark(w);
}
return p;
}
}
}
return null;
}
主要执行流程如下:
- 判断k不等于null
- for循环遍历treeBin
- 判断如果锁状态为WAITER或者WRITER
- 遍历双向链表查找目标
- 否则就是无锁或者读锁,cas改变读锁计数
- try…finally,try代码块如果root为空,返回null;不为空,调用treeNode.findTreeNode()查找目标结点
- finally代码块cas获取锁计数,读锁计数-1,判断读锁计数是否等于6(READER|WAITER)且等待线程标识waiter不为空,则唤醒等待线程
- 返回目标结点p
- 判断如果锁状态为WAITER或者WRITER
- for循环遍历treeBin
- 返回null
注释:
- 关于treeBin读是共享的,读写互斥;
- 对于treeBin写操作,会先通过syncronized加锁,保证同一时间,只有一个线程执行写操作。
- 如果treeBin读线程先cas加读锁,此时写操作线程cas加写锁失败,修改锁状态为(s | WAITER),然后LockSupport.park(this);等读线程执行完操作,通过判断锁计数是否等于6且等待线程标识waiter不为空,则唤醒等待线程
- 为6表示最后一个读线程执行完毕且有写线程阻塞
- 关于写线程加锁操作,放在4TreeBin中详解
4.4.4 TreeNode
该方法是在红黑树中查找目标结点的实际执行操作,源代码如下:
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}
执行流程如下:
- 从当前结点(子树根结点)开始遍历红黑树。
- 判断当前结点hash值小于目标hash,继续遍历右子树
- 否则判断当前结点hash值大于目标hash,继续遍历左子树
- 否则判断当前结点key-pk是否等于目标key-k或者pk != null && k.equals(pk))
- 命中目标,返回当前结点p
- 此时表明当前结点hash等于目标hash
- 否则判断左子树为空,继续遍历右子树
- 否则判断右子树为空,继续遍历左子树
- 否则同Comparable.compareTo()比较当前结点的pk与目标k直接大小,决定是继续遍历左子树还是右子树
- 否则右子树递归查找目标结点q,不为空返回
- 否则继续遍历左子树
- 没找到返回null
注:
- 查找逻辑可以参考后面的put逻辑,一一对应
5 put()-放入键值对
public V put(K key, V value) {
return putVal(key, value, false);
}
5.1 putVal()-放入键值对
源代码如下:
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
执行流程如下:
- 如果key为空或者value为空,抛空指针异常
- spread()计算key的hash;binCount记录hash数组下标对应的元素个数
- for循环遍历hash表(数组)
- 如果hash表tab为空或者长度为0,执行initTable()初始化哈希表
- 否则判断hash运算后数组下标i处对应的结点f是否为空
- cas方式在该位置以给定key,value设置新的Node结点
- 设置成功,break跳出循环
- 否则判断当前下标首结点的hash fh 是否等于Moved(-1)
- 调用helpTransfer()帮忙扩容
- 否则
- syncronized对当前下标i首结点加锁
- tabAt()判断当前下标i出结点是否等于f
- 判断首结点hash fh是否大于等于0
- 是说明该下标对应链表
- binCount设置为1
- for循环遍历链表
- 判断结点hash和key是否和给定的参数hash和key相等
- 根据给定参数onlyIfAbsent决定是否替换旧值
- 如果到尾结点未命中,则在尾结点后链入新结点
- 判断首结点是否是TreeBin结点
- binCount置为2
- 调用TreeBin的putTreeVal()添加新的红黑树结点
- 判断首结点hash fh是否大于等于0
- tabAt()判断当前下标i出结点是否等于f
- 判断下标i下结点计数binCount不为0
- 判断如果binCount大于等于转树形阈值8
- 调用treeifyBin()将链表转为红黑树
- break结束循环
- 判断如果binCount大于等于转树形阈值8
- syncronized对当前下标i首结点加锁
- addCount()计算size大小
- return null
注:
- ConcurrentHashMap的key和value都不能为空
- 一般情况下我们不关心(用不到)put()的返回值
问题:
- 当单链表元素有几个的时候会转换为红黑树?(提示阈值是8)
5.2 initTable()-初始化哈希表
ConcurrentHashMap是懒惰初始化的,默认不初始化hash数组,当第一次使用的时候才初始化,通过上面源码可知,源代码如下:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
执行流程如下:
- while循环判断tab为空或者tab长度为0:未完成初始化
- 判断(sc=sizectl)小于0,说明有线程通过cas方式把sizeCtl置为-1
- 调用Thread.yield()、
- 否则通过cas把sizeCtl由sc置为-1,开始扩容
- 判断(sc=sizectl)小于0,说明有线程通过cas方式把sizeCtl置为-1
- 返回新创建的tab数组
注:
- 调用initTable()的时候并未加锁,initTable()内部通过while循环+cas的方式实现并发访问控制
5.3 helpTransfer()-帮忙扩容
源代码如下:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
执行流程:
- 判断tab不为空且结点为ForwardingNode且f链接的新的hash数组不为空
- resizeStamp(tab.length)根据数组长度计算一个戳rs
- while循环判断nextTab == nextTable 且table == tab且(sc = sizeCtl)小于0
- 判断sc无符号右移16位不等于rs或者sc等于rs+1或者sc等于rs+最大帮忙扩容线程数或者transferIndex小于等于0
- break
- 判断cas设置sizeCtl为sc+1为true
- transfer()执行数据迁移
- break
- 判断sc无符号右移16位不等于rs或者sc等于rs+1或者sc等于rs+最大帮忙扩容线程数或者transferIndex小于等于0
- 返回nextTab
- 返回 table
注:
- (sc=sizeCtl)<0表示当前有线程在调整ConcurrentHashMap大小
- (sc >>> RESIZE_STAMP_SHIFT) != rs 为啥sc要无符号右移16位以及transferIndex什么作用,这个在后面transfer()讲解。
后记
如有问题,欢迎交流讨论。
❓QQ:806797785
⭐️源代码仓库地址:https://gitee.com/gaogzhen/concurrent
参考:
[1]黑马程序员.黑马程序员深入学习Java并发编程,JUC并发编程全套教程[CP/OL].2020-01-18/2022-12-12.p281~p289.