熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> Java編程 >> Java高級技術 >> 正文

用Java多線程實現無阻塞讀取遠程文件

2013-11-23 19:53:14  來源: Java高級技術 

  我是不怎麼贊同使用Java多線程下載的，加之有的鏈接下載速度本身就比較快，所以在下載速度足夠的情況下，就讓下載線程退出，直到只剩下一個下載線程。當然，多線程中令人頭痛的死鎖問題、HttpURLConnection的超時阻塞問題，都會使代碼看起來異常複雜。

  簡要介紹一下使用Java多線程實現無阻塞讀取遠程文件的方法：將緩衝區buf[]分為16塊，每塊32K；下載線程負責向緩衝區寫數據，每次寫一塊；讀線程（BuffRandAcceURL類）每次讀小於32K的任意字節。同步描述：寫/寫互斥等待空閒塊；寫/寫並發填寫buf[]；讀/寫並發使用buf[]。

  經過我很長一段時間的使用，我認為比較滿意地實現了我的目標。同其它MP3播放器對比，我的這種方法能夠比較流暢、穩定地下載並播放。我把實現多線程下載緩衝的方法寫出來，不足之處懇請批評指正。

  一、HttpReader類。功能：以HTTP協議從指定URL讀取數據。

  /** *//**

  * author by

  */

  package instream;

  import java.io.IOException;

  import java.io.InputStream;

  import java.net.HttpURLConnection;

  import java.net.URL;

  /**
   * Reads a remote file over HTTP, starting at any byte offset by means of a {@code Range}
   * request header.
   *
   * <p>NOTE(review): this listing was recovered from a web page whose extraction dropped digits,
   * dots, commas, minus signs and quotes. Calls and operators are restored from context; every
   * numeric constant marked "assumed" could not be recovered and must be confirmed.
   *
   * <p>Instances are not synchronized. The content length is cached in a static field, so it is
   * probed only until one constructor call succeeds and is then shared by all instances.
   */
  public final class HttpReader {

    /** Maximum number of attempts before giving up (assumed value; original digits lost). */
    public static final int MAX_RETRY = 10;

    /** Default connect/read timeout in milliseconds (assumed value; original digits lost). */
    private static final int DEFAULT_TIMEOUT_MS = 5000;

    /** Pause before reporting a non-2xx response, in milliseconds (assumed value). */
    private static final int ERROR_BACKOFF_MS = 500;

    /** Content length reported by the first successful probe; 0 means "not known yet". */
    private static long content_length;

    private URL url;

    private HttpURLConnection httpConnection;

    private InputStream in_stream;

    /** Offset of the next byte the open stream delivers; lets seek() skip a reconnect. */
    private long cur_pos;

    private int connect_timeout;

    private int read_timeout;

    /**
     * Creates a reader with the default connect and read timeouts.
     *
     * @param u the HTTP URL to read from
     */
    public HttpReader(URL u) {
      this(u, DEFAULT_TIMEOUT_MS, DEFAULT_TIMEOUT_MS);
    }

    /**
     * Creates a reader and, if the content length is not cached yet, probes it by opening the
     * URL at offset 0, retrying up to {@link #MAX_RETRY} times. Failures are not reported here:
     * callers detect them through {@link #getContentLength()} still returning 0.
     *
     * @param u the HTTP URL to read from
     * @param connect_timeout connect timeout in milliseconds
     * @param read_timeout read timeout in milliseconds
     */
    public HttpReader(URL u, int connect_timeout, int read_timeout) {
      this.connect_timeout = connect_timeout;
      this.read_timeout = read_timeout;
      url = u;
      if (content_length == 0) {
        int retry = 0;
        while (retry < HttpReader.MAX_RETRY) {
          try {
            this.seek(0);
            content_length = httpConnection.getContentLength();
            break;
          } catch (Exception e) {
            // Deliberately best-effort: any failure (I/O error, null URL, bad timeout value)
            // just consumes one attempt; the caller checks getContentLength() afterwards.
            retry++;
          }
        }
      }
    }

    /**
     * Returns the cached content length.
     *
     * @return the length reported by the server, or 0 if no probe has succeeded yet
     */
    public static long getContentLength() {
      return content_length;
    }

    /**
     * Reads up to {@code len} bytes from the current position.
     *
     * @param b destination array
     * @param off start offset in {@code b}
     * @param len maximum number of bytes to read
     * @return the number of bytes read, or -1 at end of stream
     * @throws IOException if the stream is not open or the read fails
     */
    public int read(byte[] b, int off, int len) throws IOException {
      int r = requireStream().read(b, off, len);
      if (r > 0) {
        // Only advance on real data; adding the -1 end-of-stream marker would corrupt the
        // position that seek() compares against.
        cur_pos += r;
      }
      return r;
    }

    /**
     * Reads exactly {@code len} bytes, looping over short reads.
     *
     * @param b destination array
     * @param off start offset in {@code b}
     * @param len number of bytes to read
     * @return {@code len}, or -1 if the stream ended before {@code len} bytes were read
     * @throws IOException if the stream is not open or a read fails
     */
    public int getData(byte[] b, int off, int len) throws IOException {
      InputStream in = requireStream();
      int r;
      int rema = len;
      while (rema > 0) {
        if ((r = in.read(b, off, rema)) == -1) {
          return -1;
        }
        rema -= r;
        off += r;
        cur_pos += r;
      }
      return len;
    }

    /** Disconnects and releases the stream; the reader cannot be used afterwards. */
    public void close() {
      if (httpConnection != null) {
        httpConnection.disconnect();
        httpConnection = null;
      }
      if (in_stream != null) {
        try {
          in_stream.close();
        } catch (IOException ignored) {
          // The connection is already torn down; nothing useful can be done here.
        }
        in_stream = null;
      }
      url = null;
    }

    /**
     * Positions the reader at {@code start_pos}. If the open stream is already there this is a
     * no-op; otherwise the connection is re-opened with a {@code Range: bytes=start_pos-}
     * header.
     *
     * <p>A response code outside 2xx is reported as an {@code IOException} after a short pause
     * so that the caller can retry: such codes may have a temporary cause, for example the
     * server rejecting frequent connection requests from the same IP.
     *
     * @param start_pos byte offset, relative to the start of the file
     * @throws IOException if the reader is closed, the URL is not HTTP, the connection fails,
     *     or the server answers with a non-2xx status
     */
    public void seek(long start_pos) throws IOException {
      if (start_pos == cur_pos && in_stream != null) {
        return;
      }
      if (url == null) {
        throw new IOException("HttpReader is closed");
      }
      if (httpConnection != null) {
        httpConnection.disconnect();
        httpConnection = null;
      }
      if (in_stream != null) {
        in_stream.close();
        in_stream = null;
      }

      java.net.URLConnection conn = url.openConnection();
      if (!(conn instanceof HttpURLConnection)) {
        throw new IOException("Not an HTTP URL: " + url);
      }
      httpConnection = (HttpURLConnection) conn;
      httpConnection.setConnectTimeout(connect_timeout);
      httpConnection.setReadTimeout(read_timeout);
      String sProperty = "bytes=" + start_pos + "-";
      httpConnection.setRequestProperty("Range", sProperty);

      // NOTE(review): a plain 200 answer to a ranged request means the server ignored Range
      // and is sending the file from offset 0; that case is not detected here.
      int responseCode = httpConnection.getResponseCode();
      if (responseCode < 200 || responseCode >= 300) {
        try {
          Thread.sleep(ERROR_BACKOFF_MS);
        } catch (InterruptedException e) {
          // Keep the interrupt visible to the owning thread instead of swallowing it.
          Thread.currentThread().interrupt();
        }
        throw new IOException("HTTP responseCode=" + responseCode);
      }

      in_stream = httpConnection.getInputStream();
      cur_pos = start_pos;
    }

    /** Returns the open stream, or fails with an IOException when seek() has not succeeded. */
    private InputStream requireStream() throws IOException {
      if (in_stream == null) {
        throw new IOException("Stream is not open; call seek() first");
      }
      return in_stream;
    }

  }

  二、IWriterCallBack接口。功能：實現讀/寫通信。

  package instream;

  public interface IWriterCallBack {

  public boolean tryWriting(Writer w) throws InterruptedException;

  public void updateBuffer(int i int len);

  public void updateWriterCount();

  public void terminateWriters();

  }

  三、Writer類。下載線程，負責向buf[]寫數據。

  /** *//**

  *

  */

  package instream;

  import java.io.IOException;

  import java.net.URL;

  public final class Writer implements Runnable {

  private static boolean isalive = true;

  private byte[] buf;

  private IWriterCallBack icb;

  protected int index;            //buf[]內索引號

  protected long start_pos;       //index對應的文件位置(相對於文件首的偏移量)

  protected int await_count;      //用於判斷:下載速度足夠就退出一個線程

  private HttpReader hr;

  public Writer(IWriterCallBack call_back URL u byte[] b int i) {

  hr = new HttpReader(u);

  if(HttpReadergetContentLength() == )  //實例化HttpReader對象都不成功

  return;

  icb = call_back;

  buf = b;

  Thread t = new Thread(thisdt_+i);

  tsetPriority(ThreadNORM_PRIORITY + );

  tstart();

  }

  public void run() {

  int write_bytes= write_pos= rema = retry = ;

  boolean cont = true;

  while (cont) {

  try {

  // 等待空閒塊

  if(retry == ) {

  if (icbtryWriting(this) == false)

  break;

  write_bytes = ;

  rema = BuffRandAcceURLUNIT_LENGTH;

  write_pos = index << BuffRandAcceURLUNIT_LENGTH_BITS;

  }

  // 定位

  hrseek(start_pos);

  // 下載一塊

  int w;

  while (rema > && isalive) {

  w = (rema < ) ? rema : ; //每次讀幾K合適?

  if ((w = hrread(buf write_pos w)) == ) {

  cont = false;

  break;

  }

  rema = w;

  write_pos += w;

  start_pos += w;

  write_bytes += w;

  }

  //通知線程

  retry = ;

  icbupdateBuffer(index write_bytes);

  } catch (InterruptedException e) {

  isalive = false;

  icbterminateWriters();

  break;

  } catch (IOException e) {

  if(++retry == HttpReaderMAX_RETRY) {

  isalive = false;

  icbterminateWriters();

  break;

  }

  }

  }

  icbupdateWriterCount();

  try {

  hrclose();

  } catch (Exception e) {}

  hr = null;

  buf = null;

  icb = null;

  }

  }

  四、IRandomAccess接口

  無阻塞讀取遠程文件中需要隨機讀取文件接口，BuffRandAcceURL類和BuffRandAcceFile類實現接口方法。BuffRandAcceFile類實現讀取本地磁盤文件，這兒就不給出其源碼了。

  package instream;

  /** Random-access read interface over a file-like source. */
  public interface IRandomAccess {

    /**
     * Reads one byte.
     *
     * @return the byte read
     * @throws Exception if the read fails
     */
    public int read() throws Exception;

    /**
     * Reads bytes into {@code b}.
     *
     * @param b destination array
     * @return the number of bytes read
     * @throws Exception if the read fails
     */
    public int read(byte b[]) throws Exception;

    /**
     * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
     *
     * @param b destination array
     * @param off start offset in {@code b}
     * @param len maximum number of bytes to read
     * @return the number of bytes read
     * @throws Exception if the read fails
     */
    public int read(byte b[], int off, int len) throws Exception;

    /**
     * Copies {@code len} bytes into {@code b}, taken {@code src_off} bytes from the current
     * position.
     *
     * @param src_off offset from the current read position
     * @param b destination array
     * @param dst_off start offset in {@code b}
     * @param len number of bytes to copy
     * @return the number of bytes copied
     * @throws Exception if the copy fails
     */
    public int dump(int src_off, byte b[], int dst_off, int len) throws Exception;

    /**
     * Moves the read position.
     *
     * @param pos the new position
     * @throws Exception if positioning fails
     */
    public void seek(long pos) throws Exception;

    /** @return the total length of the source */
    public long length();

    /** @return the current read position */
    public long getFilePointer();

    /** Releases the source. */
    public void close();

  }

  五、BuffRandAcceURL類。功能：創建下載線程；read方法從buf[]讀數據。

  關鍵是如何簡單有效防止死鎖？以下只是我的一次嘗試，請指正。

  /** *//**

  * ;

  */

  package instream;

  import java.net.URL;

  import java.net.URLDecoder;

  import decodeHeader;

  import tagMPTag;

  import tagTagThread;

  public final class BuffRandAcceURL implements IRandomAccess IWriterCallBack {

  public static final int UNIT_LENGTH_BITS = ;                  //K

  public static final int UNIT_LENGTH = << UNIT_LENGTH_BITS;

  public static final int BUF_LENGTH = UNIT_LENGTH << ;            //

  public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS;

  public static final int BUF_LENGTH_MASK = (BUF_LENGTH );

  private static final int MAX_WRITER = ;

  private static long file_pointer;

  private static int read_pos;

  private static int fill_bytes;

  private static byte[] buf;      //同時也作讀寫同步鎖:bufwait()/bufnotify()

  private static int[] buf_bytes;

  private static int buf_index;

  private static int alloc_pos;

  private static URL url = null;

  private static boolean isalive = true;

  private static int writer_count;

  private static int await_count;

  private long file_length;

  private long frame_bytes;

  public BuffRandAcceURL(String sURL) throws Exception {

  this(sURLMAX_WRITER);

  }

  public BuffRandAcceURL(String sURL int download_threads) throws Exception {

  buf = new byte[BUF_LENGTH];

  buf_bytes = new int[UNIT_COUNT];

  url = new URL(sURL);

  //創建線程以異步方式解析ID

  new TagThread(url);

  //打印當前文件名

  try {

  String s = URLDecoderdecode(sURL GBK);

  Systemoutprintln(start>> + ssubstring(slastIndexOf(/) + ));

  s = null;

  } catch (Exception e) {

  Systemoutprintln(start>> + sURL);

  }

  //創建線程

  for(int i = ; i < download_threads; i++)

  new Writer(this url buf i+);

  frame_bytes = file_length = HttpReadergetContentLength();

  if(file_length == ) {

  HeaderstrLastErr = 連接URL出錯重試 + HttpReaderMAX_RETRY + 次後放棄;

  throw new Exception(retry + HttpReaderMAX_RETRY);

  }

  writer_count = download_threads;

  //緩沖

  try_cache();

  //跳過ID v

  MPTag mPTag = new MPTag();

  int v_size = mPTagcheckIDV(buf);

  if (v_size > ) {

  frame_bytes = v_size;

  //seek(v_size):

  fill_bytes = v_size;

  file_pointer = v_size;

  read_pos = v_size;

  read_pos &= BUF_LENGTH_MASK;

  int units = v_size >> UNIT_LENGTH_BITS;

  for(int i = ; i < units; i++) {

  buf_bytes[i] = ;

  thisnotifyWriter();

  }

  buf_bytes[units] = v_size;

  thisnotifyWriter();

  }

  mPTag = null;

  }

  private void try_cache() throws InterruptedException {

  int cache_size = BUF_LENGTH;

  if(cache_size > (int)file_length alloc_pos)

  cache_size = (int)file_length alloc_pos;

  cache_size = UNIT_LENGTH;

  //等待填寫當前正在讀的那一塊緩沖區

  /**//*if(fill_bytes >= cache_size && writer_count > ) {

  synchronized (buf) {

  bufwait();

  }

  return;

  }*/

  //等待填滿緩沖區

  while (fill_bytes < cache_size) {

  if (writer_count == || isalive == false)

  return;

  if(BUF_LENGTH > (int)file_length alloc_pos)

  cache_size = (int)file_length alloc_pos UNIT_LENGTH;

  Systemoutprintf(\r[緩沖%$f%%] (float)fill_bytes / cache_size * );

  synchronized (buf) {

  bufwait();

  }

  }

  Systemoutprintf(\r);

  }

  private int try_reading(int i int len) throws Exception {

  int n = (i == UNIT_COUNT ) ? : (i + );

  int r = (buf_bytes[i] == ) ? : (buf_bytes[i] + buf_bytes[n]);

  while (r < len) {

  if (writer_count == || isalive == false)

  return r;

  try_cache();

  r = (buf_bytes[i] == ) ? : (buf_bytes[i] + buf_bytes[n]);

  }

  return len;

  }

  /**//*

  * 各個線程互斥等待空閒塊

  */

  public synchronized boolean tryWriting(Writer w) throws InterruptedException {

  await_count++;

  while (buf_bytes[buf_index] != && isalive) {

  thiswait();

  }

  //下載速度足夠就結束一個線程

  if(writer_count > && wawait_count >= await_count &&

  wawait_count >= writer_count)

  return false;

  if(alloc_pos >= file_length)

  return false;

  wawait_count = await_count;

  await_count;

  wstart_pos = alloc_pos;

  windex = buf_index;

  alloc_pos += UNIT_LENGTH;

  buf_index = (buf_index == UNIT_COUNT ) ? : buf_index + ;

  return isalive;

  }

  public void updateBuffer(int i int len) {

  synchronized (buf) {

  buf_bytes[i] = len;

  fill_bytes += len;

  bufnotify();

  }

  }

  public void updateWriterCount() {

  synchronized (buf) {

  writer_count;

  bufnotify();

  }

  }

  public synchronized void notifyWriter() {

  thisnotifyAll();

  }

  public void terminateWriters() {

  synchronized (buf) {

  if (isalive) {

  isalive = false;

  HeaderstrLastErr = 讀取文件超時重試 + HttpReaderMAX_RETRY

  + 次後放棄請您稍後再試;

  }

  bufnotify();

  }

  notifyWriter();

  }

  public int read() throws Exception {

  int iret = ;

  int i = read_pos >> UNIT_LENGTH_BITS;

  // 等待字節可讀

  while (buf_bytes[i] < ) {

  try_cache();

  if (writer_count == )

  return ;

  }

  if(isalive == false)

  return ;

  // 讀取

  iret = buf[read_pos] & xff;

  fill_bytes;

  file_pointer++;

  read_pos++;

  read_pos &= BUF_LENGTH_MASK;

  if (buf_bytes[i] == )

  notifyWriter();     // 通知

  return iret;

  }

  public int read(byte b[]) throws Exception {

  return read(b blength);

  }

  public int read(byte[] b int off int len) throws Exception {

  if(len > UNIT_LENGTH)

  len = UNIT_LENGTH;

  int i = read_pos >> UNIT_LENGTH_BITS;

  // 等待有足夠內容可讀

  if(try_reading(i len) < len || isalive == false)

  return ;

  // 讀取

  int tail_len = BUF_LENGTH read_pos; // write_pos != BUF_LENGTH

  if (tail_len < len) {

  Systemarraycopy(buf read_pos b off tail_len);

  Systemarraycopy(buf b off + tail_len len tail_len);

  } else

  Systemarraycopy(buf read_pos b off len);

  fill_bytes = len;

  file_pointer += len;

  read_pos += len;

  read_pos &= BUF_LENGTH_MASK;

  buf_bytes[i] = len;

  if (buf_bytes[i] < ) {

  int ni = read_pos >> UNIT_LENGTH_BITS;

  buf_bytes[ni] += buf_bytes[i];

  buf_bytes[i] = ;

  notifyWriter();

  } else if (buf_bytes[i] == )

  notifyWriter();

  return len;

  }

  /**//*

  * 從src_off位置復制不移動文件指針

  */

  public int dump(int src_off byte b[] int dst_off int len) throws Exception {

  int rpos = read_pos + src_off;

  if(try_reading(rpos >> UNIT_LENGTH_BITS len) < len || isalive == false)

  return ;

  int tail_len = BUF_LENGTH rpos;

  if (tail_len < len) {

  Systemarraycopy(buf rpos b dst_off tail_len);

  Systemarraycopy(buf b dst_off + tail_len len tail_len);

  } else

  Systemarraycopy(buf rpos b dst_off len);

  // 不發信號

  return len;

  }

  public long length() {

  return file_length;

  }

  public long getFilePointer() {

  return file_pointer;

  }

  public void close() {

  //

  }

  //

  public void seek(long pos) throws Exception {

  //

  }

  }


From:http://tw.wingwit.com/Article/program/Java/gj/201311/27619.html
    推薦文章
    Copyright © 2005-2013 電腦知識網 Computer Knowledge   All rights reserved.