我是不怎麼贊同使用Java多線程下載的
以下簡要介紹使用Java多線程實現無阻塞讀取遠程文件的方法。
經過我很長一段時間使用
一
/** *//**
* author by
*/
package instream;
import java
import java
import
import
/**
 * Reads data from a remote URL through an HttpURLConnection and tracks the
 * current position of the open stream in cur_pos.
 *
 * NOTE(review): this listing was truncated by extraction (most statements are
 * cut at the first dot, comma or numeric literal). The comments below describe
 * only what is still visible; anything marked as presumed must be confirmed
 * against the original source.
 */
public final class HttpReader {
/* Upper bound used by the retry loop in the constructor (value truncated). */
public static final int MAX_RETRY =
/* Static, so a single value is shared by every HttpReader instance. */
private static long content_length;
private URL url;
private HttpURLConnection httpConnection;
private InputStream in_stream;
private long cur_pos; // used to decide whether seek() has to reposition the file
private int connect_timeout;
private int read_timeout;
/**
 * Convenience constructor that delegates to the constructor below.
 * The remaining delegated arguments are truncated in this listing.
 */
public HttpReader(URL u) {
this(u
}
/**
 * Stores the timeouts and the URL. When the static content_length matches the
 * (truncated) sentinel, a retry loop runs while retry is below a HttpReader
 * constant (presumably MAX_RETRY - confirm): each attempt assigns
 * content_length and breaks on success; any Exception only increments retry.
 * NOTE(review): if every attempt fails, the constructor still returns normally.
 */
public HttpReader(URL u
nnect_timeout = connect_timeout;
this
url = u;
if (content_length ==
int retry =
while (retry < HttpReader
try {
this
content_length = ();
break;
} catch (Exception e) {
retry++;
}
}
}
/** Returns the static content_length field. */
public static long getContentLength() {
return content_length;
}
/**
 * Performs one call on in_stream (truncated, presumably a read into b),
 * adds the result r to cur_pos and returns r.
 * NOTE(review): cur_pos is advanced by r without a visible check for a
 * negative result - confirm against the original.
 */
public int read(byte[] b
int r = in_stream
cur_pos += r;
return r;
}
/**
 * Loops while rema is above a (truncated) bound, reading from in_stream and
 * advancing off and cur_pos by the amount r obtained on each pass.
 * Returns early from inside the loop when the (truncated) condition on r
 * holds; otherwise returns len once the loop finishes.
 */
public int getData(byte[] b
int r
while (rema >
if ((r = in_stream
return
}
rema
off += r;
cur_pos += r;
}
return len;
}
/**
 * Releases the connection and the stream when they are non-null and sets
 * httpConnection, in_stream and url to null. An IOException raised while
 * handling in_stream is caught and ignored (empty catch block).
 */
public void close() {
if (httpConnection != null) {
();
httpConnection = null;
}
if (in_stream != null) {
try {
in_stream
} catch (IOException e) {}
in_stream = null;
}
url = null;
}
/**//*
* Throws an exception to tell the caller to try again.
* Response code (the rest of this original comment is truncated).
*
* Visible behaviour: returns immediately when start_pos equals cur_pos and
* in_stream is non-null. Otherwise any existing connection and stream are
* dropped, a new HttpURLConnection is obtained from url, connect_timeout and
* read_timeout are applied, and responseCode is read. When responseCode is
* below a (truncated) threshold, a Thread call runs (presumably a sleep -
* confirm) and an IOException is thrown. On success in_stream is assigned
* and cur_pos is set to start_pos.
*/
public void seek(long start_pos) throws IOException {
if (start_pos == cur_pos && in_stream != null)
return;
if (httpConnection != null) {
();
httpConnection = null;
}
if (in_stream != null) {
in_stream
in_stream = null;
}
httpConnection = (HttpURLConnection) url
(connect_timeout);
(read_timeout);
String sProperty =
(
//(
int responseCode = ();
if (responseCode <
try {
Thread
} catch (InterruptedException e) {
e
}
throw new IOException(
}
in_stream = ();
cur_pos = start_pos;
}
}
二
package instream;
/**
 * Callback interface that a Writer uses to talk to the owner of the shared
 * buffer.
 * NOTE(review): this listing is truncated by extraction; parameter lists that
 * are cut off below must be confirmed against the original source.
 */
public interface IWriterCallBack {
/**
 * Called by a writer before it starts filling a buffer unit.
 * Presumably a false result tells the writer to stop - confirm against the
 * implementing class.
 */
public boolean tryWriting(Writer w) throws InterruptedException;
/** Reports data written for buffer unit i (remaining parameters truncated). */
public void updateBuffer(int i
/** Notifies the owner that the number of writers has changed. */
public void updateWriterCount();
/** Asks the owner to stop all writers. */
public void terminateWriters();
}
三
/** *//**
*
*/
package instream;
import java
import
/**
 * Runnable download worker: repeatedly obtains a region through its
 * IWriterCallBack and fills it with data read through an HttpReader.
 *
 * NOTE(review): this listing was truncated by extraction (statements are cut
 * at the first dot, comma or numeric literal). Comments describe only what is
 * still visible; presumed details must be confirmed.
 */
public final class Writer implements Runnable {
/* Static flag: cleared by any writer that is interrupted or exhausts its retries. */
private static boolean isalive = true;
private byte[] buf;
private IWriterCallBack icb;
protected int index; // within buf[] (original comment truncated)
protected long start_pos; // file position corresponding to index (offset from the start of the file)
protected int await_count; // used to decide: when the download speed is sufficient, retire one (original comment truncated)
private HttpReader hr;
/**
 * Creates an HttpReader for u. Returns early, leaving icb and buf unset and
 * starting no thread, when the (truncated) check on HttpReader holds;
 * otherwise stores the callback and buffer b and creates a Thread over this
 * writer (the two following calls on t are truncated, presumably
 * configuration and start - confirm).
 */
public Writer(IWriterCallBack call_back
hr = new HttpReader(u);
if(HttpReader
return;
icb = call_back;
buf = b;
Thread t = new Thread(this
t
t
}
/**
 * Worker loop. While cont is true:
 * - when retry equals a (truncated) value, consults icb and leaves the loop
 *   if its (truncated) condition holds, then resets write_bytes and computes
 *   rema and write_pos from index and BuffRandAcceURL constants;
 * - calls hr (truncated, presumably a seek to start_pos - confirm);
 * - reads through hr in an inner loop while rema is above a (truncated)
 *   bound, advancing write_pos, start_pos and write_bytes by w; the
 *   (truncated) condition on w clears cont and ends the inner loop;
 * - resets retry and calls icb (truncated).
 * InterruptedException clears isalive, calls icb and leaves the loop.
 * IOException increments retry; once it equals a HttpReader constant,
 * isalive is cleared, icb is called and the loop ends.
 * After the loop icb is called once more, hr is handled inside a try whose
 * Exception is ignored, and hr, buf and icb are set to null.
 */
public void run() {
int write_bytes=
boolean cont = true;
while (cont) {
try {
//
if(retry ==
if (icb
break;
write_bytes =
rema = BuffRandAcceURL
write_pos = index << BuffRandAcceURL
}
//
hr
//
int w;
while (rema >
w = (rema <
if ((w = hr
cont = false;
break;
}
rema
write_pos += w;
start_pos += w;
write_bytes += w;
}
//
retry =
icb
} catch (InterruptedException e) {
isalive = false;
icb
break;
} catch (IOException e) {
if(++retry == HttpReader
isalive = false;
icb
break;
}
}
}
icb
try {
hr
} catch (Exception e) {}
hr = null;
buf = null;
icb = null;
}
}
四
無阻塞讀取遠程文件時，需要一個隨機讀取文件的接口：
package instream;
/**
 * Random-access read interface.
 * NOTE(review): this listing is truncated by extraction; the cut-off parameter
 * lists and the return-value conventions are not visible here and must be
 * confirmed against the original source.
 */
public interface IRandomAccess {
/** Reads a single value and returns it as an int. */
public int read() throws Exception;
/** Reads into b and returns an int result. */
public int read(byte b[]) throws Exception;
/** Reads into b; the remaining parameters are truncated. */
public int read(byte b[]
/** Takes a source offset src_off; the remaining parameters are truncated. */
public int dump(int src_off
/** Moves to position pos. */
public void seek(long pos) throws Exception;
/** Returns a length as a long. */
public long length();
/** Returns the current file pointer as a long. */
public long getFilePointer();
/** Closes this object. */
public void close();
}
五
關鍵是如何簡單有效地防止死鎖？以下只是我的一次嘗試。
/** *//**
* ;
*/
package instream;
import
import
import decode
import tag
import tag
/**
 * IRandomAccess implementation that reads a URL through a shared byte buffer
 * filled by Writer threads. The buffer is split into UNIT_COUNT units and
 * buf_bytes holds a per-unit byte count. read_pos is masked with
 * BUF_LENGTH_MASK after each advance, so buf appears to be used circularly.
 *
 * NOTE(review): this listing was truncated by extraction (statements are cut
 * at the first dot, comma or numeric literal). Comments describe only what is
 * still visible; presumed details must be confirmed.
 * NOTE(review): most state below is static, so it would be shared by every
 * instance of this class.
 */
public final class BuffRandAcceURL implements IRandomAccess
public static final int UNIT_LENGTH_BITS =
public static final int UNIT_LENGTH =
public static final int BUF_LENGTH = UNIT_LENGTH <<
public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS;
public static final int BUF_LENGTH_MASK = (BUF_LENGTH
private static final int MAX_WRITER =
private static long file_pointer;
private static int read_pos;
private static int fill_bytes;
private static byte[] buf; // also serves as the read/write synchronization lock: buf (original comment truncated)
private static int[] buf_bytes;
private static int buf_index;
private static int alloc_pos;
private static URL url = null;
private static boolean isalive = true;
private static int writer_count;
private static int await_count;
private long file_length;
private long frame_bytes;
/** Convenience constructor that delegates to the constructor below (arguments truncated). */
public BuffRandAcceURL(String sURL) throws Exception {
this(sURL
}
/**
 * Allocates buf and buf_bytes, builds the URL, starts a TagThread, creates
 * Writer objects in a loop, takes file_length and frame_bytes from
 * HttpReader, throws Exception when file_length matches a (truncated)
 * sentinel, sets writer_count to download_threads, calls try_cache and then
 * adjusts fill_bytes, file_pointer, read_pos and buf_bytes by a value v.
 */
public BuffRandAcceURL(String sURL
buf = new byte[BUF_LENGTH];
buf_bytes = new int[UNIT_COUNT];
url = new URL(sURL);
// create a thread that parses the ID asynchronously (original comment truncated)
new TagThread(url);
// print the current file name
try {
String s = URLDecoder
System
s = null;
} catch (Exception e) {
System
}
// create (original comment truncated)
for(int i =
new Writer(this
frame_bytes = file_length = HttpReader
if(file_length ==
Header
throw new Exception(
}
writer_count = download_threads;
// buffering
try_cache();
// skip the ID (original comment truncated)
MP
int v
if (v
frame_bytes
//seek(v
fill_bytes
file_pointer = v
read_pos = v
read_pos &= BUF_LENGTH_MASK;
int units = v
for(int i =
buf_bytes[i] =
this
}
buf_bytes[units]
this
}
mP
}
/**
 * Blocks until fill_bytes reaches cache_size. cache_size starts at BUF_LENGTH
 * and is lowered when it exceeds a (truncated) expression based on
 * file_length. Each pass of the loop returns early when writer_count equals a
 * (truncated) value, otherwise performs a (truncated) call on buf inside
 * synchronized (buf), presumably a wait - confirm.
 */
private void try_cache() throws InterruptedException {
int cache_size = BUF_LENGTH;
if(cache_size > (int)file_length
cache_size = (int)file_length
cache_size
// wait for the part currently being read to be filled (original comment truncated)
/**//*if(fill_bytes >= cache_size && writer_count >
synchronized (buf) {
buf
}
return;
}*/
// wait until the buffer is full
while (fill_bytes < cache_size) {
if (writer_count ==
return;
if(BUF_LENGTH > (int)file_length
cache_size = (int)file_length
System
synchronized (buf) {
buf
}
}
System
}
/**
 * Computes an available amount r for unit i from buf_bytes and loops while r
 * is below len, calling try_cache and recomputing r on each pass. Returns r
 * early when writer_count equals a (truncated) value; returns len otherwise.
 */
private int try_reading(int i
int n = (i == UNIT_COUNT
int r = (buf_bytes[i] ==
while (r < len) {
if (writer_count ==
return r;
try_cache();
r = (buf_bytes[i] ==
}
return len;
}
/**//*
* Each (original comment truncated).
*
* Synchronized on this instance. Increments await_count, then loops while
* buf_bytes[buf_index] differs from a (truncated) value, making a (truncated)
* call on this, presumably a wait - confirm. Returns false when the
* (truncated) writer_count condition holds or when alloc_pos has reached
* file_length. Otherwise updates fields of w, advances alloc_pos by
* UNIT_LENGTH, advances buf_index and returns isalive.
*/
public synchronized boolean tryWriting(Writer w) throws InterruptedException {
await_count++;
while (buf_bytes[buf_index] !=
this
}
// when the download speed is sufficient, end one (original comment truncated)
if(writer_count >
w
return false;
if(alloc_pos >= file_length)
return false;
w
await_count
w
w
alloc_pos += UNIT_LENGTH;
buf_index = (buf_index == UNIT_COUNT
return isalive;
}
/**
 * Inside synchronized (buf): records len as the byte count of unit i, adds
 * len to fill_bytes and makes a (truncated) call on buf, presumably a
 * notification for waiting readers - confirm.
 */
public void updateBuffer(int i
synchronized (buf) {
buf_bytes[i] = len;
fill_bytes += len;
buf
}
}
/**
 * Inside synchronized (buf): updates writer_count (operator truncated) and
 * makes a (truncated) call on buf.
 */
public void updateWriterCount() {
synchronized (buf) {
writer_count
buf
}
}
/** Synchronized on this instance; makes a (truncated) call on this, presumably a notify - confirm. */
public synchronized void notifyWriter() {
this
}
/**
 * Inside synchronized (buf): when isalive is still true, clears it and makes
 * a (truncated) Header call; then makes a (truncated) call on buf. Finally
 * calls notifyWriter outside the synchronized block.
 */
public void terminateWriters() {
synchronized (buf) {
if (isalive) {
isalive = false;
Header
+
}
buf
}
notifyWriter();
}
/**
 * Reads one byte at read_pos. Loops calling try_cache while the byte count of
 * the current unit is below a (truncated) bound, returning early when
 * writer_count equals a (truncated) value; also returns early when isalive is
 * false. Otherwise masks buf[read_pos] into iret, updates fill_bytes,
 * advances file_pointer and read_pos (wrapped by BUF_LENGTH_MASK), calls
 * notifyWriter under a (truncated) condition and returns iret.
 */
public int read() throws Exception {
int iret =
int i = read_pos >> UNIT_LENGTH_BITS;
//
while (buf_bytes[i] <
try_cache();
if (writer_count ==
return
}
if(isalive == false)
return
//
iret = buf[read_pos] &
fill_bytes
file_pointer++;
read_pos++;
read_pos &= BUF_LENGTH_MASK;
if (
notifyWriter(); //
return iret;
}
/** Delegates to the multi-argument read (arguments truncated). */
public int read(byte b[]) throws Exception {
return read(b
}
/**
 * Bulk read. len is capped at UNIT_LENGTH. After try_reading on the current
 * unit (early return on a truncated condition), data is copied with one or
 * two System calls depending on whether tail_len is below len, which splits
 * the copy at the end of buf. Then fill_bytes, file_pointer and read_pos
 * (wrapped by BUF_LENGTH_MASK) are advanced and buf_bytes[i] is updated; when
 * it drops below a (truncated) bound the remainder is folded into the unit
 * that now contains read_pos. notifyWriter is called in both visible
 * branches. Returns len.
 */
public int read(byte[] b
if(len > UNIT_LENGTH)
len = UNIT_LENGTH;
int i = read_pos >> UNIT_LENGTH_BITS;
//
if(try_reading(i
return
//
int tail_len = BUF_LENGTH
if (tail_len < len) {
System
System
} else
System
fill_bytes
file_pointer += len;
read_pos += len;
read_pos &= BUF_LENGTH_MASK;
buf_bytes[i]
if (buf_bytes[i] <
int ni = read_pos >> UNIT_LENGTH_BITS;
buf_bytes[ni] += buf_bytes[i];
buf_bytes[i] =
notifyWriter();
} else if (buf_bytes[i] ==
notifyWriter();
return len;
}
/**//*
* Copy starting at position src_off (original comment truncated).
*
* Visible behaviour: computes rpos as read_pos plus src_off, calls
* try_reading on the unit containing rpos (early return on a truncated
* condition), copies with one or two System calls depending on whether
* tail_len is below len, and returns len. No assignment to read_pos,
* file_pointer or fill_bytes is visible here.
*/
public int dump(int src_off
int rpos = read_pos + src_off;
if(try_reading(rpos >> UNIT_LENGTH_BITS
return
int tail_len = BUF_LENGTH
if (tail_len < len) {
System
System
} else
System
// does not signal
return len;
}
/** Returns file_length. */
public long length() {
return file_length;
}
/** Returns the static file_pointer. */
public long getFilePointer() {
return file_pointer;
}
/** Empty in this listing: the body contains only a comment. */
public void close() {
//
}
//
/** Empty in this listing: the body contains only a comment. */
public void seek(long pos) throws Exception {
//
}
}
From:http://tw.wingwit.com/Article/program/Java/gj/201311/27619.html