熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> Java編程 >> Java高級技術 >> 正文

Java多線程之ConcurrentHashMap深入分析

2022-06-13   來源: Java高級技術 

  一Map體系



  Hashtable是JDK 之前Map唯一線程安全的內置實現(CollectionssynchronizedMap不算)Hashtable繼承的是Dictionary(Hashtable是其唯一公開的子類)並不繼承AbstractMap或者HashMap盡管Hashtable和HashMap的結構非常類似但是他們之間並沒有多大聯系

  ConcurrentHashMap是HashMap的線程安全版本ConcurrentSkipListMap是TreeMap的線程安全版本

  最終可用的線程安全版本Map實現是ConcurrentHashMap/ConcurrentSkipListMap/Hashtable/Properties四個但是Hashtable是過時的類庫因此如果可以的應該盡可能的使用ConcurrentHashMap和ConcurrentSkipListMap

  concurrentHashMap的結構

  我們通過ConcurrentHashMap的類圖來分析ConcurrentHashMap的結構

  


  ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成Segment是一種可重入鎖ReentrantLock在ConcurrentHashMap裡扮演鎖的角色HashEntry則用於存儲鍵值對數據一個ConcurrentHashMap裡包含一個Segment數組Segment的結構和HashMap類似是一種數組和鏈表結構 一個Segment裡包含一個HashEntry數組每個HashEntry是一個鏈表結構的元素 每個Segment守護者一個HashEntry數組裡的元素當對HashEntry數組的數據進行修改時必須首先獲得它對應的Segment鎖

  


  ConcurrentHashMap 類中包含兩個靜態內部類 HashEntry 和 SegmentHashEntry 用來封裝映射表的鍵 / 值對Segment 用來充當鎖的角色每個 Segment 對象守護整個散列映射表的若干個桶每個桶是由若干個 HashEntry 對象鏈接起來的鏈表一個 ConcurrentHashMap 實例中包含由若干個 Segment 對象組成的數組
  三ConcurrentHashMap實現原理
  鎖分離 (Lock Stripping)
  比如HashTable是一個過時的容器類通過使用synchronized來保證線程安全在線程競爭激烈的情況下HashTable的效率非常低下原因是所有訪問HashTable的線程都必須競爭同一把鎖那假如容器裡有多把鎖每一把鎖用於鎖容器其中一部分數據那麼當多線程訪問容器裡不同數據段的數據時線程間就不會存在鎖競爭從而可以有效的提高並發訪問效率這就是ConcurrentHashMap所使用的鎖分段技術
  ConcurrentHashMap內部使用段(Segment)來表示這些不同的部分每個段其實就是一個小的hash table它們有自己的鎖只要多個修改操作發生在不同的段上它們就可以並發進行同樣當一個線程占用鎖訪問其中一個段數據的時候其他段的數據也能被其他線程訪問
  ConcurrentHashMap有些方法需要跨段比如size()和containsValue()它們可能需要鎖定整個表而而不僅僅是某個段這需要按順序鎖定所有段操作完畢後又按順序釋放所有段的鎖這裡按順序是很重要的否則極有可能出現死鎖在ConcurrentHashMap內部段數組是final的並且其成員變量實際上也是final的但是僅僅是將數組聲明為final的並不保證數組成員也是final的這需要實現上的保證這可以確保不會出現死鎖因為獲得鎖的順序是固定的不變性是多線程編程占有很重要的地位下面還要談到
  [java] view plaincopy /**
  * The segments each of which is a specialized hash table
  */
  final Segment<KV>[] segments;
  不變(Immutable)和易變(Volatile)
  ConcurrentHashMap完全允許多個讀操作並發進行讀操作並不需要加鎖如果使用傳統的技術如HashMap中的實現如果允許可以在hash鏈的中間添加或刪除元素讀操作不加鎖將得到不一致的數據ConcurrentHashMap實現技術是保證HashEntry幾乎是不可變的HashEntry代表每個hash鏈中的一個節點其結構如下所示
  [java] view plaincopy static final class HashEntry<KV> {
  final K key;
  final int hash;
  volatile V value;
  final HashEntry<KV> next;
  }
  可以看到除了value不是final的其它值都是final的這意味著不能從hash鏈的中間或尾部添加或刪除節點因為這需要修改next引用值所有的節點的修改只能從頭部開始對於put操作可以一律添加到Hash鏈的頭部但是對於remove操作可能需要從中間刪除一個節點這就需要將要刪除節點的前面所有節點整個復制一遍最後一個節點指向要刪除結點的下一個結點這在講解刪除操作時還會詳述為了確保讀操作能夠看到最新的值將value設置成volatile這避免了加鎖


  四ConcurrentHashMap具體實現
  ConcurrentHashMap的初始化
  ConcurrentHashMap初始化方法是通過initialCapacityloadFactor concurrencyLevel幾個參數來初始化segments數組段偏移量segmentShift段掩碼segmentMask和每個segment裡的HashEntry數組
  初始化segments數組讓我們來看一下初始化segmentShiftsegmentMask和segments數組的源代碼
  [java] view plaincopy if (concurrencyLevel > MAX_SEGMENTS)
  concurrencyLevel = MAX_SEGMENTS;
  // Find poweroftwo sizes best matching arguments
  int sshift = ;
  int ssize = ;
  while (ssize < concurrencyLevel) {
  ++sshift;
  ssize 《= ;
  }
  segmentShift = sshift;
  segmentMask = ssize ;
  thissegments = SegmentnewArray(ssize)
  由上面的代碼可知segments數組的長度ssize通過concurrencyLevel計算得出為了能通過按位與的哈希算法來定位segments數組的索引必須保證segments數組的長度是的N次方(poweroftwo size)所以必須計算出一個是大於或等於concurrencyLevel的最小的的N次方值來作為segments數組的長度假如concurrencyLevel等於ssize都會等於即容器裡鎖的個數也是注意concurrencyLevel的最大大小是意味著segments數組的長度最大為對應的二進制是
  初始化segmentShift和segmentMask這兩個全局變量在定位segment時的哈希算法裡需要使用sshift等於ssize從向左移位的次數在默認情況下concurrencyLevel等於需要向左移位移動所以sshift等於segmentShift用於定位參與hash運算的位數segmentShift等於減sshift所以等於這裡之所以用是因為ConcurrentHashMap裡的hash()方法輸出的最大數是位的後面的測試中我們可以看到這點segmentMask是哈希運算的掩碼等於ssize減掩碼的二進制各個位的值都是因為ssize的最大長度是所以segmentShift最大值是segmentMask最大值是對應的二進制是每個位都是
  初始化每個Segment輸入參數initialCapacity是ConcurrentHashMap的初始化容量loadfactor是每個segment的負載因子在構造方法裡需要通過這兩個參數來初始化數組中的每個segment
  上面代碼中的變量cap就是segment裡HashEntry數組的長度它等於initialCapacity除以ssize的倍數c如果c大於就會取大於等於c的的N次方值所以cap不是就是的N次方segment的容量threshold=(int)cap*loadFactor默認情況下initialCapacity等於loadfactor等於通過運算cap等於threshold等於零
  [java] view plaincopy if (initialCapacity > MAXIMUM_CAPACITY)
  initialCapacity = MAXIMUM_CAPACITY;
  int c = initialCapacity / ssize;
  if (c * ssize < initialCapacity)
  ++c;
  int cap = ;
  while (cap < c)
  cap 《= ;
  for (int i = ; i < thissegmentslength; ++i)
  thissegments[i] = new Segment<KV>(cap loadFactor)
  定位Segment
  既然ConcurrentHashMap使用分段鎖Segment來保護不同段的數據那麼在插入和獲取元素的時候必須先通過哈希算法定位到Segment可以看到ConcurrentHashMap會首先使用Wang/Jenkins hash的變種算法對元素的hashCode進行一次再哈希
  [java] view plaincopy private static int hash(int h) {
  h += (h 《 ) ^ xffffcdd;
  h ^= (h >>>
  h += (h 《
  h ^= (h >>>
  h += (h 《 ) + (h 《
  return h ^ (h >>>
  }
  之所以進行再哈希其目的是為了減少哈希沖突使元素能夠均勻的分布在不同的Segment上從而提高容器的存取效率假如哈希的質量差到極點那麼所有的元素都在一個Segment中不僅存取元素緩慢分段鎖也會失去意義我做了一個測試不通過再哈希而直接執行哈希計算
  [java] view plaincopy Systemoutprintln(IntegerparseInt( ) &
  Systemoutprintln(IntegerparseInt( ) &
  Systemoutprintln(IntegerparseInt( ) &
  Systemoutprintln(IntegerparseInt( ) &
  計算後輸出的哈希值全是通過這個例子可以發現如果不進行再哈希哈希沖突會非常嚴重因為只要低位一樣無論高位是什麼數其哈希值總是一樣我們再把上面的二進制數據進行再哈希後結果如下為了方便閱讀不足位的高位補了每隔四位用豎線分割下
  [java] view plaincopy |||||||
  |||||||
  |||||||
  |||||||
  可以發現每一位的數據都散列開了通過這種再哈希能讓數字的每一位都能參加到哈希運算當中從而減少哈希沖突ConcurrentHashMap通過以下哈希算法定位segment
  [java] view plaincopy final Segment<KV> segmentFor(int hash) {
  return segments[(hash >>> segmentShift) & segmentMask];
  }
  默認情況下segmentShift為segmentMask為再哈希後的數最大是位二進制數據向右無符號移動意思是讓高位參與到hash運算中 (hash >>> segmentShift) & segmentMask的運算結果分別是可以看到hash值沒有發生沖突
  ConcurrentHashMap的get操作
  前面提到過ConcurrentHashMap的get操作是不用加鎖的我們這裡看一下其實現
  [java] view plaincopy public V get(Object key) {
  int hash = hash(keyhashCode())
  return segmentFor(hash)get(key hash)
  }
  看第三行segmentFor這個函數用於確定操作應該在哪一個segment中進行幾乎對ConcurrentHashMap的所有操作都需要用到這個函數我們看下這個函數的實現
  [java] view plaincopy final Segment<KV> segmentFor(int hash) {
  return segments[(hash >>> segmentShift) & segmentMask];
  }
  這個函數用了位操作來確定Segment根據傳入的hash值向右無符號右移segmentShift位然後和segmentMask進行與操作結合我們之前說的segmentShift和segmentMask的值就可以得出以下結論假設Segment的數量是的n次方根據元素的hash值的高n位就可以確定元素到底在哪一個Segment中
  在確定了需要在哪一個segment中進行操作以後接下來的事情就是調用對應的Segment的get方法
  [java] view plaincopy V get(Object key int hash) {
  if (count != ) { // readvolatile
  HashEntry<KV> e = getFirst(hash)
  while (e != null) {
  if (ehash == hash && keyequals(ekey)) {
  V v = evalue;
  if (v != null)
  return v;
  return readValueUnderLock(e) // recheck
  }
  e = enext;
  }
  }
  return null;
  }



  先看第二行代碼這裡對count進行了一次判斷其中count表示Segment中元素的數量我們可以來看一下count的定義
  [java] view plaincopy transient volatile int count;
  可以看到count是volatile的實際上這裡裡面利用了volatile的語義
  對volatile字段的寫入操作happensbefore於每一個後續的同一個字段的讀操作
  因為實際上putremove等操作也會更新count的值所以當競爭發生的時候volatile的語義可以保證寫操作在讀操作之前也就保證了寫操作對後續的讀操作都是可見的這樣後面get的後續操作就可以拿到完整的元素內容
  然後在第三行調用了getFirst()來取得鏈表的頭部
  [java] view plaincopy HashEntry<KV> getFirst(int hash) {
  HashEntry<KV>[] tab = table;
  return tab[hash & (tablength )];
  }
  同樣這裡也是用位操作來確定鏈表的頭部hash值和HashTable的長度減一做與操作最後的結果就是hash值的低n位其中n是HashTable的長度以為底的結果
  在確定了鏈表的頭部以後就可以對整個鏈表進行遍歷看第取出key對應的value的值如果拿出的value的值是null則可能這個keyvalue對正在put的過程中如果出現這種情況那麼就加鎖來保證取出的value是完整的如果不是null則直接返回value
  ConcurrentHashMap的put操作
  看完了get操作再看下put操作put操作的前面也是確定Segment的過程這裡不再贅述直接看關鍵的segment的put方法
  [java] view plaincopy V put(K key int hash V value boolean onlyIfAbsent) {
  lock()
  try {
  int c = count;
  if (c++ > threshold) // ensure capacity
  rehash()
  HashEntry<KV>[] tab = table;
  int index = hash & (tablength
  HashEntry<KV> first = tab[index];
  HashEntry<KV> e = first;
  while (e != null && (ehash != hash || !keyequals(ekey)))
  e = enext;
  V oldValue;
  if (e != null) {
  oldValue = evalue;
  if (!onlyIfAbsent)
  evalue = value;
  }
  else {
  oldValue = null;
  ++modCount;
  tab[index] = new HashEntry<KV>(key hash first value)
  count = c; // writevolatile
  }
  return oldValue;
  } finally {
  unlock()
  }
  }
  首先對Segment的put操作是加鎖完成的然後在第五行如果Segment中元素的數量超過了阈值(由構造函數中的loadFactor算出)這需要進行對Segment擴容並且要進行rehash關於rehash的過程大家可以自己去了解這裡不詳細講了
  第和第行的操作就是getFirst的過程確定鏈表頭部的位置
  第行這裡的這個while循環是在鏈表中尋找和要put的元素相同key的元素如果找到就直接更新更新key的value如果沒有找到則進入行這裡生成一個新的HashEntry並且把它加到整個Segment的頭部然後再更新count的值
  ConcurrentHashMap的remove操作
  Remove操作的前面一部分和前面的get和put操作一樣都是定位Segment的過程然後再調用Segment的remove方法
  [java] view plaincopy V remove(Object key int hash Object value) {
  lock()
  try {
  int c = count ;
  HashEntry<KV>[] tab = table;
  int index = hash & (tablength
  HashEntry<KV> first = tab[index];
  HashEntry<KV> e = first;
  while (e != null && (ehash != hash || !keyequals(ekey)))
  e = enext;
  V oldValue = null;
  if (e != null) {
  V v = evalue;
  if (value == null || valueequals(v)) {
  oldValue = v;
  // All entries following removed node can stay
  // in list but all preceding ones need to be
  // cloned
  ++modCount;
  HashEntry<KV> newFirst = enext;
  for (HashEntry<KV> p = first; p != e; p = pnext)
  newFirst = new HashEntry<KV>(pkey phash
  newFirst pvalue)
  tab[index] = newFirst;
  count = c; // writevolatile
  }
  }
  return oldValue;
  } finally {
  unlock()
  }
  }
  首先remove操作也是確定需要刪除的元素的位置不過這裡刪除元素的方法不是簡單地把待刪除元素的前面的一個元素的next指向後面一個就完事了我們之前已經說過HashEntry中的next是final的一經賦值以後就不可修改在定位到待刪除元素的位置以後程序就將待刪除元素前面的那一些元素全部復制一遍然後再一個一個重新接到鏈表上去看一下下面這一幅圖來了解這個過程

  
假設鏈表中原來的元素如上圖所示現在要刪除元素那麼刪除元素以後的鏈表就如下圖所示

  

  ConcurrentHashMap的size操作
  在前面的章節中我們涉及到的操作都是在單個Segment中進行的但是ConcurrentHashMap有一些操作是在多個Segment中進行比如size操作ConcurrentHashMap的size操作也采用了一種比較巧的方式來盡量避免對所有的Segment都加鎖
  前面我們提到了一個Segment中的有一個modCount變量代表的是對Segment中元素的數量造成影響的操作的次數這個值只增不減size操作就是遍歷了兩次Segment每次記錄Segment的modCount值然後將兩次的modCount進行比較如果相同則表示期間沒有發生過寫入操作就將原先遍歷的結果返回如果不相同則把這個過程再重復做一次如果再不相同則就需要將所有的Segment都鎖住然後一個一個遍歷了具體的實現大家可以看ConcurrentHashMap的源碼這裡就不貼了


From:http://tw.wingwit.com/Article/program/Java/gj/201311/27559.html
    推薦文章
    Copyright © 2005-2022 電腦知識網 Computer Knowledge   All rights reserved.