ConcurrentHashMap 原理详解 ⭐⭐⭐

面试题:ConcurrentHashMap 是如何保证线程安全的?

核心回答

ConcurrentHashMap 是 Java 并发包中提供的线程安全哈希表,相比 Hashtable 和 Collections.synchronizedMap(),它采用了更细粒度的锁机制,实现了更高的并发性能。

JDK 1.7 vs JDK 1.8 对比

特性 JDK 1.7 JDK 1.8
数据结构 Segment 数组 + HashEntry 数组 + 链表 Node 数组 + 链表 + 红黑树
锁机制 分段锁(Segment 继承 ReentrantLock) CAS + synchronized(锁单个 Node)
锁粒度 段级别(默认 16 段) 节点级别
并发度 固定 16(Segment 数量) 理论上无上限
哈希冲突 链表 链表 + 红黑树(长度≥8)
size 计算 分段统计后累加 基于 CounterCell 数组

JDK 1.8 核心结构

// 核心字段
 transient volatile Node<K,V>[] table;  // 哈希桶数组
 
 // Node 节点定义
 static class Node<K,V> implements Map.Entry<K,V> {
     final int hash;
     final K key;
     volatile V val;        // volatile 保证可见性
     volatile Node<K,V> next;
 }
 
 // 特殊节点:ForwardingNode(扩容时使用)
 static final class ForwardingNode<K,V> extends Node<K,V> {
     final Node<K,V>[] nextTable;
 }
 
 // 特殊节点:TreeBin(红黑树根节点)
 static final class TreeBin<K,V> extends Node<K,V> {
     TreeNode<K,V> root;
 }

核心参数

// 默认初始容量
private static final int DEFAULT_CAPACITY = 16;

// 默认并发级别(JDK 1.7 使用,1.8 仅用于兼容)
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// 负载因子
private static final float LOAD_FACTOR = 0.75f;

// 链表转红黑树阈值
static final int TREEIFY_THRESHOLD = 8;

// 红黑树转链表阈值
static final int UNTREEIFY_THRESHOLD = 6;

// 最小树化容量(table 长度达到 64 才允许转红黑树)
static final int MIN_TREEIFY_CAPACITY = 64;

// 扩容时单线程最小迁移量
private static final int MIN_TRANSFER_STRIDE = 16;

put() 方法执行流程

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());  // 计算哈希值
    int binCount = 0;
    
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        
        // 1. table 未初始化,先初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        
        // 2. 目标位置为空,CAS 直接插入
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;
        }
        
        // 3. 发现 ForwardingNode,说明正在扩容,协助扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        
        // 4. 发生哈希冲突,需要加锁处理
        else {
            V oldVal = null;
            synchronized (f) {  // 只锁当前桶的头节点
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {  // 链表
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 找到相同 key,更新值
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 到链表尾部,插入新节点
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {  // 红黑树
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            
            // 检查是否需要树化
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    
    addCount(1L, binCount);  // 更新元素个数
    return null;
}

线程安全机制

1. CAS 操作(无锁化)

// 使用 Unsafe 进行 CAS 操作
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

CAS 使用场景

2. synchronized 细粒度锁

// 只锁住链表/红黑树的头节点
synchronized (f) {
    // 处理链表或红黑树
}

为什么用 synchronized 而不是 ReentrantLock?

JDK 1.8 对 synchronized 进行了优化:

3. volatile 保证可见性

// table 使用 volatile 修饰
transient volatile Node<K,V>[] table;

// Node 的 val 和 next 使用 volatile 修饰
volatile V val;
volatile Node<K,V> next;

扩容机制(多线程协作)

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    
    // 1. 计算每个线程负责的迁移量
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE;
    
    // 2. 初始化新 table
    if (nextTab == null) {
        try {
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false;
    
    // 3. 多线程并行迁移
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        
        // 分配迁移任务
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        
        // 迁移完成
        if (i < 0 || i >= n || i + n >= nextn) {
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);  // 新阈值 = 新容量 * 0.75
                return;
            }
            // ...
        }
        
        // 迁移桶中的元素
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true;
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {  // 链表迁移
                        // 将链表拆分成两部分
                        // 一部分在新 table 的原位置
                        // 另一部分在新 table 的原位置 + n
                    }
                    else if (f instanceof TreeBin) {  // 红黑树迁移
                        // ...
                    }
                }
            }
        }
    }
}

扩容特点

  1. 多线程协作:多个线程可以同时参与扩容
  2. 分段迁移:每个线程负责一段数据的迁移
  3. 渐进式:查询时遇到 ForwardingNode 会协助扩容

size() 方法实现

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}

// 使用 CounterCell 数组分散热点
@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

优化思路

高频面试题

Q1: ConcurrentHashMap 为什么 key 和 value 不能为 null?

// ConcurrentHashMap 的 put 方法
if (key == null || value == null) throw new NullPointerException();

原因:

  1. 避免歧义:无法区分 “key 不存在” 和 “key 存在但 value 为 null”
  2. 并发安全:在多线程环境下,null 值可能导致判断错误

对比 HashMap

Q2: ConcurrentHashMap 和 Hashtable 的区别?

特性 ConcurrentHashMap Hashtable
锁机制 细粒度锁(桶级别) 粗粒度锁(整个表)
性能 高并发性能好 并发性能差
null 支持 不支持 不支持
迭代器 弱一致性 快速失败
出现版本 JDK 1.5 JDK 1.0

Q3: size() 方法是准确的吗?

不是实时的准确值,是弱一致性的:

Q4: get() 方法需要加锁吗?

不需要,get() 是完全无锁的:

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)  // 正在扩容或是红黑树
            return (p = e.find(h, key)) != null ? p.val : null;
        
        // 遍历链表
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

最佳实践

// 1. 预估容量,避免频繁扩容
ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<>(128);

// 2. 使用 computeIfAbsent 原子性计算
map.computeIfAbsent(key, k -> createValue(k));

// 3. 使用 putIfAbsent 实现幂等
map.putIfAbsent(key, value);

// 4. 批量操作时考虑使用 merge
map.merge(key, value, (oldVal, newVal) -> oldVal + newVal);

参考链接: