`

ConcurrentHashMap<K, V>的实现

 
阅读更多

哈希表存储模型



 

 

与Map接口中的内部Entry<K, V>接口(键值对)不同,

ConcurrentHashMap在其基础上,自己规定了内部类HashEntry<K, V>

 

static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;

        HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
            this.key = key;
            this.hash = hash;
            this.next = next;
            this.value = value;
        }

	@SuppressWarnings("unchecked")
	static final <K,V> HashEntry<K,V>[] newArray(int i) {
	    return new HashEntry[i];
	}
}

 

 

该内部对象,多了两个属性:哈希值,以及next对象(其作用是遇到key值不同,但是哈希值相同的对象,在那个特定的哈希数组索引节点可以形成链表)

 

为了保证线程安全,ConcurrentHashMap类封装了自己的Map经典操作逻辑,get,put,remove等。

在其内部实现了Segment<K,V>类,主要属性为:

 

transient volatile HashEntry<K,V>[] table

规定了哈希数组(表)

 

 

内部类Segment<K,V>实现的经典方法:

 

HashEntry<K,V> getFirst(int hash) {
            HashEntry<K,V>[] tab = table;
            return tab[hash & (tab.length - 1)];
        }

总是返回哈希索引的第一个对象

 

 

 

V get(Object key, int hash) {
            if (count != 0) { // read-volatile
                HashEntry<K,V> e = getFirst(hash);
                while (e != null) {
                    if (e.hash == hash && key.equals(e.key)) {
                        V v = e.value;
                        if (v != null)
                            return v;
                        return readValueUnderLock(e); // recheck
                    }
                    e = e.next;
                }
            }
            return null;
        }

循环遍历在此哈希节点上的所有对象吗,找到符合条件的为止。

 

 

 

 V put(K key, int hash, V value, boolean onlyIfAbsent) {
            lock();
            try {
                int c = count;
                if (c++ > threshold) // ensure capacity
                    rehash();
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue;
                if (e != null) {
                    oldValue = e.value;
                    if (!onlyIfAbsent)
                        e.value = value;
                }
                else {
                    oldValue = null;
                    ++modCount;
                    tab[index] = new HashEntry<K,V>(key, hash, first, value);
                    count = c; // write-volatile
                }
                return oldValue;
            } finally {
                unlock();
            }
        }

加锁,保证线程安全

 

若添加时key值不重复,hash不重复,直接填写哈希表。

若添加时key值重复,将会覆盖原先的键值对。

若添加时key值不重复,但hash值重复,总是将这类新加入的键值对节点设置成为该哈希索引的第一个节点。

HashEntry<K,V> first = tab[index];

上一方法中,哈希索引的first节点,在新的键值对生成时,变为新节点的下一个节点

tab[index] = new HashEntry<K,V>(key, hash, first, value);

HashEntry(K key, int hash, HashEntry<K,V> next, V value)

 

 

V remove(Object key, int hash, Object value) {
            lock();
            try {
                int c = count - 1;
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue = null;
                if (e != null) {
                    V v = e.value;
                    if (value == null || value.equals(v)) {
                        oldValue = v;
                        // All entries following removed node can stay
                        // in list, but all preceding ones need to be
                        // cloned.
                        ++modCount;
                        HashEntry<K,V> newFirst = e.next;
                        for (HashEntry<K,V> p = first; p != e; p = p.next)
                            newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                          newFirst, p.value);
                        tab[index] = newFirst;
                        count = c; // write-volatile
                    }
                }
                return oldValue;
            } finally {
                unlock();
            }
        }

remove方法的最核心一点就是,删除节点后,需要重新构造在此哈希索引上的当前链表。

 

 

以哈希存储模型1为例,当删除e4后,其链表的调整顺序为:



 
以2为例,当删除e6后,



然而,在ConcurrentHashMap中,最核心的还要属内部类Segment的设计。

 static final class Segment<K,V> extends ReentrantLock implements Serializable { 
        /** 
         * 在本 segment 范围内,包含的 HashEntry 元素的个数
         * 该变量被声明为 volatile 型
         */ 
        transient volatile int count; 

        /** 
         * table 被更新的次数
         */ 
        transient int modCount; 

        /** 
         * 当 table 中包含的 HashEntry 元素的个数超过本变量值时,触发 table 的再散列
         */ 
        transient int threshold; 

        /** 
         * table 是由 HashEntry 对象组成的数组
         * 如果散列时发生碰撞,碰撞的 HashEntry 对象就以链表的形式链接成一个链表
         * table 数组的数组成员代表散列映射表的一个桶
         * 每个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分
         * 如果并发级别为 16,table 则守护 ConcurrentHashMap 包含的桶总数的 1/16 
         */ 
        transient volatile HashEntry<K,V>[] table; 

        /** 
         * 装载因子
         */ 
        final float loadFactor; 

        Segment(int initialCapacity, float lf) { 
            loadFactor = lf; 
            setTable(HashEntry.<K,V>newArray(initialCapacity)); 
        } 

        /** 
         * 设置 table 引用到这个新生成的 HashEntry 数组
         * 只能在持有锁或构造函数中调用本方法
         */ 
        void setTable(HashEntry<K,V>[] newTable) { 
            // 计算临界阀值为新数组的长度与装载因子的乘积
            threshold = (int)(newTable.length * loadFactor); 
            table = newTable; 
        } 

        /** 
         * 根据 key 的散列值,找到 table 中对应的那个桶(table 数组的某个数组成员)
         */ 
        HashEntry<K,V> getFirst(int hash) { 
            HashEntry<K,V>[] tab = table; 
            // 把散列值与 table 数组长度减 1 的值相“与”,
 // 得到散列值对应的 table 数组的下标
            // 然后返回 table 数组中此下标对应的 HashEntry 元素
            return tab[hash & (tab.length - 1)]; 
        } 
 }

 

 

Segment 类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色。每个 Segment 对象用来守护其(成员对象 table 中)包含的若干个桶。

table 是一个由 HashEntry 对象组成的数组。table 数组的每一个数组成员就是散列映射表的一个桶。(上文已经介绍)

count 变量是一个计数器,它表示每个 Segment 对象管理的 table 数组(若干个 HashEntry 组成的链表)包含的 HashEntry 对象的个数。每一个 Segment 对象都有一个 count 对象来表示本 Segment 中包含的 HashEntry 对象的总数。

 

注意,之所以在每个 Segment 对象中包含一个计数器,而不是在 ConcurrentHashMap 中使用全局的计数器,是为了当需要更新计数器时,不用锁定整个 ConcurrentHashMap。

 

在ConcurrentHashMap内部,使用数组的方式,对Segment对象进行维护。

final Segment<K,V>[] segments;

而数组segments的索引,也是通过特殊的哈希算法来确定。

final Segment<K,V> segmentFor(int hash) {
        return segments[(hash >>> segmentShift) & segmentMask];
    }

  

实现Map接口的put,get等方法,首先需要确定元素在segments中的索引,然后根据key值与哈希值确定segments[index]对象中table(哈希表)的索引

public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, false);
    }

 

整个ConcurrentHashMap的结构如下所示:



 

所以在内部类Segment的put, remove等方法中的加锁操作是针对(键的 hash 值对应的)某个具体的 Segment,锁定的是该 Segment 而不是整个 ConcurrentHashMap。因为插入键 / 值对操作只是在这个 Segment 包含的某个桶中完成,不需要锁定整个ConcurrentHashMap。此时,其他写线程对另外 15 个Segment 的加锁并不会因为当前线程对这个 Segment 的加锁而阻塞。同时,所有读线程几乎不会因本线程的加锁而阻塞(除非读线程刚好读到这个 Segment 中某个 HashEntry 的 value 域的值为 null,此时需要加锁后重新读取该值)。

相比较于 HashTable 和由同步包装器包装的 HashMap每次只能有一个线程执行读或写操作,ConcurrentHashMap 在并发访问性能上有了质的提高。在理想状态下,ConcurrentHashMap 可以支持 16 个线程执行并发写操作(如果并发级别设置为 16),及任意数量线程的读操作。

 

总结:

引入Segment的好处是,将N多个键值对的存储维护分摊到不同的Segment中,在不同的Segment中对哈希表进行维护,由于各个Segment操作之间隔离,能够最大限度的对键值对进行高效并发操作。

 

 

  • 大小: 30.5 KB
  • 大小: 9 KB
  • 大小: 2.9 KB
  • 大小: 21.3 KB
2
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics