ConcurrentHashMap is a concurrent map we use extremely often in day-to-day work, and lately its internals come up in almost every interview, so I read through the source and wrote down these notes. If anything here is wrong, please point it out.
The implementation changed dramatically between JDK 7 (and earlier) and JDK 8, so let's look at the two separately:
JDK 1.7 and earlier
In JDK 1.7 and earlier, ConcurrentHashMap is implemented internally as a Segment array plus per-Segment HashEntry arrays, essentially a two-level hash table. A simple diagram:
Concurrency: read-read, read-write, and write-write operations on different Segments can all proceed in parallel. Within the same Segment, read-read and read-write operations do not interfere with each other, but write-write operations block one of the two threads, as shown below:
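To make that claim concrete, here is a minimal runnable sketch (the class name and key patterns are made up for illustration): two threads writing disjoint key ranges will mostly hit different Segments and make progress in parallel.

import java.util.concurrent.ConcurrentHashMap;

public class SegmentConcurrencyDemo {
    public static void main(String[] args) throws InterruptedException {
        // 16 is the default concurrency level, i.e. 16 Segments in JDK 1.7
        final ConcurrentHashMap<String, Integer> map =
            new ConcurrentHashMap<String, Integer>(16, 0.75f, 16);
        // two writers on disjoint key ranges: most of their keys hash to
        // different Segments, so the writes can proceed concurrently
        Thread w1 = new Thread(new Runnable() {
            public void run() { for (int i = 0; i < 10000; i++) map.put("a" + i, i); }
        });
        Thread w2 = new Thread(new Runnable() {
            public void run() { for (int i = 0; i < 10000; i++) map.put("b" + i, i); }
        });
        w1.start(); w2.start();
        w1.join(); w2.join();
        System.out.println(map.size()); // prints 20000
    }
}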
ConcurrentHashMap's default initial capacity DEFAULT_INITIAL_CAPACITY is 16, the default load factor DEFAULT_LOAD_FACTOR is 0.75, and the default concurrency level DEFAULT_CONCURRENCY_LEVEL is 16. The concurrency level deserves a word: it is an estimate of how many threads will modify the map concurrently, and it determines the size of the Segment array (i.e. the number of lock stripes), which is the smallest power of two not less than concurrencyLevel, capped at 65536 (MAX_SEGMENTS); each Segment's HashEntry table is at least 2 entries long (MIN_SEGMENT_TABLE_CAPACITY). The key constants in the source:
// default initial capacity
static final int DEFAULT_INITIAL_CAPACITY = 16;
// default load factor
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// default concurrency level; this constant determines the number of Segments
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// maximum capacity
static final int MAXIMUM_CAPACITY = 1 << 30;
// minimum capacity of the per-Segment HashEntry table
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// number of unlocked attempts made by size() and containsValue() before locking
static final int RETRIES_BEFORE_LOCK = 2;
// maximum number of Segments
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
What exactly is a Segment?
Start with a line from its documentation: "Segments are specialized versions of hash tables. This subclasses from ReentrantLock opportunistically, just to simplify some locking and avoid separate construction." In other words, a Segment is a specialized hash table that extends ReentrantLock, and that is exactly what provides the lock striping; it is the key reason ConcurrentHashMap handles concurrency efficiently. Looking back at the diagram above, the existence of Segments is what lowers the lock granularity.
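To see why bundling a lock with each sub-table lowers the lock granularity, here is a tiny hand-rolled sketch of the same idea (not JDK code; all names are mine): each stripe guards only its own table, so writes that land in different stripes never contend for the same lock.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

class StripedMapSketch<K, V> {
    // each Stripe plays the role of a Segment: a lock plus its own table
    static final class Stripe<K, V> extends ReentrantLock {
        final Map<K, V> table = new HashMap<K, V>();
    }

    private final Stripe<K, V>[] stripes;

    @SuppressWarnings("unchecked")
    StripedMapSketch(int stripeCount) {
        stripes = new Stripe[stripeCount];
        for (int i = 0; i < stripeCount; i++) stripes[i] = new Stripe<K, V>();
    }

    V put(K key, V value) {
        // hash once to pick the stripe; only that stripe is locked
        Stripe<K, V> s = stripes[(key.hashCode() & 0x7fffffff) % stripes.length];
        s.lock();
        try {
            return s.table.put(key, value);
        } finally {
            s.unlock();
        }
    }
}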
The real declaration in the source:
static final class Segment<K,V> extends ReentrantLock implements Serializable
Key fields of Segment:
// tryLock retry limit: 1 on a single-processor machine, 64 on multiprocessors
static final int MAX_SCAN_RETRIES =
    Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
// the HashEntry array of this Segment; volatile guarantees visibility across threads
transient volatile HashEntry<K,V>[] table;
// number of elements in this Segment
transient int count;
// number of times this Segment has been structurally modified
transient int modCount;
/**
 * The table is rehashed when its size exceeds this threshold.
 * (The value of this field is always <tt>(int)(capacity * loadFactor)</tt>.)
 * Once count exceeds threshold, the table must be resized.
 */
transient int threshold;
// load factor
final float loadFactor;
What is a HashEntry?
HashEntry wraps a key-value pair of the hash table. Both value and next are declared volatile, guaranteeing visibility of concurrent reads and writes, and next chains entries into a singly linked list.
static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }
    // ... omitted ...
}
The ConcurrentHashMap constructor:
It computes the Segment array size ssize (the number of segments) from concurrencyLevel, derives the size of each Segment's HashEntry array from initialCapacity / ssize, then initializes the first element of the Segment array (Segment[0]), completing the map's initialization.
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    // size of the Segment array
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // compute the size of the HashEntry array inside each Segment
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // create Segment[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    // create the Segment array
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
ConcurrentHashMap's put method: the target slot is located via two hash-based lookups.
The key is hashed once to locate the target Segment within the Segment array; if that Segment does not exist yet it is created (ensureSegment), and then the Segment's own put method is called.
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}
Then Segment's put method runs; this is where the per-segment locking happens. A quick walkthrough of the method:
1. It first calls tryLock to acquire the segment lock. If the lock is acquired, a second hash-based computation locates the slot in the HashEntry array. If an entry already exists at that slot, the key and hash decide what happens: if the key matches (same reference, or same hash and equals), the new value simply replaces the old one; if it is a genuine collision, the HashEntry chain is traversed to find the node to update or the position to insert at. Chaining colliding entries into a linked list (separate chaining) is how ConcurrentHashMap resolves hash collisions. (The low-level reads and writes here go through the Unsafe class; much of java.util.concurrent is built on the CAS operations Unsafe exposes, so it is a class worth studying.)
2. If the lock cannot be acquired, scanAndLockForPut() runs. Inside it, the thread repeatedly calls tryLock(): up to 64 times on a multiprocessor machine, once on a single processor (the MAX_SCAN_RETRIES field described above). Once the number of tryLock() attempts exceeds that limit, it falls back to lock() and the thread blocks.
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        // compute the index of this entry within the HashEntry[] array
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                // compare keys by reference first, then by hash and equals
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    // onlyIfAbsent == false (the default): overwrite the old value;
                    // onlyIfAbsent == true: keep the existing mapping and return the old value
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // resize when count exceeds threshold and the table is below the maximum capacity
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) { // keep trying to acquire the lock
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            // still locating: if the bucket is empty, speculatively create the node
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            // the key already exists: stop locating and start counting retries
            else if (key.equals(e.key))
                retries = 0;
            // otherwise move on to the next node in the chain
            else
                e = e.next;
        }
        // too many tryLock attempts: block on lock() and leave the loop
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        // every other retry, recheck the head of the bucket; the head can change while
        // spinning, e.g. another thread finished an insert. If it changed, re-traverse
        // from the new head and reset retries to -1, then keep retrying tryLock and
        // return node once the lock is acquired.
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
How does resizing work?
First, be clear that resizing means growing the HashEntry array inside a single Segment.
For example, with an initial capacity initialCapacity of 16 and a concurrency level concurrencyLevel of 16 (both defaults), the map has 16 Segments, and the per-Segment HashEntry array size works out to initialCapacity / concurrencyLevel = 1. Since the minimum HashEntry table size is 2 (see MIN_SEGMENT_TABLE_CAPACITY), cap becomes 2, and the resize threshold is threshold = (int)(cap * loadFactor) = (int)(2 * 0.75) = 1. A Segment is resized when the number of entries in it exceeds threshold (the c > threshold check in put above).
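To make that arithmetic concrete, here is a small standalone sketch (class and variable names are mine) that repeats the sizing logic from the constructor shown earlier, for initialCapacity = 16, concurrencyLevel = 16, loadFactor = 0.75:

public class SegmentSizing {
    public static void main(String[] args) {
        int initialCapacity = 16, concurrencyLevel = 16;
        float loadFactor = 0.75f;

        // number of Segments: smallest power of two >= concurrencyLevel
        int ssize = 1;
        while (ssize < concurrencyLevel) ssize <<= 1;

        // entries each Segment needs to be able to hold
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity) ++c;

        // per-Segment HashEntry table size, at least MIN_SEGMENT_TABLE_CAPACITY (2)
        int cap = 2;
        while (cap < c) cap <<= 1;

        int threshold = (int) (cap * loadFactor);
        // prints: 16 segments, HashEntry[2] per segment, threshold 1
        System.out.println(ssize + " segments, HashEntry[" + cap
                + "] per segment, threshold " + threshold);
    }
}

The rehash method itself: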
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null)   //  Single node on list
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
How does size() guarantee a correct result?
It counts the elements with repeated passes over the Segments, governed by the RETRIES_BEFORE_LOCK constant.
1. It first makes passes without taking any locks, summing each Segment's count and modCount; if two consecutive passes produce the same modCount sum, nothing changed in between, so the computed element count is considered accurate and returned.
2. If the unlocked passes keep disagreeing (more than RETRIES_BEFORE_LOCK retries), it locks every Segment, recounts, and releases all the locks afterwards.
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}
JDK 1.8 and later
The biggest internal change in JDK 1.8 is that the Segment-based lock striping is gone; the map is now implemented with a Node array plus CAS plus synchronized, giving a much finer lock granularity.
Advantages:
1. More efficient resizing: resizing the array must be done under lock protection. Before 1.8 the number of locks equalled the number of Segments, so at most that many threads could resize at the same time; in 1.8 the lock granularity is a single bucket, so in theory as many threads as there are buckets could help transfer, but to keep the number of resizing threads in check each helper must take on a stride of at least 16 buckets (MIN_TRANSFER_STRIDE), i.e. roughly 1/16 of the Node array per thread.
2. Finer lock granularity: before 1.8 an entire Segment was locked; from 1.8, as the putVal method shows, synchronized locks only the first Node of the affected bucket.
3. Faster lookups: before 1.8, a long HashEntry chain meant badly degraded lookup performance. Since 1.8 each bucket is a linked list that can convert to a red-black tree: once a chain reaches TREEIFY_THRESHOLD (8) nodes, treeifyBin converts it to a tree (or simply resizes the table while it is still small), which greatly improves lookup efficiency.
Key internal fields of ConcurrentHashMap:
/**
 * Base counter value, used mainly when there is no contention,
 * but also as a fallback during table initialization
 * races. Updated via CAS.
 */
private transient volatile long baseCount;

/**
 * Table initialization and resizing control. When negative, the
 * table is being initialized or resized: -1 for initialization,
 * else -(1 + the number of active resizing threads). Otherwise,
 * when table is null, holds the initial table size to use upon
 * creation, or 0 for default. After initialization, holds the
 * next element count value upon which to resize the table.
 */
private transient volatile int sizeCtl;
The ConcurrentHashMap constructor:
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}
The Node class:
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
    // ... omitted ...
}
The put method:
1. If the table is empty, initTable() is called first; in other words, the Node array is lazily initialized on the first put.
2. The key's hash determines the bucket in the Node array. If that bucket is still empty, the new node is installed with CAS rather than a lock: suppose two threads both see tab[i] == null; thread 1 runs casTabAt, finds tab[i] is still null (the expected value) and installs its node; thread 2's casTabAt then fails because tab[i] no longer matches the expected null, so thread 2 goes back to the top of the for loop, re-reads tab[i] as the new expected state, and retries. This is the classic lock-free retry pattern (a small standalone sketch of it follows after this list).
3. If the first node's hash is MOVED, the bucket is being transferred to a new table; the thread helps with the transfer (helpTransfer) and continues on the resized table.
4. If the bucket is non-empty and not being moved, the first node of the bucket is locked with synchronized.
4.1 If that node's hash is >= 0, the bucket is a plain linked list, which is traversed to update an existing node or append a new one;
4.2 if the node is a TreeBin (red-black tree), putTreeVal() inserts into the tree.
4.3 After the lock is released, a non-zero binCount means the put touched the bucket: if the chain has reached TREEIFY_THRESHOLD (8) nodes, treeifyBin converts it to a tree; and if oldVal is non-null this was an update of an existing key, which does not change the element count, so the old value is returned directly.
5. If a brand-new node was inserted, addCount() runs to update the element count (baseCount / CounterCell).
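Here is the small standalone sketch promised in step 2 (class and method names are mine, not JDK internals): it shows the same CAS retry shape on an AtomicReferenceArray, where a failed compareAndSet simply means another thread got there first, so we re-read and retry.

import java.util.concurrent.atomic.AtomicReferenceArray;

class CasBinSketch<V> {
    private final AtomicReferenceArray<V> bins = new AtomicReferenceArray<V>(16);

    /** Install newHead in slot i only if the slot is still empty. */
    V putIfBinEmpty(int i, V newHead) {
        for (;;) {
            V current = bins.get(i);
            if (current != null)
                return current;                    // slot already taken: caller handles it (e.g. locks)
            if (bins.compareAndSet(i, null, newHead))
                return null;                       // CAS won: our node is now the head of the bin
            // CAS lost: another thread changed the slot between get() and the CAS; loop and retry
        }
    }
}

The actual putVal implementation from the JDK source: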
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}
The size method: the 1.8 implementation is much simpler than 1.7's. The element count is kept in baseCount, and under contention the deltas are scattered across a CounterCell array; adding baseCount and every CounterCell value together yields the total number of elements:
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    CounterCell[] as = counterCells;
    CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
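The baseCount + CounterCell[] combination is essentially the same striping idea as java.util.concurrent.atomic.LongAdder: contended increments are spread over separate cells and only summed when the total is requested. A minimal sketch of that idea using LongAdder (the demo class is mine):

import java.util.concurrent.atomic.LongAdder;

public class StripedCountDemo {
    public static void main(String[] args) throws InterruptedException {
        final LongAdder counter = new LongAdder();
        Runnable task = new Runnable() {
            public void run() { for (int i = 0; i < 100000; i++) counter.increment(); }
        };
        Thread t1 = new Thread(task), t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        // sum() adds the base value and every cell, just like sumCount() above
        System.out.println(counter.sum()); // prints 200000
    }
}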