熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> Java編程 >> Java高級技術 >> 正文

任務列表分派給多個線程的策略和方法

2013-11-23 19:47:42  來源: Java高級技術 

  多線程下載由來已久如 FlashGetNetAnts 等工具它們都是依懶於 HTTP 協議的支持(Range 字段指定請求內容范圍)首先能讀取出請求內容 (即欲下載的文件) 的大小劃分出若干區塊把區塊分段分發給每個線程去下載線程從本段起始處下載數據及至段尾多個線程下載的內容最終會寫入到同一個文件中

  只研究有用的工作中的需求要把多個任務分派給多個線程去執行這其中就會有一個任務列表指派到線程的策略思考已知 一個待執行的任務列表 指定要啟動的線程數;問題是每個線程實際要執行哪些任務

  策略是任務列表連續按線程數分段先保證每線程平均能分配到的任務數余下的任務從前至後依次附加到線程中只是數量上實際每個線程執行的任務都還是連續的如果出現那種僧多(線程) 粥(任務) 少的情況實際啟動的線程數就等於任務數一挑一這裡只實現了每個線程各掃自家門前雪動作快的完成後眼見別的線程再累都是愛莫能助

  實現及演示代碼如下由三個類實現寫在了一個 java 文件中TaskDistributor 為任務分發器Task 為待執行的任務WorkThread 為自定的工作線程代碼中運用了命令模式如若能配以監聽器用上觀察者模式來控制 UI 顯示就更絕妙不過了就能實現像下載中的區塊著色跳躍的動感了在此定義下一步的著眼點了

  代碼中有較為詳細的注釋看這些注釋和執行結果就很容易理解的main() 是測試方法

  
 package mon;
  import javautilArrayList;
  import javautilList;
  /**
  * 指派任務列表給線程的分發器
  * @author Unmi
  * QQ:  Email: 
  * MSN:  
  */
  public class TaskDistributor {
  /**
  * 測試方法
  * @param args
  */
  public static void main(String[] args) {
  //初始化要執行的任務列表
  List taskList = new ArrayList();
  for (int i = ; i < ; i++) {
  taskListadd(new Task(i));
  }
  //設定要啟動的工作線程數為  個
  int threadCount = ;
  List[] taskListPerThread = distributeTasks(taskList threadCount);
  Systemoutprintln(實際要啟動的工作線程數+taskListPerThreadlength);
  for (int i = ; i < taskListPerThreadlength; i++) {
  Thread workThread = new WorkThread(taskListPerThread[i]i);
  workThreadstart();
  }
  }
  /**
  * 把 List 中的任務分配給每個線程先平均分配剩於的依次附加給前面的線程
  * 返回的數組有多少個元素 (List) 就表明將啟動多少個工作線程
  * @param taskList 待分派的任務列表
  * @param threadCount 線程數
  * @return 列表的數組每個元素中存有該線程要執行的任務列表
  */
  public static List[] distributeTasks(List taskList int threadCount) {
  // 每個線程至少要執行的任務數假如不為零則表示每個線程都會分配到任務
  int minTaskCount = taskListsize() / threadCount;
  // 平均分配後還剩下的任務數不為零則還有任務依個附加到前面的線程中
  int remainTaskCount = taskListsize() % threadCount;
  // 實際要啟動的線程數如果工作線程比任務還多
  // 自然只需要啟動與任務相同個數的工作線程一對一的執行
  // 畢竟不打算實現了線程池所以用不著預先初始化好休眠的線程
  int actualThreadCount = minTaskCount >  ? threadCount : remainTaskCount;
  // 要啟動的線程數組以及每個線程要執行的任務列表
  List[] taskListPerThread = new List[actualThreadCount];
  int taskIndex = ;
  //平均分配後多余任務每附加給一個線程後的剩余數重新聲明與 remainTaskCount
  //相同的變量不然會在執行中改變 remainTaskCount 原有值產生麻煩
  int remainIndces = remainTaskCount;
  for (int i = ; i < taskListPerThreadlength; i++) {
  taskListPerThread[i] = new ArrayList();
  // 如果大於零線程要分配到基本的任務
  if (minTaskCount > ) {
  for (int j = taskIndex; j < minTaskCount + taskIndex; j++) {
  taskListPerThread[i]add(taskListget(j));
  }
  taskIndex += minTaskCount;
  }
  // 假如還有剩下的則補一個到這個線程中
  if (remainIndces > ) {
  taskListPerThread[i]add(taskListget(taskIndex++));
  remainIndces;
  }
  }
  // 打印任務的分配情況
  for (int i = ; i < taskListPerThreadlength; i++) {
  Systemoutprintln(線程  + i +  的任務數 + 

  taskListPerThread[i]size() +  區間[
  + taskListPerThread[i]get()getTaskId() + 
  + taskListPerThread[i]get(taskListPerThread[i]size()  )getTaskId() + ]);
  }
  return taskListPerThread;
  }
  }
  /**
  * 要執行的任務可在執行時改變它的某個狀態或調用它的某個操作
  * 例如任務有三個狀態就緒運行完成默認為就緒態
  * 要進一步完善可為 Task 加上狀態變遷的監聽器因之決定UI的顯示
  */
  class Task {
  public static final int READY = ;
  public static final int RUNNING = ;
  public static final int FINISHED = ;
  private int status;
  //聲明一個任務的自有業務含義的變量用於標識任務
  private int taskId;
  //任務的初始化方法
  public Task(int taskId){
  thisstatus = READY;
  thistaskId = taskId;
  }
  /**
  * 執行任務
  */
  public void execute() {
  // 設置狀態為運行中
  setStatus(TaskRUNNING);
  Systemoutprintln(當前線程 ID 是 + ThreadcurrentThread()getName()
  + | 任務 ID 是+thistaskId);
  // 附加一個延時
  try {
  Threadsleep();
  } catch (InterruptedException e) {
  eprintStackTrace();
  }
  // 執行完成改狀態為完成
  setStatus(FINISHED);
  }
  public void setStatus(int status) {
  thisstatus = status;
  }
  public int getTaskId() {
  return taskId;
  }
  }
  /**
  * 自定義的工作線程持有分派給它執行的任務列表
  */
  class WorkThread extends Thread {
  //本線程待執行的任務列表你也可以指為任務索引的起始值
  private List taskList = null;
  private int threadId;
  /**
  * 構造工作線程為其指派任務列表及命名線程 ID
  * @param taskList 欲執行的任務列表
  * @param threadId 線程 ID
  */
  public WorkThread(List taskListint threadId) {
  thistaskList = taskList;
  thisthreadId = threadId;
  }
  /**
  * 執行被指派的所有任務
  */
  public void run() {
  for (Task task : taskList) {
  taskexecute();
  }
  }
  }

  執行結果如下注意觀察每個線程分配到的任務數量及區間直到所有的線程完成了所分配到的任務後程序結束

  
    線程  的任務數 區間[]
  線程  的任務數 區間[]
  線程  的任務數 區間[]
  線程  的任務數 區間[]
  線程  的任務數 區間[]
  實際要啟動的工作線程數
  當前線程 ID 是Thread | 任務 ID 是
  當前線程 ID 是Thread | 任務 ID 是
  當前線程 ID 是Thread | 任務 ID 是
  當前線程 ID 是Thread | 任務 ID 是
  當前線程 ID 是Thread | 任務 ID 是
  當前線程 ID 是Thread | 任務 ID 是
  當前線程 ID 是Thread | 任務 ID 是
  當前線程 ID 是Thread | 任務 ID 是

  

  上面坦白來只算是基本功夫貼出來還真見笑了還有更為復雜的功能

  像多線程的下載工具的確更充分利用了網絡資源而且像 FlashGetNetAnts 都實現了假如某個線程下載完了欲先所分配段的內容之後會幫其他線程下載未完成數據直到任務完成;或某一下載線程的未完成段區間已經很小了用不著別人來幫忙時這就涉及到任務的進一步分配再如以上兩個工具都能動態增加減小或中止線程越說越復雜了它們原本比這復雜多了這些實現可能定義各種隊列來實現如未完成任務隊列下載中任務隊列和已完成隊列難以細究了


From:http://tw.wingwit.com/Article/program/Java/gj/201311/27458.html
    推薦文章
    Copyright © 2005-2013 電腦知識網 Computer Knowledge   All rights reserved.