熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> Java編程 >> Java核心技術 >> 正文

Java實現通用線程池

2013-11-23 19:37:28  來源: Java核心技術 

  線程池通俗的描述就是:預先創建若干空閒線程,等到需要用多線程去處理事務的時候,去喚醒某些空閒線程執行處理任務,這樣就省去了頻繁創建線程的時間。因為頻繁創建線程是要耗費大量的CPU資源的,如果一個應用程序需要頻繁地處理大量並發事務,不斷地創建、銷毀線程往往會大大地降低系統的效率,這時候線程池就派上用場了。

  本文旨在使用Java語言編寫一個通用的線程池。當需要使用線程池處理事務時,只需按照指定規範封裝好事務處理對象,然後用已有的線程池對象去自動選擇空閒線程、自動調用事務處理對象即可,並實現線程池的動態修改(修改當前線程數、最大線程數等)。下面是實現代碼:

  //ThreadTask.java

  package polarman.threadpool;

  /** *//**

  *線程任務

  * @author ryang

  *

  */

  public interface ThreadTask {

  public void run();

  }

  //PooledThread.java

  package polarman.threadpool;

  import java.util.Collection;
  import java.util.Vector;

  /** *//**

  *接受線程池管理的線程

  * @author ryang

  *

  */

  public class PooledThread extends Thread {

  protected Vector tasks = new Vector();

  protected boolean running = false;

  protected boolean stopped = false;

  protected boolean paused = false;

  protected boolean killed = false;

  private ThreadPool pool;

  public PooledThread(ThreadPool pool){ thispool = pool;

  }

  public void putTask(ThreadTask task){ tasksadd(task);

  }

  public void putTasks(ThreadTask[] tasks){ for(int i=; i<taskslength; i++) thistasksadd(tasks[i]);

  }

  public void putTasks(Collection tasks){ thistasksaddAll(tasks);

  }

  protected ThreadTask popTask(){ if(taskssize() > ) return (ThreadTask)tasksremove();

  else

  return null;

  }

  public boolean isRunning(){

  return running;

  }

  public void stopTasks(){

  stopped = true;

  }

  public void stopTasksSync(){

  stopTasks();

  while(isRunning()){ try {

  sleep();

  } catch (InterruptedException e) {

  }

  }

  }

  public void pauseTasks(){

  paused = true;

  }

  public void pauseTasksSync(){

  pauseTasks();

  while(isRunning()){ try {

  sleep();

  } catch (InterruptedException e) {

  }

  }

  }

  public void kill(){ if(!running)

  interrupt();

  else

  killed = true;

  }

  public void killSync(){

  kill();

  while(isAlive()){ try {

  sleep();

  } catch (InterruptedException e) {

  }

  }

  }

  public synchronized void startTasks(){

  running = true;

  thisnotify();

  }

  public synchronized void run(){ try{ while(true){ if(!running || taskssize() == ){ poolnotifyForIdleThread(); //Systemoutprintln(ThreadcurrentThread()getId() + : 空閒); thiswait(); }else{

  ThreadTask task;

  while((task = popTask()) != null){ taskrun(); if(stopped){

  stopped = false;

  if(taskssize() > ){ tasksclear(); Systemoutprintln(ThreadcurrentThread()getId() + : Tasks are stopped);

  break;

  }

  }

  if(paused){

  paused = false;

  if(taskssize() > ){ Systemoutprintln(ThreadcurrentThread()getId() + : Tasks are paused);

  break;

  }

  }

  }

  running = false;

  }

  if(killed){

  killed = false;

  break;

  }

  }

  }catch(InterruptedException e){

  return;

  }

  //Systemoutprintln(ThreadcurrentThread()getId() + : Killed);

  }

  }

  //ThreadPool.java

  package polarman.threadpool;

  import java.util.Collection;
  import java.util.Iterator;
  import java.util.Vector;

  /** *//**

  *線程池

  * @author ryang

  *

  */

  public class ThreadPool {

  protected int maxPoolSize;

  protected int initPoolSize;

  protected Vector threads = new Vector();

  protected boolean initialized = false;

  protected boolean hasIdleThread = false;

  public ThreadPool(int maxPoolSize int initPoolSize){ thismaxPoolSize = maxPoolSize; thisinitPoolSize = initPoolSize;

  }

  public void init(){

  initialized = true;

  for(int i=; i<initPoolSize; i++){

  PooledThread thread = new PooledThread(this);

  threadstart(); threadsadd(thread);

  }

  //Systemoutprintln(線程池初始化結束線程數= + threadssize() + 最大線程數= + maxPoolSize);

  }

  public void setMaxPoolSize(int maxPoolSize){ //Systemoutprintln(重設最大線程數最大線程數= + maxPoolSize); thismaxPoolSize = maxPoolSize;

  if(maxPoolSize < getPoolSize())

  setPoolSize(maxPoolSize);

  }

  /** *//**

  *重設當前線程數

  * 若需殺掉某線程線程不會立刻殺掉而會等到線程中的事務處理完成* 但此方法會立刻從線程池中移除該線程不會等待事務處理結束

  * @param size

  */

  public void setPoolSize(int size){ if(!initialized){

  initPoolSize = size;

  return;

  }else if(size > getPoolSize()){ for(int i=getPoolSize(); i<size && i<maxPoolSize; i++){

  PooledThread thread = new PooledThread(this);

  threadstart(); threadsadd(thread);

  }

  }else if(size < getPoolSize()){ while(getPoolSize() > size){ PooledThread th = (PooledThread)threadsremove(); thkill();

  }

  }

  //Systemoutprintln(重設線程數線程數= + threadssize());

  }

  public int getPoolSize(){ return threadssize();

  }

  protected void notifyForIdleThread(){

  hasIdleThread = true;

  }

  protected boolean waitForIdleThread(){

  hasIdleThread = false;

  while(!hasIdleThread && getPoolSize() >= maxPoolSize){ try { Threadsleep(); } catch (InterruptedException e) {

  return false;

  }

  }

  return true;

  }

  public synchronized PooledThread getIdleThread(){ while(true){ for(Iterator itr=erator(); itrhasNext();){ PooledThread th = (PooledThread)itrnext(); if(!thisRunning())

  return th;

  }

  if(getPoolSize() < maxPoolSize){

  PooledThread thread = new PooledThread(this);

  threadstart(); threadsadd(thread);

  return thread;

  }

  //Systemoutprintln(線程池已滿等待);

  if(waitForIdleThread() == false)

  return null;

  }

  }

  public void processTask(ThreadTask task){

  PooledThread th = getIdleThread();

  if(th != null){ thputTask(task); thstartTasks();

  }

  }

  public void processTasksInSingleThread(ThreadTask[] tasks){

  PooledThread th = getIdleThread();

  if(th != null){ thputTasks(tasks); thstartTasks();

  }

  }

  public void processTasksInSingleThread(Collection tasks){

  PooledThread th = getIdleThread();

  if(th != null){ thputTasks(tasks); thstartTasks();

  }

  }

  }

  下面是線程池的測試程序

  //ThreadPoolTest.java

  import java.io.BufferedReader;
  import java.io.IOException;
  import java.io.InputStreamReader;

  import polarman.threadpool.ThreadPool;
  import polarman.threadpool.ThreadTask;

  public class ThreadPoolTest {

  public static void main(String[] args) { Systemoutprintln(quit 退出); Systemoutprintln(task A 啟動任務A時長為); Systemoutprintln(size 設置當前線程池大小為); Systemoutprintln(max 設置線程池最大線程數為); Systemoutprintln();

  final ThreadPool pool = new ThreadPool( ); poolinit();

  Thread cmdThread = new Thread(){ public void run(){

  BufferedReader reader = new BufferedReader(new InputStreamReader(Systemin));

  while(true){ try { String line = readerreadLine(); String words[] = linesplit( ); if(words[]equalsIgnoreCase(quit)){ Systemexit(); }else if(words[]equalsIgnoreCase(size) && wordslength >= ){ try{ int size = IntegerparseInt(words[]); poolsetPoolSize(size); }catch(Exception e){

  }

  }else if(words[]equalsIgnoreCase(max) && wordslength >= ){ try{ int max = IntegerparseInt(words[]); poolsetMaxPoolSize(max); }catch(Exception e){

  }

  }else if(words[]equalsIgnoreCase(task) && wordslength >= ){ try{ int timelen = IntegerparseInt(words[]); SimpleTask task = new SimpleTask(words[] timelen * ); poolprocessTask(task); }catch(Exception e){

  }

  }

  } catch (IOException e) { eprintStackTrace();

  }

  }

  }

  };

  cmdThreadstart();

  /**//*

  for(int i=; i<; i++){

  SimpleTask task = new SimpleTask(Task + i (i+)*); poolprocessTask(task);

  }*/

  }

  }

  class SimpleTask implements ThreadTask{

  private String taskName;

  private int timeLen;

  public SimpleTask(String taskName int timeLen){ thistaskName = taskName; thistimeLen = timeLen;

  }

  public void run() { Systemoutprintln(ThreadcurrentThread()getId() +

  : START TASK + taskName + );

  try { Threadsleep(timeLen); } catch (InterruptedException e) {

  }

  Systemoutprintln(ThreadcurrentThread()getId() +

  : END TASK + taskName + );

  }

  }

  使用此線程池相當簡單,下面兩行代碼初始化線程池:

  ThreadPool pool = new ThreadPool(3, 2);
  pool.init();

  要處理的任務實現ThreadTask接口即可(如測試代碼裡的SimpleTask),這個接口只有一個方法run()。

  兩行代碼即可調用:

  ThreadTask task = ...; //實例化你的任務對象
  pool.processTask(task);


From:http://tw.wingwit.com/Article/program/Java/hx/201311/27203.html
    推薦文章
    Copyright © 2005-2013 電腦知識網 Computer Knowledge   All rights reserved.