我们知道哈希表是一种非常高效的数据结构,设计优良的哈希函数可以使其上的增删改查操作达到o(1)级别。java为我们提供了一个现成的哈希结构,那就是hashmap类,在前面的文章中我曾经介绍过hashmap类,知道它的所有方法都未进行同步,因此在多线程环境中是不安全的。为此,java为我们提供了另外一个hashtable类,它对于多线程同步的处理非常简单粗暴,那就是在hashmap的基础上对其所有方法都使用synchronized关键字进行加锁。这种方法虽然简单,但导致了一个问题,那就是在同一时间内只能由一个线程去操作哈希表。即使这些线程都只是进行读操作也必须要排队,这在竞争激烈的多线程环境中极为影响性能。本篇介绍的concurrenthashmap就是为了解决这个问题的,它的内部使用分段锁将锁进行细粒度化,从而使得多个线程能够同时操作哈希表,这样极大的提高了性能。下图是其内部结构的示意图。
1. concurrenthashmap有哪些成员变量?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
//默认初始化容量 static final int default_initial_capacity = 16 ; //默认加载因子 static final float default_load_factor = 0 .75f; //默认并发级别 static final int default_concurrency_level = 16 ; //集合最大容量 static final int maximum_capacity = 1 << 30 ; //分段锁的最小数量 static final int min_segment_table_capacity = 2 ; //分段锁的最大数量 static final int max_segments = 1 << 16 ; //加锁前的重试次数 static final int retries_before_lock = 2 ; //分段锁的掩码值 final int segmentmask; //分段锁的移位值 final int segmentshift; //分段锁数组 final segment<k,v>[] segments; |
在阅读完本篇文章之前,相信读者不能理解这些成员变量的具体含义和作用,不过请读者们耐心看下去,后面将会在具体场景中一一介绍到这些成员变量的作用。在这里读者只需对这些成员变量留个眼熟即可。但是仍有个别变量是我们现在需要了解的,例如segment数组代表分段锁集合,并发级别则代表分段锁的数量(也意味有多少线程可以同时操作),初始化容量代表整个容器的容量,加载因子代表容器元素可以达到多满的一种程度。
2. 分段锁的内部结构是怎样的?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
//分段锁 static final class segment<k,v> extends reentrantlock implements serializable { //自旋最大次数 static final int max_scan_retries = runtime.getruntime().availableprocessors() > 1 ? 64 : 1 ; //哈希表 transient volatile hashentry<k,v>[] table; //元素总数 transient int count; //修改次数 transient int modcount; //元素阀值 transient int threshold; //加载因子 final float loadfactor; //省略以下内容 ... } |
segment是concurrenthashmap的静态内部类,可以看到它继承自reentrantlock,因此它在本质上是一个锁。它在内部持有一个hashentry数组(哈希表),并且保证所有对该数组的增删改查方法都是线程安全的,具体是怎样实现的后面会讲到。所有对concurrenthashmap的增删改查操作都可以委托segment来进行,因此concurrenthashmap能够保证在多线程环境下是安全的。又因为不同的segment是不同的锁,所以多线程可以同时操作不同的segment,也就意味着多线程可以同时操作concurrenthashmap,这样就能避免hashtable的缺陷,从而极大的提高性能。
3. concurrenthashmap初始化时做了些什么?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
//核心构造器 @suppresswarnings ( "unchecked" ) public concurrenthashmap( int initialcapacity, float loadfactor, int concurrencylevel) { if (!(loadfactor > 0 ) || initialcapacity < 0 || concurrencylevel <= 0 ) { throw new illegalargumentexception(); } //确保并发级别不大于限定值 if (concurrencylevel > max_segments) { concurrencylevel = max_segments; } int sshift = 0 ; int ssize = 1 ; //保证ssize为2的幂, 且是最接近的大于等于并发级别的数 while (ssize < concurrencylevel) { ++sshift; ssize <<= 1 ; } //计算分段锁的移位值 this .segmentshift = 32 - sshift; //计算分段锁的掩码值 this .segmentmask = ssize - 1 ; //总的初始容量不能大于限定值 if (initialcapacity > maximum_capacity) { initialcapacity = maximum_capacity; } //获取每个分段锁的初始容量 int c = initialcapacity / ssize; //分段锁容量总和不小于初始总容量 if (c * ssize < initialcapacity) { ++c; } int cap = min_segment_table_capacity; //保证cap为2的幂, 且是最接近的大于等于c的数 while (cap < c) { cap <<= 1 ; } //新建一个segment对象模版 segment<k,v> s0 = new segment<k,v>(loadfactor, ( int )(cap * loadfactor), (hashentry<k,v>[]) new hashentry[cap]); //新建指定大小的分段锁数组 segment<k,v>[] ss = (segment<k,v>[]) new segment[ssize]; //使用unsafe给数组第0个元素赋值 unsafe.putorderedobject(ss, sbase, s0); this .segments = ss; } |
concurrenthashmap有多个构造器,但是上面贴出的是它的核心构造器,其他构造器都通过调用它来完成初始化。核心构造器需要传入三个参数,分别是初始容量,加载因子和并发级别。在前面介绍成员变量时我们可以知道默认的初始容量为16,加载因子为0.75f,并发级别为16。现在我们看到核心构造器的代码,首先是通过传入的concurrencylevel来计算出ssize,ssize是segment数组的长度,它必须保证是2的幂,这样就可以通过hash&ssize-1来计算分段锁在数组中的下标。由于传入的concurrencylevel不能保证是2的幂,所以不能直接用它来当作segment数组的长度,因此我们要找到一个最接近concurrencylevel的2的幂,用它来作为数组的长度。假如现在传入的concurrencylevel=15,通过上面代码可以计算出ssize=16,sshift=4。接下来立马可以算出segmentshift=16,segmentmask=15。注意这里的segmentshift是分段锁的移位值,segmentmask是分段锁的掩码值,这两个值是用来计算分段锁在数组中的下标,在下面我们会讲到。在算出分段锁的个数ssize之后,就可以根据传入的总容量来计算每个分段锁的容量,它的值c = initialcapacity / ssize。分段锁的容量也就是hashentry数组的长度,同样也必须保证是2的幂,而上面算出的c的值不能保证这一点,所以不能直接用c作为hashentry数组的长度,需要另外找到一个最接近c的2的幂,将这个值赋给cap,然后用cap来作为hashentry数组的长度。现在我们有了ssize和cap,就可以新建分段锁数组segment[]和元素数组hashentry[]了。注意,与jdk1.6不同是的,在jdk1.7中只新建了segment数组,并没有对它初始化,初始化segment的操作留到了插入操作时进行。
4. 通过怎样的方式来定位锁和定位元素?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
//根据哈希码获取分段锁 @suppresswarnings ( "unchecked" ) private segment<k,v> segmentforhash( int h) { long u = (((h >>> segmentshift) & segmentmask) << sshift) + sbase; return (segment<k,v>) unsafe.getobjectvolatile(segments, u); } //根据哈希码获取元素 @suppresswarnings ( "unchecked" ) static final <k,v> hashentry<k,v> entryforhash(segment<k,v> seg, int h) { hashentry<k,v>[] tab; return (seg == null || (tab = seg.table) == null ) ? null : (hashentry<k,v>) unsafe.getobjectvolatile(tab, (( long )(((tab.length - 1 ) & h)) << tshift) + tbase); } |
在jdk1.7中是通过unsafe来获取数组元素的,因此这里比jdk1.6多了些计算数组元素偏移量的代码,这些代码我们暂时不关注,现在我们只需知道下面这两点:
a. 通过哈希码计算分段锁在数组中的下标:(h >>> segmentshift) & segmentmask。
b. 通过哈希码计算元素在数组中的下标:(tab.length - 1) & h。
现在我们假设传给构造器的两个参数为initialcapacity=128, concurrencylevel=16。根据计算可以得到ssize=16, sshift=4,segmentshift=28,segmentmask=15。同样,算得每个分段锁内的hashentry数组的长度为8,所以tab.length-1=7。根据这些值,我们通过下图来解释如何根据同一个哈希码来定位分段锁和元素。
可以看到分段锁和元素的定位都是通过元素的哈希码来决定的。定位分段锁是取哈希码的高位值(从32位处取起),定位元素是取的哈希码的低位值。现在有个问题,它们一个从32位的左端取起,一个从32位的右端取起,那么会在某个时刻产生冲突吗?我们在成员变量里可以找到maximum_capacity = 1 << 30,max_segments = 1 << 16,这说明定位分段锁和定位元素使用的总的位数不超过30,并且定位分段锁使用的位数不超过16,所以至少还隔着2位的空余,因此是不会产生冲突的。
5. 查找元素具体是怎样实现的?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
//根据key获取value public v get(object key) { segment<k,v> s; hashentry<k,v>[] tab; //使用哈希函数计算哈希码 int h = hash(key); //根据哈希码计算分段锁的索引 long u = (((h >>> segmentshift) & segmentmask) << sshift) + sbase; //获取分段锁和对应的哈希表 if ((s = (segment<k,v>)unsafe.getobjectvolatile(segments, u)) != null && (tab = s.table) != null ) { //根据哈希码获取链表头结点, 再对链表进行遍历 for (hashentry<k,v> e = (hashentry<k,v>) unsafe.getobjectvolatile (tab, (( long )(((tab.length - 1 ) & h)) << tshift) + tbase); e != null ; e = e.next) { k k; //根据key和hash找到对应元素后返回value值 if ((k = e.key) == key || (e.hash == h && key.equals(k))) { return e.value; } } } return null ; } |
在jdk1.6中分段锁的get方法是通过下标来访问数组元素的,而在jdk1.7中是通过unsafe的getobjectvolatile方法来读取数组中的元素。为啥要这样做?我们知道虽然segment对象持有的hashentry数组引用是volatile类型的,但是数组内的元素引用不是volatile类型的,因此多线程对数组元素的修改是不安全的,可能会在数组中读取到尚未构造完成的对象。在jdk1.6中是通过第二次加锁读取来保证安全的,而jdk1.7中通过unsafe的getobjectvolatile方法来读取同样也是为了保证这一点。使用getobjectvolatile方法读取数组元素需要先获得元素在数组中的偏移量,在这里根据哈希码计算得到分段锁在数组中的偏移量为u,然后通过偏移量u来尝试读取分段锁。由于分段锁数组在构造时没进行初始化,因此可能读出来一个空值,所以需要先进行判断。在确定分段锁和它内部的哈希表都不为空之后,再通过哈希码读取hashentry数组的元素,根据上面的结构图可以看到,这时获得的是链表的头结点。之后再从头到尾的对链表进行遍历查找,如果找到对应的值就将其返回,否则就返回null。以上就是整个查找元素的过程。
6. 插入元素具体是怎样实现的?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
//向集合添加键值对(若存在则替换) @suppresswarnings ( "unchecked" ) public v put(k key, v value) { segment<k,v> s; //传入的value不能为空 if (value == null ) throw new nullpointerexception(); //使用哈希函数计算哈希码 int hash = hash(key); //根据哈希码计算分段锁的下标 int j = (hash >>> segmentshift) & segmentmask; //根据下标去尝试获取分段锁 if ((s = (segment<k,v>)unsafe.getobject(segments, (j << sshift) + sbase)) == null ) { //获得的分段锁为空就去构造一个 s = ensuresegment(j); } //调用分段锁的put方法 return s.put(key, hash, value, false ); } //向集合添加键值对(不存在才添加) @suppresswarnings ( "unchecked" ) public v putifabsent(k key, v value) { segment<k,v> s; //传入的value不能为空 if (value == null ) throw new nullpointerexception(); //使用哈希函数计算哈希码 int hash = hash(key); //根据哈希码计算分段锁的下标 int j = (hash >>> segmentshift) & segmentmask; //根据下标去尝试获取分段锁 if ((s = (segment<k,v>)unsafe.getobject(segments, (j << sshift) + sbase)) == null ) { //获得的分段锁为空就去构造一个 s = ensuresegment(j); } //调用分段锁的put方法 return s.put(key, hash, value, true ); } |
concurrenthashmap中有两个添加键值对的方法,通过put方法添加时如果存在则会进行覆盖,通过putifabsent方法添加时如果存在则不进行覆盖,这两个方法都是调用分段锁的put方法来完成操作,只是传入的最后一个参数不同而已。在上面代码中我们可以看到首先是根据key的哈希码来计算出分段锁在数组中的下标,然后根据下标使用unsafe类getobject方法来读取分段锁。由于在构造concurrenthashmap时没有对segment数组中的元素初始化,所以可能读到一个空值,这时会先通过ensuresegment方法新建一个分段锁。获取到分段锁之后再调用它的put方法完成添加操作,下面我们来看看具体是怎样操作的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
//添加键值对 final v put(k key, int hash, v value, boolean onlyifabsent) { //尝试获取锁, 若失败则进行自旋 hashentry<k,v> node = trylock() ? null : scanandlockforput(key, hash, value); v oldvalue; try { hashentry<k,v>[] tab = table; //计算元素在数组中的下标 int index = (tab.length - 1 ) & hash; //根据下标获取链表头结点 hashentry<k,v> first = entryat(tab, index); for (hashentry<k,v> e = first;;) { //遍历链表寻找该元素, 找到则进行替换 if (e != null ) { k k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldvalue = e.value; //根据参数决定是否替换旧值 if (!onlyifabsent) { e.value = value; ++modcount; } break ; } e = e.next; //没找到则在链表添加一个结点 } else { //将node结点插入链表头部 if (node != null ) { node.setnext(first); } else { node = new hashentry<k,v>(hash, key, value, first); } //插入结点后将元素总是加1 int c = count + 1 ; //元素超过阀值则进行扩容 if (c > threshold && tab.length < maximum_capacity) { rehash(node); //否则就将哈希表指定下标替换为node结点 } else { setentryat(tab, index, node); } ++modcount; count = c; oldvalue = null ; break ; } } } finally { unlock(); } return oldvalue; } |
为保证线程安全,分段锁中的put操作是需要进行加锁的,所以线程一开始就会去获取锁,如果获取成功就继续执行,若获取失败则调用scanandlockforput方法进行自旋,在自旋过程中会先去扫描哈希表去查找指定的key,如果key不存在就会新建一个hashentry返回,这样在获取到锁之后就不必再去新建了,为的是在等待锁的过程中顺便做些事情,不至于白白浪费时间,可见作者的良苦用心。具体自旋方法我们后面再细讲,现在先把关注点拉回来,线程在成功获取到锁之后会根据计算到的下标,获取指定下标的元素。此时获取到的是链表的头结点,如果头结点不为空就对链表进行遍历查找,找到之后再根据onlyifabsent参数的值决定是否进行替换。如果遍历没找到就会新建一个hashentry指向头结点,此时如果自旋时创建了hashentry,则直接将它的next指向当前头结点,如果自旋时没有创建就在这里新建一个hashentry并指向头结点。在向链表添加元素之后检查元素总数是否超过阀值,如果超过就调用rehash进行扩容,没超过的话就直接将数组对应下标的元素引用指向新添加的node。setentryat方法内部是通过调用unsafe的putorderedobject方法来更改数组元素引用的,这样就保证了其他线程在读取时可以读到最新的值。
7. 删除元素具体是怎样实现的?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
//删除指定元素(找到对应元素后直接删除) public v remove(object key) { //使用哈希函数计算哈希码 int hash = hash(key); //根据哈希码获取分段锁的索引 segment<k,v> s = segmentforhash(hash); //调用分段锁的remove方法 return s == null ? null : s.remove(key, hash, null ); } //删除指定元素(查找值等于给定值才删除) public boolean remove(object key, object value) { //使用哈希函数计算哈希码 int hash = hash(key); segment<k,v> s; //确保分段锁不为空才调用remove方法 return value != null && (s = segmentforhash(hash)) != null && s.remove(key, hash, value) != null ; } |
concurrenthashmap提供了两种删除操作,一种是找到后直接删除,一种是找到后先比较再删除。这两种删除方法都是先根据key的哈希码找到对应的分段锁后,再通过调用分段锁的remove方法完成删除操作。下面我们来看看分段锁的remove方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
//删除指定元素 final v remove(object key, int hash, object value) { //尝试获取锁, 若失败则进行自旋 if (!trylock()) { scanandlock(key, hash); } v oldvalue = null ; try { hashentry<k,v>[] tab = table; //计算元素在数组中的下标 int index = (tab.length - 1 ) & hash; //根据下标取得数组元素(链表头结点) hashentry<k,v> e = entryat(tab, index); hashentry<k,v> pred = null ; //遍历链表寻找要删除的元素 while (e != null ) { k k; //next指向当前结点的后继结点 hashentry<k,v> next = e.next; //根据key和hash寻找对应结点 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { v v = e.value; //传入的value不等于v就跳过, 其他情况就进行删除操作 if (value == null || value == v || value.equals(v)) { //如果pred为空则代表要删除的结点为头结点 if (pred == null ) { //重新设置链表头结点 setentryat(tab, index, next); } else { //设置pred结点的后继为next结点 pred.setnext(next); } ++modcount; --count; //记录元素删除之前的值 oldvalue = v; } break ; } //若e不是要找的结点就将pred引用指向它 pred = e; //检查下一个结点 e = next; } } finally { unlock(); } return oldvalue; } |
在删除分段锁中的元素时需要先获取锁,如果获取失败就调用scanandlock方法进行自旋,如果获取成功就执行下一步,首先计算数组下标然后通过下标获取hashentry数组的元素,这里获得了链表的头结点,接下来就是对链表进行遍历查找,在此之前先用next指针记录当前结点的后继结点,然后对比key和hash看看是否是要找的结点,如果是的话就执行下一个if判断。满足value为空或者value的值等于结点当前值这两个条件就会进入到if语句中进行删除操作,否则直接跳过。在if语句中执行删除操作时会有两种情况,如果当前结点为头结点则直接将next结点设置为头结点,如果当前结点不是头结点则将pred结点的后继设置为next结点。这里的pred结点表示当前结点的前继结点,每次在要检查下一个结点之前就将pred指向当前结点,这就保证了pred结点总是当前结点的前继结点。注意,与jdk1.6不同,在jdk1.7中hashentry对象的next变量不是final的,因此这里可以通过直接修改next引用的值来删除元素,由于next变量是volatile类型的,所以读线程可以马上读到最新的值。
8. 替换元素具体是怎样实现的?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
//替换指定元素(cas操作) public boolean replace(k key, v oldvalue, v newvalue) { //使用哈希函数计算哈希码 int hash = hash(key); //保证oldvalue和newvalue不为空 if (oldvalue == null || newvalue == null ) throw new nullpointerexception(); //根据哈希码获取分段锁的索引 segment<k,v> s = segmentforhash(hash); //调用分段锁的replace方法 return s != null && s.replace(key, hash, oldvalue, newvalue); } //替换元素操作(cas操作) final boolean replace(k key, int hash, v oldvalue, v newvalue) { //尝试获取锁, 若失败则进行自旋 if (!trylock()) { scanandlock(key, hash); } boolean replaced = false ; try { hashentry<k,v> e; //通过hash直接找到头结点然后对链表遍历 for (e = entryforhash( this , hash); e != null ; e = e.next) { k k; //根据key和hash找到要替换的结点 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { //如果指定的当前值正确则进行替换 if (oldvalue.equals(e.value)) { e.value = newvalue; ++modcount; replaced = true ; } //否则不进行任何操作直接返回 break ; } } } finally { unlock(); } return replaced; } |
concurrenthashmap同样提供了两种替换操作,一种是找到后直接替换,另一种是找到后先比较再替换(cas操作)。这两种操作的实现大致是相同的,只是cas操作在替换前多了一层比较操作,因此我们只需简单了解其中一种操作即可。这里拿cas操作进行分析,还是老套路,首先根据key的哈希码找到对应的分段锁,然后调用它的replace方法。进入分段锁中的replace方法后需要先去获取锁,如果获取失败则进行自旋,如果获取成功则进行下一步。首先根据hash码获取链表头结点,然后根据key和hash进行遍历查找,找到了对应的元素之后,比较给定的oldvalue是否是当前值,如果不是则放弃修改,如果是则用新值进行替换。由于hashentry对象的value域是volatile类型的,因此可以直接替换。
9. 自旋时具体做了些什么?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
//自旋等待获取锁(put操作) private hashentry<k,v> scanandlockforput(k key, int hash, v value) { //根据哈希码获取头结点 hashentry<k,v> first = entryforhash( this , hash); hashentry<k,v> e = first; hashentry<k,v> node = null ; int retries = - 1 ; //在while循环内自旋 while (!trylock()) { hashentry<k,v> f; if (retries < 0 ) { //如果头结点为空就新建一个node if (e == null ) { if (node == null ) { node = new hashentry<k,v>(hash, key, value, null ); } retries = 0 ; //否则就遍历链表定位该结点 } else if (key.equals(e.key)) { retries = 0 ; } else { e = e.next; } //retries每次在这加1, 并判断是否超过最大值 } else if (++retries > max_scan_retries) { lock(); break ; //retries为偶数时去判断first有没有改变 } else if ((retries & 1 ) == 0 && (f = entryforhash( this , hash)) != first) { e = first = f; retries = - 1 ; } } return node; } //自旋等待获取锁(remove和replace操作) private void scanandlock(object key, int hash) { //根据哈希码获取链表头结点 hashentry<k,v> first = entryforhash( this , hash); hashentry<k,v> e = first; int retries = - 1 ; //在while循环里自旋 while (!trylock()) { hashentry<k,v> f; if (retries < 0 ) { //遍历链表定位到该结点 if (e == null || key.equals(e.key)) { retries = 0 ; } else { e = e.next; } //retries每次在这加1, 并判断是否超过最大值 } else if (++retries > max_scan_retries) { lock(); break ; //retries为偶数时去判断first有没有改变 } else if ((retries & 1 ) == 0 && (f = entryforhash( this , hash)) != first) { e = first = f; retries = - 1 ; } } } |
在前面我们讲到过,分段锁中的put,remove,replace这些操作都会要求先去获取锁,只有成功获得锁之后才能进行下一步操作,如果获取失败就会进行自旋。自旋操作也是在jdk1.7中添加的,为了避免线程频繁的挂起和唤醒,以此提高并发操作时的性能。在put方法中调用的是scanandlockforput,在remove和replace方法中调用的是scanandlock。这两种自旋方法大致是相同的,这里我们只分析scanandlockforput方法。首先还是先根据hash码获得链表头结点,之后线程会进入while循环中执行,退出该循环的唯一方式是成功获取锁,而在这期间线程不会被挂起。刚进入循环时retries的值为-1,这时线程不会马上再去尝试获取锁,而是先去寻找到key对应的结点(没找到会新建一个),然后再将retries设为0,接下来就会一次次的尝试获取锁,对应retries的值也会每次加1,直到超过最大尝试次数如果还没获取到锁,就会调用lock方法进行阻塞获取。在尝试获取锁的期间,还会每隔一次(retries为偶数)去检查头结点是否被改变,如果被改变则将retries重置回-1,然后再重走一遍刚才的流程。这就是线程自旋时所做的操作,需注意的是如果在自旋时检测到头结点已被改变,则会延长线程的自旋时间。
10. 哈希表扩容时都做了哪些操作?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
//再哈希 @suppresswarnings ( "unchecked" ) private void rehash(hashentry<k,v> node) { //获取旧哈希表的引用 hashentry<k,v>[] oldtable = table; //获取旧哈希表的容量 int oldcapacity = oldtable.length; //计算新哈希表的容量(为旧哈希表的2倍) int newcapacity = oldcapacity << 1 ; //计算新的元素阀值 threshold = ( int )(newcapacity * loadfactor); //新建一个hashentry数组 hashentry<k,v>[] newtable = (hashentry<k,v>[]) new hashentry[newcapacity]; //生成新的掩码值 int sizemask = newcapacity - 1 ; //遍历旧表的所有元素 for ( int i = 0 ; i < oldcapacity ; i++) { //取得链表头结点 hashentry<k,v> e = oldtable[i]; if (e != null ) { hashentry<k,v> next = e.next; //计算元素在新表中的索引 int idx = e.hash & sizemask; //next为空表明链表只有一个结点 if (next == null ) { //直接把该结点放到新表中 newtable[idx] = e; } else { hashentry<k,v> lastrun = e; int lastidx = idx; //定位lastrun结点, 将lastrun之后的结点直接放到新表中 for (hashentry<k,v> last = next; last != null ; last = last.next) { int k = last.hash & sizemask; if (k != lastidx) { lastidx = k; lastrun = last; } } newtable[lastidx] = lastrun; //遍历在链表lastrun结点之前的元素, 将它们依次复制到新表中 for (hashentry<k,v> p = e; p != lastrun; p = p.next) { v v = p.value; int h = p.hash; int k = h & sizemask; hashentry<k,v> n = newtable[k]; newtable[k] = new hashentry<k,v>(h, p.key, v, n); } } } } //计算传入结点在新表中的下标 int nodeindex = node.hash & sizemask; //将传入结点添加到链表头结点 node.setnext(newtable[nodeindex]); //将新表指定下标元素换成传入结点 newtable[nodeindex] = node; //将哈希表引用指向新表 table = newtable; } |
rehash方法在put方法中被调用,我们知道在put方法时会新建元素并添加到哈希数组中,随着元素的增多发生哈希冲突的可能性越大,哈希表的性能也会随之下降。因此每次put操作时都会检查元素总数是否超过阀值,如果超过则调用rehash方法进行扩容。因为数组长度一旦确定则不能再被改变,因此需要新建一个数组来替换原先的数组。从代码中可以知道新创建的数组长度为原数组的2倍(oldcapacity << 1)。创建好新数组后需要将旧数组中的所有元素移到新数组中,因此需要计算每个元素在新数组中的下标。计算新下标的过程如下图所示。
我们知道下标直接取的是哈希码的后几位,由于新数组的容量是直接用旧数组容量右移1位得来的,因此掩码位数向右增加1位,取到的哈希码位数也向右增加1位。如上图,若旧的掩码值为111,则元素下标为101,扩容后新的掩码值为1111,则计算出元素的新下标为0101。由于同一条链表上的元素下标是相同的,现在假设链表所有元素的下标为101,在扩容后该链表元素的新下标只有0101或1101这两种情况,因此数组扩容会打乱原先的链表并将链表元素分成两批。在计算出新下标后需要将元素移动到新数组中,在hashmap中通过直接修改next引用导致了多线程的死锁。虽然在concurrenthashmap中通过加锁避免了这种情况,但是我们知道next域是volatile类型的,它的改动能立马被读线程读取到,因此为保证线程安全采用复制元素来迁移数组。但是对链表中每个元素都进行复制有点影响性能,作者发现链表尾部有许多元素的next是不变的,它们在新数组中的下标是相同的,因此可以考虑整体移动这部分元素。具统计实际操作中只有1/6的元素是必须复制的,所以整体移动链表尾部元素(lastrun后面的元素)是可以提升一定性能的。
注:本篇文章基于jdk1.7版本。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://www.cnblogs.com/liuyun1995/archive/2018/03/26/8631264.html