Java自
synchronized(lock){
//執行對共享資源的操作
……
}
一些復雜的功能就很難被實現
因為這些問題
在我們開始介紹Java
public class ServerThread {
Object concLock = new Object();
int count =
public void runTwoThreads() {
//啟動兩個線程去初始化組件
new Thread(new ComponentThread
new Thread(new ComponentThread
// Wait for other thread
while(count !=
synchronized(concLock) {
try {
concLock
System
} catch (InterruptedException ie) { //處理異常}
}
}
System
}
public void callBack() {
synchronized(concLock) {
count
concLock
}
}
public static void main(String[] args){
ServerThread server = new ServerThread();
server
}
}
public class ComponentThread
private ServerThread server;
public ComponentThread
this
}
public void run() {
//做組件初始化的工作
System
server
}
}
三個新加的多線程包
Java
ncurrent包含了常用的多線程工具
ncurrent
ncurrent
Callable 和 Future接口
Callable是類似於Runnable的接口
Callable規定的方法是call()
Callable的任務執行後可返回值
call()方法可拋出異常
運行Callable任務可拿到一個Future對象
以下是Callable的一個例子
public class DoCallStuff implements Callable
private int aInt;
public DoCallStuff(int aInt) {
this
}
public String call() throws Exception { //*
boolean resultOk = false;
if(aInt ==
resultOk = true;
} else if(aInt ==
while(true){ //infinite loop
System
Thread
}
} else {
throw new Exception(
}
if(resultOk){
return
} else {
return
}
}
}
*
*
*
以下是調用DoCallStuff的主程序
import ncurrent
import ncurrent
import ncurrent
import ncurrent
public class Executor {
public static void main(String[] args){
//*
DoCallStuff call
DoCallStuff call
DoCallStuff call
//*
ExecutorService es = Executors
//*
Future
Future
Future
try {
//*
System
//*
Thread
System
//*
System
} catch (ExecutionException ex) {
ex
} catch (InterruptedException ex) {
ex
}
}
}
*
*
*
*
*
*
運行Executor會有以下運行結果
looping
Task done
looping
looping
looping
looping
looping
looping
Thread
//*
ncurrent
at ncurrent
at ncurrent
at concurrent
……
*
*
*
*
新的任務執行架構
在Java
圖的左側是接口
Executor接口
是用來執行Runnable任務的
execute(Runnable command)
ExecutorService接口
ExecutorService繼承了Executor的方法
submit(task)
invokeAll(collection of tasks)
shutdown()
shutdownNow()
isTerminated()
isShutdown()
ScheduledExecutorService接口
在ExecutorService的基礎上
schedule(task
scheduleAtFixedRate()
scheduleWithFixedDelay()
代碼
public class ScheduledExecutorServiceTest {
public static void main(String[] args)
throws InterruptedException
//*
ScheduledExecutorService service = Executors
//*
Runnable task
public void run() {
System
}
};
//*
final ScheduledFuture future
service
//*
ScheduledFuture
public String call(){
future
return
}
}
System
//*
service
}
}
這個例子有兩個任務
*
*
*
*
*
Executors類
雖然以上提到的接口有其實現的具體類
callable(Runnable task): 將Runnable的任務轉化成Callable的任務
newSingleThreadExecutor: 產生一個ExecutorService對象
newCachedThreadPool(): 產生一個ExecutorService對象
newFixedThreadPool(int poolSize)
newSingleThreadScheduledExecutor
newScheduledThreadPool(int poolSize): 產生一個ScheduledExecutorService對象
以下是得到和使用ExecutorService的例子
代碼
//Single Threaded ExecutorService
ExecutorService singleThreadeService = Executors
//Cached ExecutorService
ExecutorService cachedService = Executors
//Fixed number of ExecutorService
ExecutorService fixedService = Executors
//Single ScheduledExecutorService
ScheduledExecutorService singleScheduledService =
Executors
//Fixed number of ScheduledExecutorService
ScheduledExecutorService fixedScheduledService =
Executors
Lockers和Condition接口
在多線程編程裡面一個重要的概念是鎖定
每次只能對一個對象進行鎖定
如果線程因拿不到鎖定而進入等待狀況
在Java
Lock接口
ReentrantLock是Lock的具體類
lock(): 請求鎖定
tryLock()
unlock()
代碼
//生成一個鎖
Lock lock = new ReentrantLock();
public void accessProtectedResource() {
lock
try {
//對共享資源進行操作
} finally {
//一定記著把鎖取消掉
lock
}
}
ReadWriteLock接口
為了提高效率有些共享資源允許同時進行多個讀的操作
readLock(): 返回一個讀的lock
writeLock(): 返回一個寫的lock
ReadWriteLock的例子
public class FileOperator{
//初始化一個ReadWriteLock
ReadWriteLock lock = new ReentrantReadWriteLock();
public String read() {
//得到readLock並鎖定
Lock readLock = lock
readLock
try {
//做讀的工作
return
} finally {
readLock
}
}
public void write(String content) {
//得到writeLock並鎖定
Lock writeLock = lock
writeLock
try {
//做讀的工作
} finally {
writeLock
}
}
}
需要注意的是ReadWriteLock提供了一個高效的鎖定機理
Condition接口
有時候線程取得lock後需要在一定條件下才能做某些工作
await()
signal(): 喚醒一個等待的線程
signalAll()
Condition的例子
public class Basket {
Lock lock = new ReentrantLock();
//產生Condition對象
Condition produced = lock
Condition consumed = lock
boolean available = false;
public void produce() throws InterruptedException {
lock
try {
if(available){
consumed
}
/*生產蘋果*/
System
available = true;
produced
} finally {
lock
}
}
public void consume() throws InterruptedException {
lock
try {
if(!available){
produced
}
/*吃蘋果*/
System
available = false;
consumed
} finally {
lock
}
}
}
ConditionTester:
public class ConditionTester {
public static void main(String[] args) throws InterruptedException{
final Basket basket = new Basket();
//定義一個producer
Runnable producer = new Runnable() {
public void run() {
try {
basket
} catch (InterruptedException ex) {
ex
}
}
};
//定義一個consumer
Runnable consumer = new Runnable() {
public void run() {
try {
nsume();
} catch (InterruptedException ex) {
ex
}
}
};
//各產生
ExecutorService service = Executors
for(int i=
service
Thread
for(int i=
service
service
}
}
: Synchronizer 同步裝置
Java
Semaphore:
用來管理一個資源池的工具
public class Pool {
ArrayList
Semaphore pass = null;
public Pool(int size){
//初始化資源池
pool = new ArrayList
for(int i=
pool.add("Resource "+i);
}
//Semaphore的大小和資源池的大小一致
pass = new Semaphore(size);
}
public String get() throws InterruptedException{
//獲取通行證,只有得到通行證後才能得到資源
pass.acquire();
return getResource();
}
public void put(String resource){
//歸還通行證,並歸還資源
pass.release();
releaseResource(resource);
}
private synchronized String getResource() {
String result = pool.get(0);
pool.remove(0);
System.out.println("Give out "+result);
return result;
}
private synchronized void releaseResource(String resource) {
System.out.println("return "+resource);
pool.add(resource);
}
}
SemaphoreTest:
public class SemaphoreTest {
public static void main(String[] args){
final Pool aPool = new Pool(2);
Runnable worker = new Runnable() {
public void run() {
String resource = null;
try {
//取得resource
resource = aPool.get();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
//用resource做工作
System.out.println("I worked on "+resource);
//歸還resource
aPool.put(resource);
}
};
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<20; i++){
service.submit(worker);
}
service.shutdown();
}
}
CountDownLatch:
CountDownLatch是個計數器,它有一個初始數,等待這個計數器的線程必須等到計數器倒數到零時才可繼續。TW.WInGWIT.cOm比如說一個Server啟動時需要初始化4個部件,Server可以同時啟動4個線程去初始化這4個部件,然後調用CountDownLatch(4).await()阻斷進入等待,每個線程完成任務後會調用一次untDown()來倒計數, 當4個線程都結束時CountDownLatch的計數就會降低為0,此時Server就會被喚醒繼續下一步操作。CountDownLatch的方法主要有:
await():使調用此方法的線程阻斷進入等待
countDown(): 倒計數,將計數值減1
getCount(): 得到當前的計數值
CountDownLatch的例子:一個server調了三個ComponentThread分別去啟動三個組件,然後server等到組件都啟動了再繼續。
public class Server {
public static void main(String[] args) throws InterruptedException{
System.out.println("Server is starting.");
//初始化一個初始值為3的CountDownLatch
CountDownLatch latch = new CountDownLatch(3);
//起3個線程分別去啟動3個組件
ExecutorService service = Executors.newCachedThreadPool();
service.submit(new ComponentThread(latch, 1));
service.submit(new ComponentThread(latch, 2));
service.submit(new ComponentThread(latch, 3));
service.shutdown();
//進入等待狀態
latch.await();
//當所需的三個組件都完成時,Server就可繼續了
System.out.println("Server is up!");
}
}
public class ComponentThread implements Runnable{
CountDownLatch latch;
int ID;
/** Creates a new instance of ComponentThread */
public ComponentThread(CountDownLatch latch, int ID) {
this.latch = latch;
this.ID = ID;
}
public void run() {
System.out.println("Component "+ID + " initialized!");
//將計數減一
untDown();
}
}
運行結果:
Server is starting.
Component 1 initialized!
Component 3 initialized!
Component 2 initialized!
Server is up!
CyclicBarrier:
CyclicBarrier類似於CountDownLatch也是個計數器,不同的是CyclicBarrier數的是調用了CyclicBarrier.await()進入等待的線程數,當線程數達到了CyclicBarrier初始時規定的數目時,所有進入等待狀態的線程被喚醒並繼續。CyclicBarrier就象它名字的意思一樣,可看成是個障礙,所有的線程必須到齊後才能一起通過這個障礙。CyclicBarrier初始時還可帶一個Runnable的參數,此Runnable任務在CyclicBarrier的數目達到後,所有其它線程被喚醒前被執行。
CyclicBarrier提供以下幾個方法:
await():進入等待
getParties():返回此barrier需要的線程數
reset():將此barrier重置
以下是使用CyclicBarrier的一個例子:兩個線程分別在一個數組裡放一個數,當這兩個線程都結束後,主線程算出數組裡的數的和(這個例子比較無聊,我沒有想到更合適的例子)
public class MainThread {
public static void main(String[] args)
throws InterruptedException, BrokenBarrierException, TimeoutException{
final int[] array = new int[2];
CyclicBarrier barrier = new CyclicBarrier(2,
new Runnable() {//在所有線程都到達Barrier時執行
public void run() {
System.out.println("Total is:"+(array[0]+array[1]));
}
});
//啟動線程
new Thread(new ComponentThread(barrier, array, 0)).start();
new Thread(new ComponentThread(barrier, array, 1)).start();
}
}
public class ComponentThread implements Runnable{
CyclicBarrier barrier;
int ID;
int[] array;
public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {
this.barrier = barrier;
this.ID = ID;
this.array = array;
}
public void run() {
try {
array[ID] = new Random().nextInt();
System.out.println(ID+ " generates:"+array[ID]);
//該線程完成了任務等在Barrier處
barrier.await();
} catch (BrokenBarrierException ex) {
ex.printStackTrace();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
Exchanger:
顧名思義Exchanger讓兩個線程可以互換信息。用一個例子來解釋比較容易。例子中服務生線程往空的杯子裡倒水,顧客線程從裝滿水的杯子裡喝水,然後通過Exchanger雙方互換杯子,服務生接著往空杯子裡倒水,顧客接著喝水,然後交換,如此周而復始。
class FillAndEmpty {
//初始化一個Exchanger,並規定可交換的信息類型是DataCup
Exchanger
Cup initialEmptyCup = ...; //初始化一個空的杯子
Cup initialFullCup = ...; //初始化一個裝滿水的杯子
//服務生線程
class Waiter implements Runnable {
public void run() {
Cup currentCup = initialEmptyCup;
try {
//往空的杯子裡加水
currentCup.addWater();
//杯子滿後和顧客的空杯子交換
currentCup = exchanger.exchange(currentCup);
} catch (InterruptedException ex) { ... handle ... }
}
}
//顧客線程
class Customer implements Runnable {
public void run() {
DataCup currentCup = initialFullCup;
try {
//把杯子裡的水喝掉
currentCup.drinkFromCup();
//將空杯子和服務生的滿杯子交換
currentCup = exchanger.exchange(currentCup);
} catch (InterruptedException ex) { ... handle ...}
}
}
void start() {
new Thread(new Waiter()).start();
new Thread(new Customer()).start();
}
}
6: BlockingQueue接口
BlockingQueue是一種特殊的Queue,若BlockingQueue是空的,從BlockingQueue取東西的操作將會被阻斷進入等待狀態直到BlocingkQueue進了新貨才會被喚醒。同樣,如果BlockingQueue是滿的任何試圖往裡存東西的操作也會被阻斷進入等待狀態,直到BlockingQueue裡有新的空間才會被喚醒繼續操作。BlockingQueue提供的方法主要有:
add(anObject): 把anObject加到BlockingQueue裡,如果BlockingQueue可以容納返回true,否則拋出IllegalStateException異常。
offer(anObject):把anObject加到BlockingQueue裡,如果BlockingQueue可以容納返回true,否則返回false。
put(anObject):把anObject加到BlockingQueue裡,如果BlockingQueue沒有空間,調用此方法的線程被阻斷直到BlockingQueue裡有新的空間再繼續。
poll(time):取出BlockingQueue裡排在首位的對象,若不能立即取出可等time參數規定的時間。取不到時返回null。
take():取出BlockingQueue裡排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的對象被加入為止。
根據不同的需要BlockingQueue有4種具體實現:
ArrayBlockingQueue:規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小。其所含的對象是以FIFO(先入先出)順序排序的。
LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定。其所含的對象是以FIFO(先入先出)順序排序的。LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的數據結構不一樣,導致LinkedBlockingQueue的數據吞吐量要大於ArrayBlockingQueue,但在線程數量很大時其性能的可預見性低於ArrayBlockingQueue。
PriorityBlockingQueue:類似於LinkedBlockingQueue,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數所帶的Comparator決定的順序。
SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。
下面是用BlockingQueue來實現Producer和Consumer的例子:
public class BlockingQueueTest {
static BlockingQueue
public BlockingQueueTest() {
//定義了一個大小為2的BlockingQueue,也可根據需要用其他的具體類
basket = new ArrayBlockingQueue
}
class Producor implements Runnable {
public void run() {
while(true){
try {
//放入一個對象,若basket滿了,等到basket有位置
basket.put("An apple");
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
public void run() {
while(true){
try {
//取出一個對象,若basket為空,等到basket有東西為止
String result = basket.take();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
public void execute(){
for(int i=0; i<10; i++){
new Thread(new Producor()).start();
new Thread(new Consumer()).start();
}
}
public static void main(String[] args){
BlockingQueueTest test = new BlockingQueueTest();
test.execute();
}
}
7:Atomics 原子級變量
原子量級的變量,主要的類有AtomicBoolean, AtomicInteger, AotmicIntegerArray, AtomicLong, AtomicLongArray, AtomicReference ……。這些原子量級的變量主要提供兩個方法:
compareAndSet(expectedValue, newValue): 比較當前的值是否等於expectedValue,若等於把當前值改成newValue,並返回true。若不等,返回false。
getAndSet(newValue): 把當前值改為newValue,並返回改變前的值。
這些原子級變量利用了現代處理器(CPU)的硬件支持可把兩步操作合為一步的功能,避免了不必要的鎖定,提高了程序的運行效率。
8:Concurrent Collections 共點聚集
在Java的聚集框架裡可以調用Collections.synchronizeCollection(aCollection)將普通聚集改變成同步聚集,使之可用於多線程的環境下。 但同步聚集在一個時刻只允許一個線程訪問它,其它想同時訪問它的線程會被阻斷,導致程序運行效率不高。Java 5.0裡提供了幾個共點聚集類,它們把以前需要幾步才能完成的操作合成一個原子量級的操作,這樣就可讓多個線程同時對聚集進行操作,避免了鎖定,從而提高了程序的運行效率。Java 5.0目前提供的共點聚集類有:ConcurrentHashMap, ConcurrentLinkedQueue, CopyOnWriteArrayList和CopyOnWriteArraySet.
From:http://tw.wingwit.com/Article/program/Java/gj/201311/27633.html