熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> Java編程 >> Java核心技術 >> 正文

用java多線程斷點續傳實踐

2013-11-23 18:58:42  來源: Java核心技術 

  /**

  * authorannegu

  * date

  */

  annegu做了一個簡單的Http多線程的下載程序來討論一下多線程並發下載以及斷點續傳的問題

  這個程序的功能就是可以分多個線程從目標地址上下載數據每個線程負責下載一部分並可以支持斷點續傳和超時重連

  下載的方法是download()它接收兩個參數分別是要下載的頁面的url和編碼方式在這個負責下載的方法中主要分了三個步驟第一步是用來設置斷點續傳時候的一些信息的第二步就是主要的分多線程來下載了最後是數據的合並

  多線程下載

  /**
*/
public String download(String urlStr String charset) {
    thischarset = charset;
    long contentLength = ;
        CountDownLatch latch = new CountDownLatch(threadNum);
    long[] startPos = new long[threadNum];
    long endPos = ;

  try {
        // 從url中獲得下載的文件格式與名字
        thisfileName = urlStrsubstring(urlStrlastIndexOf(/) + );

  thisurl = new URL(urlStr);
        URLConnection con = urlopenConnection();
        setHeader(con);
        // 得到content的長度
        contentLength = congetContentLength();
        // 把context分為threadNum段的話每段的長度
        thisthreadLength = contentLength / threadNum;

  // 第一步分析已下載的臨時文件設置斷點如果是新的下載任務則建立目標文件在第點中說明
        startPos = setThreadBreakpoint(fileDir fileName contentLength startPos);

  //第二步分多個線程下載文件
        ExecutorService exec = ExecutorsnewCachedThreadPool();
        for (int i = ; i < threadNum; i++) {
            // 創建子線程來負責下載數據每段數據的起始位置為(threadLength * i + 已下載長度)
            startPos[i] += threadLength * i;

  /**//*設置子線程的終止位置非最後一個線程即為(threadLength * (i + ) )
            最後一個線程的終止位置即為下載內容的長度*/
            if (i == threadNum ) {
                endPos = contentLength;
            } else {
                endPos = threadLength * (i + ) ;
            }
            // 開啟子線程並執行
            ChildThread thread = new ChildThread(this latch i startPos[i] endPos);
            childThreads[i] = thread;
            execexecute(thread);
        }

  try {
            // 等待CountdownLatch信號為表示所有子線程都結束
                latchawait();
            execshutdown();

  // 第三步把分段下載下來的臨時文件中的內容寫入目標文件中在第點中說明
            tempFileToTargetFile(childThreads);

  } catch (InterruptedException e) {
            eprintStackTrace();
        }
}

  首先來看最主要的步驟多線程下載

  首先從url中提取目標文件的名稱並在對應的目錄創建文件然後取得要下載的文件大小根據分成的下載線程數量平均分配每個線程需要下載的數據量就是threadLength然後就可以分多個線程來進行下載任務了

  在這個例子中並沒有直接顯示的創建Thread對象而是用Executor來管理Thread對象並且用CachedThreadPool來創建的線程池當然也可以用FixedThreadPoolCachedThreadPool在程序執行的過程中會創建與所需數量相同的線程當程序回收舊線程的時候就停止創建新線程FixedThreadPool可以預先新建參數給定個數的線程這樣就不用在創建任務的時候再來創建線程了可以直接從線程池中取出已准備好的線程下載線程的數量是通過一個全局變量threadNum來控制的默認為

  好了個子線程已經通過Executor來創建了下面它們就會各自為政互不干涉的執行了線程有兩種實現方式實現Runnable接口繼承Thread類

  ChildThread就是子線程它作為DownloadTask的內部類繼承了Thread它的構造方法需要個參數依次是一個對DownloadTask的引用一個CountDownLatchid(標識線程的id號)startPosition(下載內容的開始位置)endPosition(下載內容的結束位置)

  這個CountDownLatch是做什麼用的呢?

  現在我們整理一下思路要實現分多個線程來下載數據的話我們肯定還要把這多個線程下載下來的數據進行合主線程必須等待所有的子線程都執行結束之後才能把所有子線程的下載數據按照各自的id順序進行合並CountDownLatch就是來做這個工作的

  CountDownLatch用來同步主線程強制主線程等待所有的子線程執行的下載操作完成在主線程中CountDownLatch對象被設置了一個初始計數器就是子線程的個數代碼①處在新建了個子線程並開始執行之後主線程用CountDownLatch的await()方法來阻塞主線程直到這個計數器的值到達才會進行下面的操作代碼②處

  對每個子線程來說在執行完下載指定區間與長度的數據之後必須通過調用CountDownLatch的countDown()方法來把這個計數器減

  在全面開啟下載任務之後主線程就開始阻塞等待子線程執行完畢所以下面我們來看一下具體的下載線程ChildThread

  /**
*author by
*/
public class ChildThread extends Thread {
    public static final int STATUS_HASNOT_FINISHED = ;
    public static final int STATUS_HAS_FINISHED = ;
    public static final int STATUS_HTTPSTATUS_ERROR = ;
    private DownloadTask task;
    private int id;
    private long startPosition;
    private long endPosition;
    private final CountDownLatch latch;
    private File tempFile = null;
    //線程狀態碼
    private int status = ChildThreadSTATUS_HASNOT_FINISHED;

  public ChildThread(DownloadTask task CountDownLatch latch int id long startPos long endPos) {
        super();
        thistask = task;
        thisid = id;
        thisstartPosition = startPos;
        thisendPosition = endPos;
        thislatch = latch;

  try {
            tempFile = new File(thistaskfileDir + thistaskfileName + _ + id);
            if(!tempFileexists()){
                tempFilecreateNewFile();
            }
        } catch (IOException e) {
            eprintStackTrace();
        }

  }

  public void run() {
        Systemoutprintln(Thread + id + run );
        HttpURLConnection con = null;
        InputStream inputStream = null;
        BufferedOutputStream outputStream = null;
        int count = ;
        long threadDownloadLength = endPosition startPosition;

  try {
            outputStream = new BufferedOutputStream(new FileOutputStream(tempFilegetPath() true));
        } catch (FileNotFoundException e) {
            eprintStackTrace();
        }

  ③       for(;;){
④           startPosition += count;
            try {
                //打開URLConnection
                con = (HttpURLConnection) taskurlopenConnection();
                setHeader(con);
                consetAllowUserInteraction(true);
                //設置連接超時時間為ms
⑤               consetConnectTimeout();
                //設置讀取數據超時時間為ms
                consetReadTimeout();

  if(startPosition < endPosition){
                    //設置下載數據的起止區間
                    consetRequestProperty(Range bytes= + startPosition +
                            + endPosition);
                    Systemoutprintln(Thread + id + startPosition is + startPosition);
                    Systemoutprintln(Thread + id + endPosition is + endPosition);

  //判斷http status是否為HTTP/ Partial Content或者 OK
                    //如果不是以上兩種狀態把status改為STATUS_HTTPSTATUS_ERROR
⑥                   if (congetResponseCode() != HttpURLConnectionHTTP_OK
                            && congetResponseCode() != HttpURLConnectionHTTP_PARTIAL) {
                        Systemoutprintln(Thread + id + : code =
                                + congetResponseCode() + status =
                                + congetResponseMessage());
                        status = ChildThreadSTATUS_HTTPSTATUS_ERROR;
                        thistaskstatusError = true;
                        outputStreamclose();
                        condisconnect();
                        Systemoutprintln(Thread + id + finished);
                        untDown();
                        break;
                    }

  inputStream = congetInputStream();

  int len = ;
                    byte[] b = new byte[];
                    while ((len = inputStreamread(b)) != ) {
                        outputStreamwrite(b len);
                        count += len;

  //每讀滿個byte往磁盤上flush一下
                        if(count % == ){
⑦                           outputStreamflush();
                        }
                    }

  Systemoutprintln(count is + count);
                    if(count >= threadDownloadLength){
                        hasFinished = true;
                    }
⑧                   outputStreamflush();
                    outputStreamclose();
                    inputStreamclose();
                    condisconnect();
                }

  Systemoutprintln(Thread + id + finished);
                untDown();
                break;
            } catch (IOException e) {
                try {
⑨                   outputStreamflush();
⑩                   TimeUnitSECONDSsleep(getSleepSeconds());
                } catch (InterruptedException e) {
                    eprintStackTrace();
                } catch (IOException e) {
                    eprintStackTrace();
                }
                continue;
            }
        }
    }
}

  在ChildThread的構造方法中除了設置一些從主線程中帶來的id 起始位置之外就是新建了一個臨時文件用來存放當前線程的下載數據臨時文件的命名規則是這樣的下載的目標文件名+_+線程編號

  現在讓我們來看看從網絡中讀數據是怎麼讀的我們通過URLConnection來獲得一個http的連接有些網站為了安全起見會對請求的http連接進行過濾因此為了偽裝這個http的連接請求我們給httpHeader穿一件偽裝服下面的setHeader方法展示了一些非常常用的典型的httpHeader的偽裝方法比較重要的有UerAgent模擬從Ubuntu的firefox浏覽器發出的請求Referer模擬浏覽器請求的前一個觸發頁面例如從skycn站點來下載軟件的話Referer設置成skycn的首頁域名就可以了Range就是這個連接獲取的流文件的起始區間
 private void setHeader(URLConnection con) {
    consetRequestProperty(UserAgent Mozilla/ (X; U; Linux i; enUS; rv:) Gecko/ Ubuntu/ (hardy) Firefox/);
    consetRequestProperty(AcceptLanguage enusen;q=zhcn;q=);
    consetRequestProperty(AcceptEncoding aa);
    consetRequestProperty(AcceptCharset ISOutf;q=*;q=);
    consetRequestProperty(KeepAlive );
    consetRequestProperty(Connection keepalive);
    consetRequestProperty(IfModifiedSince Fri Jan :: GMT);
    consetRequestProperty(IfNoneMatch \ddfd\);
    consetRequestProperty(CacheControl maxage=);
    consetRequestProperty(Referer );
}

  另外為了避免線程因為網絡原因而阻塞設置了ConnectTimeout和ReadTimeout代碼⑤⑥處setConnectTimeout設置的連接的超時時間而setReadTimeout設置的是讀取數據的超時時間發生超時的話就會拋出socketTimeout異常兩個方法的參數都是超時的毫秒數

  這裡對超時的發生采用的是等候一段時間重新連接的方法整個獲取網絡連接並讀取下載數據的過程都包含在一個循環之中(代碼③處)如果發生了連接或者讀取數據的超時在拋出的異常裡面就會sleep一定的時間(代碼⑩處)然後continue再次嘗試獲取連接並讀取數據這個時間可以通過setSleepSeconds()方法來設置我們在迅雷等下載工具的使用中經常可以看到狀態欄會輸出類似連接超時等待*秒後重試的話這個就是通過ConnectTimeoutReadTimeout來實現的

  連接建立好之後我們要檢查一下返回響應的狀態碼常見的Http Response Code有以下幾種

  a) OK 一切正常對GET和POST請求的應答文檔跟在後面

  b) Partial Content 客戶發送了一個帶有Range頭的GET請求服務器完成

  c) Not Found 無法找到指定位置的資源這也是一個常用的應答

  d) Request URI Too Long URI太長

  e) Requested Range Not Satisfiable 服務器不能滿足客戶在請求中指定的Range頭

  f) Internal Server Error 服務器遇到了意料不到的情況不能完成客戶的請求

  g) Service Unavailable 服務器由於維護或者負載過重未能應答例如Servlet可能在數據庫連接池已滿的情況下返回

  在這些狀態裡面只有才是我們需要的正確的狀態所以在代碼⑥處進行了狀態碼的判斷如果返回不符合要求的狀態碼則結束線程返回主線程並提示報錯

  假設一切正常下面我們就要考慮從網絡中讀數據了正如我之前在分析mysql的數據庫驅動中看的一樣網絡中發送數據都是以數據包的形式來發送的也就是說不管是客戶端向服務器發出的請求數據還是從服務器返回給客戶端的響應數據都會被拆分成若干個小型數據包在網絡中傳遞等數據包到達了目的地網絡接口會依據數據包的編號來組裝它們成為完整的比特數據因此我們可以想到在這裡也是一樣的我們用inputStream的read方法來通過網卡從網絡中讀取數據並不一定一次就能把所有的數據包都讀完所以我們要不斷的循環來從inputStream中讀取數據Read方法有一個int型的返回值表示每次從inputStream中讀取的字節數如果把這個inputStream中的數據讀完了那麼就返回 Read方法最多可以有三個參數byte b[]是讀取數據之後存放的目標數組off標識了目標數組中存儲的開始位置len是想要讀取的數據長度這個長度必定不能大於b[]的長度

  public synchronized int read(byte b[] int off int len)

  我們的目標是要把目標地址的內容下載下來現在分了個線程來分段下載那麼這些分段下載的數據保存在哪裡呢?如果把它們都保存在內存中是非常糟糕的做法如果文件相當之大例如是一個視頻的話難道把這麼大的數據都放在內存中嗎這樣的話萬一連接中斷那前面下載的東西就都沒有了?我們當然要想辦法及時的把下載的數據刷到磁盤上保存下來當用bt下載視頻的時候通常都會有個臨時文件當視頻完全下載結束之後這個臨時文件就會被刪除那麼下次繼續下載的時候就會接著上次下載的點繼續下載所以我們的outputStream就是往這個臨時文件來輸出了

  OutputStream的write方法和上面InputStream的read方法有類似的參數byte b[]是輸出數據的來源off標識了開始位置len是數據長度

  public synchronized void write(byte b[] int off int len) throws IOException在往臨時文件的outputStream中寫數據的時候我會加上一個計數器每滿個比特就往文件中flush一下(代碼⑦處)

  對於輸出流的flush有些要注意的地方在程序中有三個地方調用了outputStreamflush()第一個是在循環的讀取網絡數據並往outputStream中寫入的時候每滿個byte就flush一下(代碼⑦處)第二個是循環之後(代碼⑧處)這時候正常的讀取寫入操作已經完成但是outputStream中還有沒有刷入磁盤的數據所以要flush一下才能關閉連接第三個就是在異常中的flush(代碼⑨處)因為如果發生了連接超時或者讀取數據超時的話就會直接跑到catch的exception中去這個時候outputStream中的數據如果不flush的話重新連接的時候這部分數據就會丟失了另外當拋出異常重新連接的時候下載的起始位置也要重新設置(代碼④處)count就是用來標識已經下載的字節數的把count+startPosition就是新一次連接需要的下載起始位置了

  現在每個分段的下載線程都順利結束了也都創建了相應的臨時文件接下來在主線程中會對臨時文件進行合並並寫入目標文件最後刪除臨時文件這部分很簡單就是一個對所有下載線程進行遍歷的過程這裡outputStream也有兩次flush與上面類似不再贅述

  /**author by */
private void tempFileToTargetFile(ChildThread[] childThreads) {
    try {
        BufferedOutputStream outputStream = new BufferedOutputStream(
                new FileOutputStream(fileDir + fileName));

  // 遍歷所有子線程創建的臨時文件按順序把下載內容寫入目標文件中
        for (int i = ; i < threadNum; i++) {
            if (statusError) {
                for (int k = ; k < threadNum; k++) {
                    if (childThreads[k]tempFilelength() == )
                        childThreads[k]tempFiledelete();
                }
                Systemoutprintln(本次下載任務不成功請重新設置線程數);
                break;
            }

  BufferedInputStream inputStream = new BufferedInputStream(
                    new FileInputStream(childThreads[i]tempFile));
            Systemoutprintln(Now is file + childThreads[i]id);
            int len = ;
            int count = ;
            byte[] b = new byte[];
            while ((len = inputStreamread(b)) != ) {
                count += len;
                outputStreamwrite(b len);
                if ((count % ) == ) {
                    outputStreamflush();
                }

  // b = new byte[];
            }

  inputStreamclose();
            // 刪除臨時文件
            if (childThreads[i]status == ChildThreadSTATUS_HAS_FINISHED) {
                childThreads[i]tempFiledelete();
            }
        }

  outputStreamflush();
        outputStreamclose();
    } catch (FileNotFoundException e) {
        eprintStackTrace();
    } catch (IOException e) {
        eprintStackTrace();
    }
}

    最後說說斷點續傳前面為了實現斷點續傳在每個下載線程中都創建了一個臨時文件現在我們就要利用這個臨時文件來設置斷點的位置由於臨時文件的命名方式都是固定的所以我們就專門找對應下載的目標文件的臨時文件臨時文件中已經下載的字節數就是我們需要的斷點位置startPos是一個數組存放了每個線程的已下載的字節數

  //第一步分析已下載的臨時文件設置斷點如果是新的下載任務則建立目標文件

  private long[] setThreadBreakpoint(String fileDir String fileName
        long contentLength long[] startPos) {
    File file = new File(fileDir + fileName);
    long localFileSize = filelength();

  if (fileexists()) {
        Systemoutprintln(file + fileName + has exists!);
        // 下載的目標文件已存在判斷目標文件是否完整
        if (localFileSize < contentLength) {
            Systemoutprintln(Now download continue  );

  // 遍歷目標文件的所有臨時文件設置斷點的位置即每個臨時文件的長度
            File tempFileDir = new File(fileDir);
            File[] files = tempFileDirlistFiles();
            for (int k = ; k < fileslength; k++) {
                String tempFileName = files[k]getName();
                // 臨時文件的命名方式為目標文件名+_+編號
                if (tempFileName != null && files[k]length() >
                        && tempFileNamestartsWith(fileName + _)) {
                    int fileLongNum = IntegerparseInt(tempFileName
                            substring(tempFileNamelastIndexOf(_) +
                                    tempFileNamelastIndexOf(_) + ));
                    // 為每個線程設置已下載的位置
                    startPos[fileLongNum] = files[k]length();
                }
            }
        }
    } else {
        // 如果下載的目標文件不存在則創建新文件
        try {
            filecreateNewFile();
        } catch (IOException e) {
            eprintStackTrace();
        }
    }

  return startPos;
}

    測試
 public class DownloadStartup {
    private static final String encoding = utf;
    public static void main(String[] args) {
        DownloadTask downloadManager = new DownloadTask();
        String urlStr = toolszip;
        downloadManagersetSleepSeconds();
        downloadManagerdownload(urlStr encoding);
    }
}


From:http://tw.wingwit.com/Article/program/Java/hx/201311/26073.html
    推薦文章
    Copyright © 2005-2013 電腦知識網 Computer Knowledge   All rights reserved.