熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> Java編程 >> Java高級技術 >> 正文

Java 5.0多線程編程

2013-11-23 19:53:48  來源: Java高級技術 

  Java自年面世以來得到了廣泛得一個運用但是對多線程編程的支持Java很長時間一直停留在初級階段在Java 之前Java裡的多線程編程主要是通過Thread類Runnable接口Object對象中的wait() notify() notifyAll()等方法和synchronized關鍵詞來實現的這些工具雖然能在大多數情況下解決對共享資源的管理和線程間的調度但存在以下幾個問題

        過於原始拿來就能用的功能有限即使是要實現簡單的多線程功能也需要編寫大量的代碼這些工具就像匯編語言一樣難以學習和使用比這更糟糕的是稍有不慎它們還可能被錯誤地使用而且這樣的錯誤很難被發現

        如果使用不當會使程序的運行效率大大降低

        為了提高開發效率簡化編程開發人員在做項目的時候往往需要寫一些共享的工具來實現一些普遍適用的功能但因為沒有規范相同的工具會被重復地開發造成資源浪費

        因為鎖定的功能是通過Synchronized來實現的這是一種塊結構只能對代碼中的一段代碼進行鎖定而且鎖定是單一的如以下代碼所示

  synchronized(lock){

  //執行對共享資源的操作

  ……

  }

  一些復雜的功能就很難被實現比如說如果程序需要取得lock A和lock B來進行操作然後需要取得lock C並且釋放lock A來進行操作Java 之前的多線程框架就顯得無能為力了

  因為這些問題程序員對舊的框架一直頗有微詞這種情況一直到Java 才有較大的改觀一系列的多線程工具包被納入了標准庫文件這些工具包括了一個新的多線程程序的執行框架使編程人員可方便地協調和調度線程的運行並且新加入了一些高性能的常用的工具使程序更容易編寫運行效率更高本文將分類並結合例子來介紹這些新加的多線程工具

  

  在我們開始介紹Java 裡的新Concurrent工具前讓我們先來看一下一個用舊的多線程工具編寫的程序這個程序裡有一個Server線程它需要啟動兩個ComponentServer線程需等到Component線程完畢後再繼續相同的功能在Synchronizer一章裡用新加的工具CountDownLatch有相同的實現兩個程序孰優孰劣哪個程序更容易編寫哪個程序更容易理解相信大家看過之後不難得出結論

  public class ServerThread {

  Object concLock = new Object();

  int count = ;

  public void runTwoThreads() {

  //啟動兩個線程去初始化組件

  new Thread(new ComponentThread(this))start();

  new Thread(new ComponentThread(this))start();

  // Wait for other thread

  while(count != ) {

  synchronized(concLock) {

  try {

  concLockwait();

  Systemoutprintln(Wake up);

  } catch (InterruptedException ie) { //處理異常}

  }

  }

  Systemoutprintln(Server is up);

  }

  public void callBack() {

  synchronized(concLock) {

  count;

  concLocknotifyAll();

  }

  }

  public static void main(String[] args){

  ServerThread server = new ServerThread();

  serverrunTwoThreads();

  }

  }

  

  public class ComponentThread implements Runnable {

  private ServerThread server;

  public ComponentThread(ServerThread server) {

  thisserver = server;

  }

  public void run() {

  //做組件初始化的工作

  Systemoutprintln(Do component initialization);

  servercallBack();

  }

  }

三個新加的多線程包

  Java 裡新加入了三個多線程包ncurrent ncurrentatomic ncurrentlocks


ncurrent包含了常用的多線程工具是新的多線程工具的主體
ncurrentatomic包含了不用加鎖情況下就能改變值的原子變量比如說AtomicInteger提供了addAndGet()方法Add和Get是兩個不同的操作為了保證別的線程不干擾以往的做法是先鎖定共享的變量然後在鎖定的范圍內進行兩步操作但用AtomicIntegeraddAndGet()就不用擔心鎖定的事了其內部實現保證了這兩步操作是在原子量級發生的不會被別的線程干擾
ncurrentlocks包包含鎖定的工具

Callable 和 Future接口

  Callable是類似於Runnable的接口實現Callable接口的類和實現Runnable的類都是可被其它線程執行的任務Callable和Runnable有幾點不同


Callable規定的方法是call()而Runnable規定的方法是run()
Callable的任務執行後可返回值而Runnable的任務是不能返回值的
call()方法可拋出異常而run()方法是不能拋出異常的
運行Callable任務可拿到一個Future對象通過Future對象可了解任務執行情況可取消任務的執行還可獲取任務執行的結果

  以下是Callable的一個例子

  public class DoCallStuff implements Callable{ // *

  private int aInt;

  public DoCallStuff(int aInt) {

  thisaInt = aInt;

  }

  public String call() throws Exception { //*

  boolean resultOk = false;

  if(aInt == ){

  resultOk = true;

  }  else if(aInt == ){

  while(true){ //infinite loop

  Systemoutprintln(looping);

  Threadsleep();

  }

  } else {

  throw new Exception(Callable terminated with Exception!); //*

  }

  if(resultOk){

  return Task done;

  } else {

  return Task failed;

  }

  }

  }

  *: 名為DoCallStuff類實現了CallableString將是call方法的返回值類型例子中用了String但可以是任何Java類

  *: call方法的返回值類型為String這是和類的定義相對應的並且可以拋出異常

  *: call方法可以拋出異常如加重的斜體字所示

  

  以下是調用DoCallStuff的主程序

  import ncurrentExecutionException;

  import ncurrentExecutorService;

  import ncurrentExecutors;

  import ncurrentFuture;

  public class Executor {

  public static void main(String[] args){

  //*

  DoCallStuff call = new DoCallStuff();

  DoCallStuff call = new DoCallStuff();

  DoCallStuff call = new DoCallStuff();

  //*

  ExecutorService es = ExecutorsnewFixedThreadPool();

  //*

  Future future = essubmit(call);

  Future future = essubmit(call);

  Future future = essubmit(call);

  try {

  //*

  Systemoutprintln(futureget());

  //*

  Threadsleep();

  Systemoutprintln(Thread terminated? : + futurecancel(true));

  //*

  Systemoutprintln(futureget());

  } catch (ExecutionException ex) {

  exprintStackTrace();

  } catch (InterruptedException ex) {

  exprintStackTrace();

  }

  }

  }

  *: 定義了幾個任務

  *: 初始了任務執行工具任務的執行框架將會在後面解釋

  *: 執行任務任務啟動時返回了一個Future對象如果想得到任務執行的結果或者是異常可對這個Future對象進行操作Future所含的值必須跟Callable所含的值對映比如說例子中Future對印Callable

  *: 任務正常執行完畢futureget()會返回線程的值

  *: 任務在進行一個死循環調用futurecancel(true)來中止此線程傳入的參數標明是否可打斷線程true表明可以打斷

  *: 任務拋出異常調用futureget()時會引起異常的拋出

  運行Executor會有以下運行結果

  looping

  Task done //*

  looping

  looping//*

  looping

  looping

  looping

  looping

  Thread terminated? :true //*

  //*

  ncurrentExecutionException: javalangException: Callable terminated with Exception!

  at ncurrentFutureTask$SyncinnerGet(FutureTaskjava:)

  at ncurrentFutureTaskget(FutureTaskjava:)

  at concurrentExecutormain(Executorjava:)

  ……

  *: 任務正常結束

  *: 任務是個死循環這是它的打印結果

  *: 指示任務被取消

  *: 在執行futureget()時得到任務拋出的異常

新的任務執行架構

  在Java 之前啟動一個任務是通過調用Thread類的start()方法來實現的任務的提於交和執行是同時進行的如果你想對任務的執行進行調度或是控制同時執行的線程數量就需要額外編寫代碼來完成裡提供了一個新的任務執行架構使你可以輕松地調度和控制任務的執行並且可以建立一個類似數據庫連接池的線程池來執行任務這個架構主要有三個接口和其相應的具體類組成這三個接口是Executor ExecutorService和ScheduledExecutorService讓我們先用一個圖來顯示它們的關系

  

  圖的左側是接口圖的右側是這些接口的具體類注意Executor是沒有直接具體實現的

  Executor接口

  是用來執行Runnable任務的它只定義一個方法


execute(Runnable command)執行Ruannable類型的任務

  ExecutorService接口

  ExecutorService繼承了Executor的方法並提供了執行Callable任務和中止任務執行的服務其定義的方法主要有


submit(task)可用來提交Callable或Runnable任務並返回代表此任務的Future對象
invokeAll(collection of tasks)批處理任務集合並返回一個代表這些任務的Future對象集合
shutdown()在完成已提交的任務後關閉服務不再接受新任務
shutdownNow()停止所有正在執行的任務並關閉服務
isTerminated()測試是否所有任務都執行完畢了
isShutdown()測試是否該ExecutorService已被關閉

  ScheduledExecutorService接口

  在ExecutorService的基礎上ScheduledExecutorService提供了按時間安排執行任務的功能它提供的方法主要有


schedule(task initDelay): 安排所提交的Callable或Runnable任務在initDelay指定的時間後執行
scheduleAtFixedRate()安排所提交的Runnable任務按指定的間隔重復執行
scheduleWithFixedDelay()安排所提交的Runnable任務在每次執行完後等待delay所指定的時間後重復執行

  代碼ScheduleExecutorService的例子

  public class ScheduledExecutorServiceTest {

  public static void main(String[] args)

  throws InterruptedException ExecutionException{

  //*

  ScheduledExecutorService service = ExecutorsnewScheduledThreadPool();

  //*

  Runnable task = new Runnable() {

  public void run() {

  Systemoutprintln(Task repeating);

  }

  };

  //*

  final ScheduledFuture future =

  servicescheduleAtFixedRate(task TimeUnitSECONDS);

  //*

  ScheduledFuture future = serviceschedule(new Callable(){

  public String call(){

  futurecancel(true);

  return task cancelled!;

  }

  } TimeUnitSECONDS);

  Systemoutprintln(futureget());

  //*

  serviceshutdown();

  }

  }

  這個例子有兩個任務第一個任務每隔一秒打印一句Task repeating第二個任務在秒鐘後取消第一個任務

  *: 初始化一個ScheduledExecutorService對象這個對象的線程池大小為

  *: 用內函數的方式定義了一個Runnable任務

  *: 調用所定義的ScheduledExecutorService對象來執行任務任務每秒執行一次能重復執行的任務一定是Runnable類型注意我們可以用TimeUnit來制定時間單位這也是Java 裡新的特征以前的記時單位是微秒現在可精確到奈秒

  *: 調用ScheduledExecutorService對象來執行第二個任務第二個任務所作的就是在秒鐘後取消第一個任務

  *: 關閉服務

  Executors類

  雖然以上提到的接口有其實現的具體類但為了方便Java 建議使用Executors的工具類來得到Executor接口的具體對象需要注意的是Executors是一個類不是Executor的復數形式Executors提供了以下一些static的方法


callable(Runnable task): 將Runnable的任務轉化成Callable的任務
newSingleThreadExecutor: 產生一個ExecutorService對象這個對象只有一個線程可用來執行任務若任務多於一個任務將按先後順序執行
newCachedThreadPool(): 產生一個ExecutorService對象這個對象帶有一個線程池線程池的大小會根據需要調整線程執行完任務後返回線程池供執行下一次任務使用
newFixedThreadPool(int poolSize)產生一個ExecutorService對象這個對象帶有一個大小為poolSize的線程池若任務數量大於poolSize任務會被放在一個queue裡順序執行
newSingleThreadScheduledExecutor產生一個ScheduledExecutorService對象這個對象的線程池大小為若任務多於一個任務將按先後順序執行
newScheduledThreadPool(int poolSize): 產生一個ScheduledExecutorService對象這個對象的線程池大小為poolSize若任務數量大於poolSize任務會在一個queue裡等待執行

  以下是得到和使用ExecutorService的例子

  代碼如何調用Executors來獲得各種服務對象

  //Single Threaded ExecutorService

  ExecutorService singleThreadeService = ExecutorsnewSingleThreadExecutor();

  //Cached ExecutorService

  ExecutorService cachedService = ExecutorsnewCachedThreadPool();

  //Fixed number of ExecutorService

  ExecutorService fixedService = ExecutorsnewFixedThreadPool();

  //Single ScheduledExecutorService

  ScheduledExecutorService singleScheduledService =

  ExecutorsnewSingleThreadScheduledExecutor();

  //Fixed number of ScheduledExecutorService

  ScheduledExecutorService fixedScheduledService =

  ExecutorsnewScheduledThreadPool();

Lockers和Condition接口

  在多線程編程裡面一個重要的概念是鎖定如果一個資源是多個線程共享的為了保證數據的完整性在進行事務性操作時需要將共享資源鎖定這樣可以保證在做事務性操作時只有一個線程能對資源進行操作從而保證數據的完整性以前鎖定的功能是由Synchronized關鍵字來實現的這樣做存在幾個問題


每次只能對一個對象進行鎖定若需要鎖定多個對象編程就比較麻煩一不小心就會出現死鎖現象
如果線程因拿不到鎖定而進入等待狀況是沒有辦法將其打斷的

  在Java 裡出現兩種鎖的工具可供使用下圖是這兩個工具的接口及其實現

  

   

   

  Lock接口

  ReentrantLock是Lock的具體類Lock提供了以下一些方法


lock(): 請求鎖定如果鎖已被別的線程鎖定調用此方法的線程被阻斷進入等待狀態
tryLock()如果鎖沒被別的線程鎖定進入鎖定狀態並返回true若鎖已被鎖定返回false不進入等待狀態此方法還可帶時間參數如果鎖在方法執行時已被鎖定線程將繼續等待規定的時間若還不行才返回false
unlock()取消鎖定需要注意的是Lock不會自動取消編程時必須手動解鎖

  代碼

  //生成一個鎖

  Lock lock = new ReentrantLock();

  public void accessProtectedResource() {

  locklock(); //取得鎖定

  try {

  //對共享資源進行操作

  } finally {

  //一定記著把鎖取消掉鎖本身是不會自動解鎖的

  lockunlock();

  }

  }

  ReadWriteLock接口

  為了提高效率有些共享資源允許同時進行多個讀的操作但只允許一個寫的操作比如一個文件只要其內容不變可以讓多個線程同時讀不必做排他的鎖定排他的鎖定只有在寫的時候需要以保證別的線程不會看到數據不完整的文件ReadWriteLock可滿足這種需要ReadWriteLock內置兩個Lock一個是讀的Lock一個是寫的Lock多個線程可同時得到讀的Lock但只有一個線程能得到寫的Lock而且寫的Lock被鎖定後任何線程都不能得到LockReadWriteLock提供的方法有


readLock(): 返回一個讀的lock
writeLock(): 返回一個寫的lock 此lock是排他的

  ReadWriteLock的例子

  public class FileOperator{

  //初始化一個ReadWriteLock

  ReadWriteLock lock = new ReentrantReadWriteLock();

  public String read() {

  //得到readLock並鎖定

  Lock readLock = lockreadLock();

  readLocklock();

  try {

  //做讀的工作

  return Read something;

  } finally {

  readLockunlock();

  }

  }

  

  public void write(String content) {

  //得到writeLock並鎖定

  Lock writeLock = lockwriteLock();

  writeLocklock();

  try {

  //做讀的工作

  } finally {

  writeLockunlock();

  }

  }

  }

  

  需要注意的是ReadWriteLock提供了一個高效的鎖定機理但最終程序的運行效率是和程序的設計息息相關的比如說如果讀的線程和寫的線程同時在等待要考慮是先發放讀的lock還是先發放寫的lock如果寫發生的頻率不高而且快可以考慮先給寫的lock還要考慮的問題是如果一個寫正在等待讀完成此時一個新的讀進來是否要給這個新的讀發鎖如果發了可能導致寫的線程等很久等等此類問題在編程時都要給予充分的考慮

  Condition接口

  有時候線程取得lock後需要在一定條件下才能做某些工作比如說經典的Producer和Consumer問題Consumer必須在籃子裡有蘋果的時候才能吃蘋果否則它必須暫時放棄對籃子的鎖定等到Producer往籃子裡放了蘋果後再去拿來吃而Producer必須等到籃子空了才能往裡放蘋果否則它也需要暫時解鎖等Consumer把蘋果吃了才能往籃子裡放蘋果在Java 以前這種功能是由Object類的wait() notify()和notifyAll()等方法實現的裡面這些功能集中到了Condition這個接口來實現Condition提供以下方法


await()使調用此方法的線程放棄鎖定進入睡眠直到被打斷或被喚醒
signal(): 喚醒一個等待的線程
signalAll()喚醒所有等待的線程

  Condition的例子

  public class Basket {     

  Lock lock = new ReentrantLock();

  //產生Condition對象

  Condition produced = locknewCondition();

  Condition consumed = locknewCondition();

  boolean available = false;

  

  public void produce() throws InterruptedException {

  locklock();

  try {

  if(available){

  consumedawait(); //放棄lock進入睡眠 

  }

  /*生產蘋果*/

  Systemoutprintln(Apple produced);

  available = true;

  producedsignal(); //發信號喚醒等待這個Condition的線程

  } finally {

  lockunlock();

  }

  }

  

  public void consume() throws InterruptedException {

  locklock();

  try {

  if(!available){

  producedawait();//放棄lock進入睡眠 

  }

  /*吃蘋果*/

  Systemoutprintln(Apple consumed);

  available = false;

  consumedsignal();//發信號喚醒等待這個Condition的線程

  } finally {

  lockunlock();

  }

  }     

  }

  ConditionTester:

  public class ConditionTester {

  

  public static void main(String[] args) throws InterruptedException{

  final Basket basket = new Basket();

  //定義一個producer

  Runnable producer = new Runnable() {

  public void run() {

  try {

  basketproduce();

  } catch (InterruptedException ex) {

  exprintStackTrace();

  }

  }

  };

  //定義一個consumer

  Runnable consumer = new Runnable() {

  public void run() {

  try {

  nsume();

  } catch (InterruptedException ex) {

  exprintStackTrace();

  }

  }

  };

  //各產生個consumer和producer

  ExecutorService service = ExecutorsnewCachedThreadPool();

  for(int i=; i < ; i++)

  servicesubmit(consumer);

  Threadsleep();

  for(int i=; i<; i++)

  servicesubmit(producer);

  serviceshutdown();

  }     

  }

: Synchronizer同步裝置

  Java 裡新加了個協調線程間進程的同步裝置它們分別是Semaphore CountDownLatch CyclicBarrier和Exchanger

  Semaphore:

  用來管理一個資源池的工具Semaphore可以看成是個通行證線程要想從資源池拿到資源必須先拿到通行證Semaphore提供的通行證數量和資源池的大小一致如果線程暫時拿不到通行證線程就會被阻斷進入等待狀態以下是一個例子

  public class Pool {

  ArrayList pool = null;

  Semaphore pass = null;

  public Pool(int size){

  //初始化資源池

  pool = new ArrayList();

  for(int i=; i

  pool.add("Resource "+i);

  }

  //Semaphore的大小和資源池的大小一致

  pass = new Semaphore(size);

  }

  public String get() throws InterruptedException{

  //獲取通行證,只有得到通行證後才能得到資源

  pass.acquire();

  return getResource();

  }

  public void put(String resource){

  //歸還通行證,並歸還資源

  pass.release();

  releaseResource(resource);

  }

  private synchronized String getResource() {

  String result = pool.get(0);

  pool.remove(0);

  System.out.println("Give out "+result);

  return result;

  }

  private synchronized void releaseResource(String resource) {

  System.out.println("return "+resource);

  pool.add(resource);

  }

  }

  SemaphoreTest:

  public class SemaphoreTest {

  public static void main(String[] args){

  final Pool aPool = new Pool(2);

  Runnable worker = new Runnable() {

  public void run() {

  String resource = null;

  try {

  //取得resource

  resource = aPool.get();

  } catch (InterruptedException ex) {

  ex.printStackTrace();

  }

  //用resource做工作

  System.out.println("I worked on "+resource);

  //歸還resource

  aPool.put(resource);

  }

  };

  ExecutorService service = Executors.newCachedThreadPool();

  for(int i=0; i<20; i++){

  service.submit(worker);

  }

  service.shutdown();

  }    

  }

   

   

  CountDownLatch:

  CountDownLatch是個計數器,它有一個初始數,等待這個計數器的線程必須等到計數器倒數到零時才可繼續。TW.WinGWit.com比如說一個Server啟動時需要初始化4個部件,Server可以同時啟動4個線程去初始化這4個部件,然後調用CountDownLatch(4).await()阻斷進入等待,每個線程完成任務後會調用一次untDown()來倒計數, 當4個線程都結束時CountDownLatch的計數就會降低為0,此時Server就會被喚醒繼續下一步操作。CountDownLatch的方法主要有:


await():使調用此方法的線程阻斷進入等待
countDown(): 倒計數,將計數值減1
getCount(): 得到當前的計數值

  CountDownLatch的例子:一個server調了三個ComponentThread分別去啟動三個組件,然後server等到組件都啟動了再繼續。

  public class Server {

  public static void main(String[] args) throws InterruptedException{

  System.out.println("Server is starting.");

  //初始化一個初始值為3的CountDownLatch

  CountDownLatch latch = new CountDownLatch(3);

  //起3個線程分別去啟動3個組件

  ExecutorService service = Executors.newCachedThreadPool();

  service.submit(new ComponentThread(latch, 1));

  service.submit(new ComponentThread(latch, 2));

  service.submit(new ComponentThread(latch, 3));

  service.shutdown();

  //進入等待狀態

  latch.await();

  //當所需的三個組件都完成時,Server就可繼續了

  System.out.println("Server is up!");

  }

  }

  

  public class ComponentThread implements Runnable{

  CountDownLatch latch;

  int ID;

  /** Creates a new instance of ComponentThread */

  public ComponentThread(CountDownLatch latch, int ID) {

  this.latch = latch;

  this.ID = ID;

  }

  public void run() {

  System.out.println("Component "+ID + " initialized!");

  //將計數減一

  untDown();

  }    

  }

  運行結果:

  Server is starting.

  Component 1 initialized!

  Component 3 initialized!

  Component 2 initialized!

  Server is up!

  

  CyclicBarrier:

  CyclicBarrier類似於CountDownLatch也是個計數器,不同的是CyclicBarrier數的是調用了CyclicBarrier.await()進入等待的線程數,當線程數達到了CyclicBarrier初始時規定的數目時,所有進入等待狀態的線程被喚醒並繼續。CyclicBarrier就象它名字的意思一樣,可看成是個障礙,所有的線程必須到齊後才能一起通過這個障礙。CyclicBarrier初始時還可帶一個Runnable的參數,此Runnable任務在CyclicBarrier的數目達到後,所有其它線程被喚醒前被執行。

  CyclicBarrier提供以下幾個方法:


await():進入等待
getParties():返回此barrier需要的線程數
reset():將此barrier重置

  以下是使用CyclicBarrier的一個例子:兩個線程分別在一個數組裡放一個數,當這兩個線程都結束後,主線程算出數組裡的數的和(這個例子比較無聊,我沒有想到更合適的例子)

  public class MainThread {

  public static void main(String[] args)

  throws InterruptedException, BrokenBarrierException, TimeoutException{

  final int[] array = new int[2];

  CyclicBarrier barrier = new CyclicBarrier(2,

  new Runnable() {//在所有線程都到達Barrier時執行

  public void run() {

  System.out.println("Total is:"+(array[0]+array[1]));

  }

  });           

  //啟動線程

  new Thread(new ComponentThread(barrier, array, 0)).start();

  new Thread(new ComponentThread(barrier, array, 1)).start();   

  }     

  }

  

  public class ComponentThread implements Runnable{

  CyclicBarrier barrier;

  int ID;

  int[] array;

  public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {

  this.barrier = barrier;

  this.ID = ID;

  this.array = array;

  }

  public void run() {

  try {

  array[ID] = new Random().nextInt();

  System.out.println(ID+ " generates:"+array[ID]);

  //該線程完成了任務等在Barrier處

  barrier.await();

  } catch (BrokenBarrierException ex) {

  ex.printStackTrace();

  } catch (InterruptedException ex) {

  ex.printStackTrace();

  }

  }

  }

  Exchanger:

  顧名思義Exchanger讓兩個線程可以互換信息。用一個例子來解釋比較容易。例子中服務生線程往空的杯子裡倒水,顧客線程從裝滿水的杯子裡喝水,然後通過Exchanger雙方互換杯子,服務生接著往空杯子裡倒水,顧客接著喝水,然後交換,如此周而復始。

  class FillAndEmpty {

  //初始化一個Exchanger,並規定可交換的信息類型是DataCup

  Exchanger exchanger = new Exchanger();

  Cup initialEmptyCup = ...; //初始化一個空的杯子

  Cup initialFullCup = ...; //初始化一個裝滿水的杯子

  //服務生線程

  class Waiter implements Runnable {

  public void run() {

  Cup currentCup = initialEmptyCup;

  try {

  //往空的杯子裡加水

  currentCup.addWater();

  //杯子滿後和顧客的空杯子交換

  currentCup = exchanger.exchange(currentCup);

  } catch (InterruptedException ex) { ... handle ... }

  }

  }

  //顧客線程

  class Customer implements Runnable {

  public void run() {

  DataCup currentCup = initialFullCup;

  try {

  //把杯子裡的水喝掉

  currentCup.drinkFromCup();

  //將空杯子和服務生的滿杯子交換

  currentCup = exchanger.exchange(currentCup);

  } catch (InterruptedException ex) { ... handle ...}

  }

  }

  

  void start() {

  new Thread(new Waiter()).start();

  new Thread(new Customer()).start();

  }

  }

6: BlockingQueue接口

  BlockingQueue是一種特殊的Queue,若BlockingQueue是空的,從BlockingQueue取東西的操作將會被阻斷進入等待狀態直到BlocingkQueue進了新貨才會被喚醒。同樣,如果BlockingQueue是滿的任何試圖往裡存東西的操作也會被阻斷進入等待狀態,直到BlockingQueue裡有新的空間才會被喚醒繼續操作。BlockingQueue提供的方法主要有:


add(anObject): 把anObject加到BlockingQueue裡,如果BlockingQueue可以容納返回true,否則拋出IllegalStateException異常。
offer(anObject):把anObject加到BlockingQueue裡,如果BlockingQueue可以容納返回true,否則返回false。
put(anObject):把anObject加到BlockingQueue裡,如果BlockingQueue沒有空間,調用此方法的線程被阻斷直到BlockingQueue裡有新的空間再繼續。
poll(time):取出BlockingQueue裡排在首位的對象,若不能立即取出可等time參數規定的時間。取不到時返回null。
take():取出BlockingQueue裡排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的對象被加入為止。

  根據不同的需要BlockingQueue有4種具體實現:


ArrayBlockingQueue:規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小。其所含的對象是以FIFO(先入先出)順序排序的。
LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定。其所含的對象是以FIFO(先入先出)順序排序的。LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的數據結構不一樣,導致LinkedBlockingQueue的數據吞吐量要大於ArrayBlockingQueue,但在線程數量很大時其性能的可預見性低於ArrayBlockingQueue。
PriorityBlockingQueue:類似於LinkedBlockingQueue,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數所帶的Comparator決定的順序。
SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。

  下面是用BlockingQueue來實現Producer和Consumer的例子:

  public class BlockingQueueTest {

  static BlockingQueue basket;

  public BlockingQueueTest() {

  //定義了一個大小為2的BlockingQueue,也可根據需要用其他的具體類

  basket = new ArrayBlockingQueue(2);

  }

  class Producor implements Runnable {

  public void run() {

  while(true){

  try {

  //放入一個對象,若basket滿了,等到basket有位置

  basket.put("An apple");

  } catch (InterruptedException ex) {

  ex.printStackTrace();

  }

  }

  }

  }

  class Consumer implements Runnable {

  public void run() {

  while(true){

  try {

  //取出一個對象,若basket為空,等到basket有東西為止

  String result = basket.take();

  } catch (InterruptedException ex) {

  ex.printStackTrace();

  }

  }

  }           

  }

  public void execute(){

  for(int i=0; i<10; i++){

  new Thread(new Producor()).start();

  new Thread(new Consumer()).start();

  }           

  }

  public static void main(String[] args){

  BlockingQueueTest test = new BlockingQueueTest();

  test.execute();

  }     

  }

7:Atomics 原子級變量

  原子量級的變量,主要的類有AtomicBoolean, AtomicInteger, AotmicIntegerArray, AtomicLong, AtomicLongArray, AtomicReference ……。這些原子量級的變量主要提供兩個方法:


compareAndSet(expectedValue, newValue): 比較當前的值是否等於expectedValue,若等於把當前值改成newValue,並返回true。若不等,返回false。
getAndSet(newValue): 把當前值改為newValue,並返回改變前的值。

  這些原子級變量利用了現代處理器(CPU)的硬件支持可把兩步操作合為一步的功能,避免了不必要的鎖定,提高了程序的運行效率。

8:Concurrent Collections 共點聚集

  在Java的聚集框架裡可以調用Collections.synchronizeCollection(aCollection)將普通聚集改變成同步聚集,使之可用於多線程的環境下。 但同步聚集在一個時刻只允許一個線程訪問它,其它想同時訪問它的線程會被阻斷,導致程序運行效率不高。Java 5.0裡提供了幾個共點聚集類,它們把以前需要幾步才能完成的操作合成一個原子量級的操作,這樣就可讓多個線程同時對聚集進行操作,避免了鎖定,從而提高了程序的運行效率。Java 5.0目前提供的共點聚集類有:ConcurrentHashMap, ConcurrentLinkedQueue, CopyOnWriteArrayList和CopyOnWriteArraySet.


From:http://tw.wingwit.com/Article/program/Java/gj/201311/27633.html
    推薦文章
    Copyright © 2005-2013 電腦知識網 Computer Knowledge   All rights reserved.