一Map體系
Hashtable是JDK 之前Map唯一線程安全的內置實現(CollectionssynchronizedMap不算)Hashtable繼承的是Dictionary(Hashtable是其唯一公開的子類)並不繼承AbstractMap或者HashMap盡管Hashtable和HashMap的結構非常類似但是他們之間並沒有多大聯系
ConcurrentHashMap是HashMap的線程安全版本ConcurrentSkipListMap是TreeMap的線程安全版本
最終可用的線程安全版本Map實現是ConcurrentHashMap/ConcurrentSkipListMap/Hashtable/Properties四個但是Hashtable是過時的類庫因此如果可以的應該盡可能的使用ConcurrentHashMap和ConcurrentSkipListMap
二concurrentHashMap的結構
我們通過ConcurrentHashMap的類圖來分析ConcurrentHashMap的結構
ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成Segment是一種可重入鎖ReentrantLock在ConcurrentHashMap裡扮演鎖的角色HashEntry則用於存儲鍵值對數據一個ConcurrentHashMap裡包含一個Segment數組Segment的結構和HashMap類似是一種數組和鏈表結構 一個Segment裡包含一個HashEntry數組每個HashEntry是一個鏈表結構的元素 每個Segment守護者一個HashEntry數組裡的元素當對HashEntry數組的數據進行修改時必須首先獲得它對應的Segment鎖
ConcurrentHashMap 類中包含兩個靜態內部類 HashEntry 和 Segment
HashEntry 用來封裝映射表的鍵 / 值對
Segment 用來充當鎖的角色
每個 Segment 對象守護整個散列映射表的若干個桶
每個桶是由若干個 HashEntry 對象鏈接起來的鏈表
一個 ConcurrentHashMap 實例中包含由若干個 Segment 對象組成的數組
三
ConcurrentHashMap實現原理
鎖分離 (Lock Stripping)
比如HashTable是一個過時的容器類
通過使用synchronized來保證線程安全
在線程競爭激烈的情況下HashTable的效率非常低下
原因是所有訪問HashTable的線程都必須競爭同一把鎖
那假如容器裡有多把鎖
每一把鎖用於鎖容器其中一部分數據
那麼當多線程訪問容器裡不同數據段的數據時
線程間就不會存在鎖競爭
從而可以有效的提高並發訪問效率
這就是ConcurrentHashMap所使用的鎖分段技術
ConcurrentHashMap內部使用段(Segment)來表示這些不同的部分
每個段其實就是一個小的hash table
它們有自己的鎖
只要多個修改操作發生在不同的段上
它們就可以並發進行
同樣當一個線程占用鎖訪問其中一個段數據的時候
其他段的數據也能被其他線程訪問
ConcurrentHashMap有些方法需要跨段
比如size()和containsValue()
它們可能需要鎖定整個表而而不僅僅是某個段
這需要按順序鎖定所有段
操作完畢後
又按順序釋放所有段的鎖
這裡
按順序
是很重要的
否則極有可能出現死鎖
在ConcurrentHashMap內部
段數組是final的
並且其成員變量實際上也是final的
但是
僅僅是將數組聲明為final的並不保證數組成員也是final的
這需要實現上的保證
這可以確保不會出現死鎖
因為獲得鎖的順序是固定的
不變性是多線程編程占有很重要的地位
下面還要談到
[java] view plaincopy /**
* The segments
each of which is a specialized hash table
*/
final Segment<K
V>[] segments;
不變(Immutable)和易變(Volatile)
ConcurrentHashMap完全允許多個讀操作並發進行
讀操作並不需要加鎖
如果使用傳統的技術
如HashMap中的實現
如果允許可以在hash鏈的中間添加或刪除元素
讀操作不加鎖將得到不一致的數據
ConcurrentHashMap實現技術是保證HashEntry幾乎是不可變的
HashEntry代表每個hash鏈中的一個節點
其結構如下所示
[java] view plaincopy static final class HashEntry<K
V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K
V> next;
}
可以看到除了value不是final的
其它值都是final的
這意味著不能從hash鏈的中間或尾部添加或刪除節點
因為這需要修改next引用值
所有的節點的修改只能從頭部開始
對於put操作
可以一律添加到Hash鏈的頭部
但是對於remove操作
可能需要從中間刪除一個節點
這就需要將要刪除節點的前面所有節點整個復制一遍
最後一個節點指向要刪除結點的下一個結點
這在講解刪除操作時還會詳述
為了確保讀操作能夠看到最新的值
將value設置成volatile
這避免了加鎖
四
ConcurrentHashMap具體實現
ConcurrentHashMap的初始化
ConcurrentHashMap初始化方法是通過initialCapacity
loadFactor
concurrencyLevel幾個參數來初始化segments數組
段偏移量segmentShift
段掩碼segmentMask和每個segment裡的HashEntry數組
初始化segments數組
讓我們來看一下初始化segmentShift
segmentMask和segments數組的源代碼
[java] view plaincopy if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power
of
two sizes best matching arguments
int sshift =
;
int ssize =
;
while (ssize < concurrencyLevel) {
++sshift;
ssize 《=
;
}
segmentShift =
sshift;
segmentMask = ssize
;
this
segments = Segment
newArray(ssize)
由上面的代碼可知segments數組的長度ssize通過concurrencyLevel計算得出
為了能通過按位與的哈希算法來定位segments數組的索引
必須保證segments數組的長度是
的N次方(power
of
two size)
所以必須計算出一個是大於或等於concurrencyLevel的最小的
的N次方值來作為segments數組的長度
假如concurrencyLevel等於
或
ssize都會等於
即容器裡鎖的個數也是
注意concurrencyLevel的最大大小是
意味著segments數組的長度最大為
對應的二進制是
位
初始化segmentShift和segmentMask
這兩個全局變量在定位segment時的哈希算法裡需要使用
sshift等於ssize從
向左移位的次數
在默認情況下concurrencyLevel等於
需要向左移位移動
次
所以sshift等於
segmentShift用於定位參與hash運算的位數
segmentShift等於
減sshift
所以等於
這裡之所以用
是因為ConcurrentHashMap裡的hash()方法輸出的最大數是
位的
後面的測試中我們可以看到這點
segmentMask是哈希運算的掩碼
等於ssize減
即
掩碼的二進制各個位的值都是
因為ssize的最大長度是
所以segmentShift最大值是
segmentMask最大值是
對應的二進制是
位
每個位都是
初始化每個Segment
輸入參數initialCapacity是ConcurrentHashMap的初始化容量
loadfactor是每個segment的負載因子
在構造方法裡需要通過這兩個參數來初始化數組中的每個segment
上面代碼中的變量cap就是segment裡HashEntry數組的長度
它等於initialCapacity除以ssize的倍數c
如果c大於
就會取大於等於c的
的N次方值
所以cap不是
就是
的N次方
segment的容量threshold=(int)cap*loadFactor
默認情況下initialCapacity等於
loadfactor等於
通過運算cap等於
threshold等於零
[java] view plaincopy if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap =
;
while (cap < c)
cap 《=
;
for (int i =
; i < this
segments
length; ++i)
this
segments[i] = new Segment<K
V>(cap
loadFactor)
定位Segment
既然ConcurrentHashMap使用分段鎖Segment來保護不同段的數據
那麼在插入和獲取元素的時候
必須先通過哈希算法定位到Segment
可以看到ConcurrentHashMap會首先使用Wang/Jenkins hash的變種算法對元素的hashCode進行一次再哈希
[java] view plaincopy private static int hash(int h) {
h += (h 《
) ^
xffffcd
d;
h ^= (h >>>
)
h += (h 《
)
h ^= (h >>>
)
h += (h 《
) + (h 《
)
return h ^ (h >>>
)
}
之所以進行再哈希
其目的是為了減少哈希沖突
使元素能夠均勻的分布在不同的Segment上
從而提高容器的存取效率
假如哈希的質量差到極點
那麼所有的元素都在一個Segment中
不僅存取元素緩慢
分段鎖也會失去意義
我做了一個測試
不通過再哈希而直接執行哈希計算
[java] view plaincopy System
out
println(Integer
parseInt(
) &
)
System
out
println(Integer
parseInt(
) &
)
System
out
println(Integer
parseInt(
) &
)
System
out
println(Integer
parseInt(
) &
)
計算後輸出的哈希值全是
通過這個例子可以發現如果不進行再哈希
哈希沖突會非常嚴重
因為只要低位一樣
無論高位是什麼數
其哈希值總是一樣
我們再把上面的二進制數據進行再哈希後結果如下
為了方便閱讀
不足
位的高位補了
每隔四位用豎線分割下
[java] view plaincopy
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
可以發現每一位的數據都散列開了
通過這種再哈希能讓數字的每一位都能參加到哈希運算當中
從而減少哈希沖突
ConcurrentHashMap通過以下哈希算法定位segment
[java] view plaincopy final Segment<K
V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
默認情況下segmentShift為
segmentMask為
再哈希後的數最大是
位二進制數據
向右無符號移動
位
意思是讓高
位參與到hash運算中
(hash >>> segmentShift) & segmentMask的運算結果分別是
和
可以看到hash值沒有發生沖突
ConcurrentHashMap的get操作
前面提到過ConcurrentHashMap的get操作是不用加鎖的
我們這裡看一下其實現
[java] view plaincopy public V get(Object key) {
int hash = hash(key
hashCode())
return segmentFor(hash)
get(key
hash)
}
看第三行
segmentFor這個函數用於確定操作應該在哪一個segment中進行
幾乎對ConcurrentHashMap的所有操作都需要用到這個函數
我們看下這個函數的實現
[java] view plaincopy final Segment<K
V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
這個函數用了位操作來確定Segment
根據傳入的hash值向右無符號右移segmentShift位
然後和segmentMask進行與操作
結合我們之前說的segmentShift和segmentMask的值
就可以得出以下結論
假設Segment的數量是
的n次方
根據元素的hash值的高n位就可以確定元素到底在哪一個Segment中
在確定了需要在哪一個segment中進行操作以後
接下來的事情就是調用對應的Segment的get方法
[java] view plaincopy V get(Object key
int hash) {
if (count !=
) { // read
volatile
HashEntry<K
V> e = getFirst(hash)
while (e != null) {
if (e
hash == hash && key
equals(e
key)) {
V v = e
value;
if (v != null)
return v;
return readValueUnderLock(e)
// recheck
}
e = e
next;
}
}
return null;
}
先看第二行代碼
這裡對count進行了一次判斷
其中count表示Segment中元素的數量
我們可以來看一下count的定義
[java] view plaincopy transient volatile int count;
可以看到count是volatile的
實際上這裡裡面利用了volatile的語義
對volatile字段的寫入操作happens
before於每一個後續的同一個字段的讀操作
因為實際上put
remove等操作也會更新count的值
所以當競爭發生的時候
volatile的語義可以保證寫操作在讀操作之前
也就保證了寫操作對後續的讀操作都是可見的
這樣後面get的後續操作就可以拿到完整的元素內容
然後
在第三行
調用了getFirst()來取得鏈表的頭部
[java] view plaincopy HashEntry<K
V> getFirst(int hash) {
HashEntry<K
V>[] tab = table;
return tab[hash & (tab
length
)];
}
同樣
這裡也是用位操作來確定鏈表的頭部
hash值和HashTable的長度減一做與操作
最後的結果就是hash值的低n位
其中n是HashTable的長度以
為底的結果
在確定了鏈表的頭部以後
就可以對整個鏈表進行遍歷
看第
行
取出key對應的value的值
如果拿出的value的值是null
則可能這個key
value對正在put的過程中
如果出現這種情況
那麼就加鎖來保證取出的value是完整的
如果不是null
則直接返回value
ConcurrentHashMap的put操作
看完了get操作
再看下put操作
put操作的前面也是確定Segment的過程
這裡不再贅述
直接看關鍵的segment的put方法
[java] view plaincopy V put(K key
int hash
V value
boolean onlyIfAbsent) {
lock()
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash()
HashEntry<K
V>[] tab = table;
int index = hash & (tab
length
)
HashEntry<K
V> first = tab[index];
HashEntry<K
V> e = first;
while (e != null && (e
hash != hash || !key
equals(e
key)))
e = e
next;
V oldValue;
if (e != null) {
oldValue = e
value;
if (!onlyIfAbsent)
e
value = value;
}
else {
oldValue = null;
++modCount;
tab[index] = new HashEntry<K
V>(key
hash
first
value)
count = c; // write
volatile
}
return oldValue;
} finally {
unlock()
}
}
首先對Segment的put操作是加鎖完成的
然後在第五行
如果Segment中元素的數量超過了阈值(由構造函數中的loadFactor算出)這需要進行對Segment擴容
並且要進行rehash
關於rehash的過程大家可以自己去了解
這裡不詳細講了
第
和第
行的操作就是getFirst的過程
確定鏈表頭部的位置
第
行這裡的這個while循環是在鏈表中尋找和要put的元素相同key的元素
如果找到
就直接更新更新key的value
如果沒有找到
則進入
行這裡
生成一個新的HashEntry並且把它加到整個Segment的頭部
然後再更新count的值
ConcurrentHashMap的remove操作
Remove操作的前面一部分和前面的get和put操作一樣
都是定位Segment的過程
然後再調用Segment的remove方法
[java] view plaincopy V remove(Object key
int hash
Object value) {
lock()
try {
int c = count
;
HashEntry<K
V>[] tab = table;
int index = hash & (tab
length
)
HashEntry<K
V> first = tab[index];
HashEntry<K
V> e = first;
while (e != null && (e
hash != hash || !key
equals(e
key)))
e = e
next;
V oldValue = null;
if (e != null) {
V v = e
value;
if (value == null || value
equals(v)) {
oldValue = v;
// All entries following removed node can stay
// in list
but all preceding ones need to be
// cloned
++modCount;
HashEntry<K
V> newFirst = e
next;
for (HashEntry<K
V> p = first; p != e; p = p
next)
newFirst = new HashEntry<K
V>(p
key
p
hash
newFirst
p
value)
tab[index] = newFirst;
count = c; // write
volatile
}
}
return oldValue;
} finally {
unlock()
}
}
首先remove操作也是確定需要刪除的元素的位置
不過這裡刪除元素的方法不是簡單地把待刪除元素的前面的一個元素的next指向後面一個就完事了
我們之前已經說過HashEntry中的next是final的
一經賦值以後就不可修改
在定位到待刪除元素的位置以後
程序就將待刪除元素前面的那一些元素全部復制一遍
然後再一個一個重新接到鏈表上去
看一下下面這一幅圖來了解這個過程
假設鏈表中原來的元素如上圖所示現在要刪除元素那麼刪除元素以後的鏈表就如下圖所示
ConcurrentHashMap的size操作
在前面的章節中我們涉及到的操作都是在單個Segment中進行的但是ConcurrentHashMap有一些操作是在多個Segment中進行比如size操作ConcurrentHashMap的size操作也采用了一種比較巧的方式來盡量避免對所有的Segment都加鎖
前面我們提到了一個Segment中的有一個modCount變量代表的是對Segment中元素的數量造成影響的操作的次數這個值只增不減size操作就是遍歷了兩次Segment每次記錄Segment的modCount值然後將兩次的modCount進行比較如果相同則表示期間沒有發生過寫入操作就將原先遍歷的結果返回如果不相同則把這個過程再重復做一次如果再不相同則就需要將所有的Segment都鎖住然後一個一個遍歷了具體的實現大家可以看ConcurrentHashMap的源碼這裡就不貼了
From:http://tw.wingwit.com/Article/program/Java/gj/201311/27559.html