javadoop.com/post/hashmap

今天发一篇”水文”,可能很多读者都会表示不理解,不过我想把它作为并发序列文章中不可缺少的一块来介绍。本来以为花不了多少时间的,不过最终还是投入了挺多时间来完成这篇文章的。

网上关于 HashMap 和 ConcurrentHashMap 的文章确实不少,不过缺斤少两的文章比较多,所以才想自己也写一篇,把细节说清楚说透node++,尤其像 Java8 中的 ConcurrentHashMap,大部分文章都说不清楚。终归是希望能降低大家学习的成本,不希望大家到处找各种不是很靠谱的文章,看完一篇又一篇,可是还是模模糊糊。

阅读建议:四节基本上可以进行独立阅读,建议初学者可按照 Java7 HashMap -> Java7 ConcurrentHashMap -> Java8 HashMap -> Java8 ConcurrentHashMap 顺序进行阅读,可适当降低阅读门槛。

阅读前提:本文分析的是源码,所以至少读者要熟悉它们的接口使用node++,同时,对于并发,读者至少要知道 CAS、ReentrantLock、UNSAFE 操作这几个基本的知识,文中不会对这些知识进行介绍。Java8 用到了红黑树,不过本文不会进行展开,感兴趣的读者请自行查找相关资料。

Java7 HashMap

HashMap 是最简单的,一来我们非常熟悉,二来就是它不支持并发操作,所以源码也非常简单。

首先,我们用下面这张图来介绍 HashMap 的结构。

node++_node xpath_node rest api

这个仅仅是示意图,因为没有考虑到数组要扩容的情况,具体的后面再说。

大方向上,HashMap 里面是一个数组,然后数组中每个元素是一个单向链表。

上图中,每个绿色的实体是嵌套类 Entry 的实例,Entry 包含四个属性:key, value, hash 值和用于单向链表的 next。

capacity:当前数组容量,始终保持 2^n,可以扩容,扩容后数组大小为当前的 2 倍。

loadFactor:负载因子,默认为 0.75。

threshold:扩容的阈值,等于 capacity * loadFactor

put 过程分析

还是比较简单的,跟着代码走一遍吧。

public V put(K key, V value) {

// 当插入第一个元素的时候,需要先初始化数组大小

if (table == EMPTY_TABLE) {

inflateTable(threshold);

}

// 如果 key 为 null,感兴趣的可以往里看,最终会将这个 entry 放到 table[0] 中

if (key == null)

return putForNullKey(value);

// 1. 求 key 的 hash 值

int hash = hash(key);

// 2. 找到对应的数组下标

int i = indexFor(hash, table.length);

// 3. 遍历一下对应下标处的链表,看是否有重复的 key 已经存在,

// 如果有,直接覆盖,put 方法返回旧值就结束了

for (Entry e = table[i]; e != null; e = e.next) {

Object k;

if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {

V oldValue = e.value;

e.value = value;

e.recordAccess(this);

return oldValue;

}

}

modCount++;

// 4. 不存在重复的 key,将此 entry 添加到链表中,细节后面说

addEntry(hash, key, value, i);

return null;

}

数组初始化

在第一个元素插入 HashMap 的时候做一次数组的初始化,就是先确定初始的数组大小,并计算数组扩容的阈值。

private void inflateTable(int toSize) {

// 保证数组大小一定是 2 的 n 次方。

// 比如这样初始化:new HashMap(20),那么处理成初始数组大小是 32

int capacity = roundUpToPowerOf2(toSize);

// 计算扩容阈值:capacity * loadFactor

threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);

// 算是初始化数组吧

table = new Entry[capacity];

initHashSeedAsNeeded(capacity); //ignore

}

这里有一个将数组大小保持为 2 的 n 次方的做法,Java7 和 Java8 的 HashMap 和 ConcurrentHashMap 都有相应的要求,只不过实现的代码稍微有些不同,后面再看到的时候就知道了。

计算具体数组位置

这个简单,我们自己也能 YY 一个:使用 key 的 hash 值对数组长度进行取模就可以了。

static int indexFor(int hash, int length) {

// assert Integer.bitCount(length) == 1 : “length must be a non-zero power of 2”;

return hash & (length-1);

}

这个方法很简单,简单说就是取 hash 值的低 n 位。如在数组长度为 32 的时候,其实取的就是 key 的 hash 值的低 5 位,作为它在数组中的下标位置。

添加节点到链表中

找到数组下标后,会先进行 key 判重,如果没有重复,就准备将新值放入到链表的表头。

void addEntry(int hash, K key, V value, int bucketIndex) {

// 如果当前 HashMap 大小已经达到了阈值,并且新值要插入的数组位置已经有元素了,那么要扩容

if ((size >= threshold) && (null != table[bucketIndex])) {

// 扩容,后面会介绍一下

resize(2 * table.length);

// 扩容以后,重新计算 hash 值

hash = (null != key) ? hash(key) : 0;

// 重新计算扩容后的新的下标

bucketIndex = indexFor(hash, table.length);

}

// 往下看

createEntry(hash, key, value, bucketIndex);

}

// 这个很简单,其实就是将新值放到链表的表头,然后 size++

void createEntry(int hash, K key, V value, int bucketIndex) {

Entry e = table[bucketIndex];

table[bucketIndex] = new Entry(hash, key, value, e);

size++;

}

这个方法的主要逻辑就是先判断是否需要扩容,需要的话先扩容,然后再将这个新的数据插入到扩容后的数组的相应位置处的链表的表头。

数组扩容

前面我们看到,在插入新值的时候,如果当前的 size 已经达到了阈值,并且要插入的数组位置上已经有元素,那么就会触发扩容,扩容后,数组大小为原来的 2 倍。

void resize(int newCapacity) {

Entry[] oldTable = table;

int oldCapacity = oldTable.length;

if (oldCapacity == MAXIMUM_CAPACITY) {

threshold = Integer.MAX_VALUE;

return;

}

// 新的数组

Entry[] newTable = new Entry[newCapacity];

// 将原来数组中的值迁移到新的更大的数组中

transfer(newTable, initHashSeedAsNeeded(newCapacity));

table = newTable;

threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);

}

扩容就是用一个新的大数组替换原来的小数组,并将原来数组中的值迁移到新的数组中。

由于是双倍扩容,迁移过程中,会将原来 table[i] 中的链表的所有节点,分拆到新的数组的 newTable[i] 和 newTable[i + oldLength] 位置上。如原来数组长度是 16,那么扩容后,原来 table[0] 处的链表中的所有元素会被分配到新数组中 newTable[0] 和 newTable[16] 这两个位置。代码比较简单,这里就不展开了。

get 过程分析

相对于 put 过程,get 过程是非常简单的。

根据 key 计算 hash 值。

找到相应的数组下标:hash & (length – 1)。

遍历该数组位置处的链表,直到找到相等(==或equals)的 key。

public V get(Object key) {

// 之前说过,key 为 null 的话,会被放到 table[0],所以只要遍历下 table[0] 处的链表就可以了

if (key == null)

return getForNullKey();

//

Entry entry = getEntry(key);

return null == entry ? null : entry.getValue();

}

getEntry(key):

final Entry getEntry(Object key) {

if (size == 0) {

return null;

}

int hash = (key == null) ? 0 : hash(key);

// 确定数组下标,然后从头开始遍历链表,直到找到为止

for (Entry e = table[indexFor(hash, table.length)];

e != null;

e = e.next) {

Object k;

if (e.hash == hash &&

((k = e.key) == key || (key != null && key.equals(k))))

return e;

}

return null;

}

Java7 ConcurrentHashMap

ConcurrentHashMap 和 HashMap 思路是差不多的,但是因为它支持并发操作,所以要复杂一些。

整个 ConcurrentHashMap 由一个个 Segment 组成,Segment 代表”部分“或”一段“的意思,所以很多地方都会将其描述为分段锁。注意,行文中,我很多地方用了“槽”来代表一个 segment。

简单理解就是,ConcurrentHashMap 是一个 Segment 数组,Segment 通过继承 ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。

node rest api_node++_node xpath

concurrencyLevel:并行级别、并发数、Segment 数,怎么翻译不重要,理解它。默认是 16,也就是说 ConcurrentHashMap 有 16 个 Segments,所以理论上,这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。

再具体到每个 Segment 内部,其实每个 Segment 很像之前介绍的 HashMap,不过它要保证线程安全,所以处理起来要麻烦些。

初始化

initialCapacity:初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment。

loadFactor:负载因子,之前我们说了,Segment 数组不可以扩容,所以这个负载因子是给每个 Segment 内部使用的。

public ConcurrentHashMap(int initialCapacity,

float loadFactor, int concurrencyLevel) {

if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel MAX_SEGMENTS)

concurrencyLevel = MAX_SEGMENTS;

// Find power-of-two sizes best matching arguments

int sshift = 0;

int ssize = 1;

// 计算并行级别 ssize,因为要保持并行级别是 2 的 n 次方

while (ssize < concurrencyLevel) {

++sshift;

ssize segmentShift) & segmentMask;

// 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null,

// ensureSegment(j) 对 segment[j] 进行初始化

if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck

(segments, (j threshold && tab.length < MAXIMUM_CAPACITY)

rehash(node); // 扩容后面也会具体分析

else

// 没有达到阈值,将 node 放到数组 tab 的 index 位置,

// 其实就是将新的节点设置成原链表的表头

setEntryAt(tab, index, node);

++modCount;

count = c;

oldValue = null;

break;

}

}

} finally {

// 解锁

unlock();

}

return oldValue;

}

整体流程还是比较简单的,由于有独占锁的保护,所以 segment 内部的操作并不复杂。至于这里面的并发问题,我们稍后再进行介绍。

到这里 put 操作就结束了,接下来,我们说一说其中几步关键的操作。

初始化槽: ensureSegment

ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他槽来说,在插入第一个值的时候进行初始化。

这里需要考虑并发,因为很可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就可以。

private Segment ensureSegment(int k) {

final Segment[] ss = this.segments;

long u = (k MAX_SCAN_RETRIES) {

lock();

break;

}

else if ((retries & 1) == 0 &&

// 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头

// 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法

(f = entryForHash(this, hash)) != first) {

e = first = f; // re-traverse if entry changed

retries = -1;

}

}

return node;

}

这个方法有两个出口,一个是 tryLock() 成功了,循环终止,另一个就是重试次数超过了 MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。

这个方法就是看似复杂,但是其实就是做了一件事,那就是获取该 segment 的独占锁,如果需要的话顺便实例化了一下 node。

扩容: rehash

重复一下,segment 数组不能扩容,扩容是 segment 数组某个位置内部的数组 HashEntry[] 进行扩容,扩容后,容量为原来的 2 倍。

首先,我们要回顾一下触发扩容的地方,put 的时候,如果判断该值的插入会导致该 segment 的元素个数超过阈值,那么先进行扩容,再插值,读者这个时候可以回去 put 方法看一眼。

该方法不需要考虑并发,因为到这里的时候,是持有该 segment 的独占锁的。

// 方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。

private void rehash(HashEntry node) {

HashEntry[] oldTable = table;

int oldCapacity = oldTable.length;

// 2 倍

int newCapacity = oldCapacity >> segmentShift) & segmentMask)

限时特惠:本站每日持续更新海量设计资源,一年会员只需29.9元,全站资源免费下载
站长微信:ziyuanshu688