閒來沒事本來是在學習nio框架的突然發現對最原始的多線程服務器都不是很了解遂自己寫了個簡單的例子
package testmutithreadserverold;
import javaioIOException;
import ServerSocket;
import Socket;
import testmutithreadserveroldthreadpoolThreadPool;
/**
* 簡單阻塞式多線程服務器(線程池處理)
*
* @author zhangjun
*
*/
public class Server {
private int port;
private ServerSocket serverSocket;
private ThreadPool threadPool;
private PortListenThread listener;
public Server(int port) {
thisport = port;
threadPool = new ThreadPool();
}
public void start() {
try {
serverSocket = new ServerSocket(port);
listener = new PortListenThread();
listenerstart();
} catch (IOException e) {
eprintStackTrace();
}
}
public void shutdown() {
threadPoolshutdown();
listenerfinish();
}
private class PortListenThread extends Thread {
private Boolean finish = false;
@Override
public void run() {
while (!finish) {
try {
final Socket socket = serverSocketaccept();
threadPoolexecute(new Runnable() {
@Override
public void run() {
new TestMessage(socket)execute();
}
});
} catch (IOException e) {
eprintStackTrace();
}
}
}
public void finish() {
finish = true;
}
}
public static void main(String[] args) {
int port = ;
Systemoutprintln(server is listening on port: + port);
new Server(port)start();
}
}
這個Server調用的是自己實現的一個基於任務隊列的簡單線程池
package testmutithreadserveroldthreadpool;
import javautilLinkedList;
/**
* 簡單線程池 (基於工作隊列的同步線程池)
*
* @author zhangjun
*
*/
public class ThreadPool extends ThreadGroup {
private final static String THREADPOOL = thread pool;
private final static String WORKTHREAD = work thread ;
private final static int DEFAULTSIZE = RuntimegetRuntime()
availableProcessors() + ;
private LinkedList<Runnable> taskQueue;
private boolean isPoolClose = false;
public ThreadPool() {
this(DEFAULTSIZE);
}
public ThreadPool(int size) {
super(THREADPOOL);
setDaemon(true);
taskQueue = new LinkedList<Runnable>();
initWorkThread(size);
}
private void initWorkThread(int size) {
for (int i = ; i < size; i++) {
new WorkThread(WORKTHREAD + i)start();
}
try {
Threadsleep( * size);
} catch (InterruptedException e) {
}
}
public synchronized void execute(Runnable task) {
if (isPoolClose) {
throw new IllegalStateException();
}
if (task != null) {
taskQueueadd(task);
notify();
}
}
private synchronized Runnable getTask() throws InterruptedException {
if (taskQueuesize() == ) {
if (isPoolClose) {
return null;
}
wait();
}
if (taskQueuesize() == ) {
return null;
}
return taskQueueremoveFirst();
}
public void shutdown() {
waitFinish();
synchronized (this) {
isPoolClose = true;
interrupt();
taskQueueclear();
}
}
private void waitFinish() {
synchronized (this) {
isPoolClose = true;
notifyAll();
}
Thread[] threads = new Thread[activeCount()];
enumerate(threads);
try {
for (Thread t : threads) {
tjoin();
}
} catch (InterruptedException e) {
//swallow this
}
}
private class WorkThread extends Thread {
public WorkThread(String name) {
super(ThreadPoolthis name);
}
@Override
public void run() {
while (!isInterrupted()) {
Runnable task = null;
try {
task = getTask();
} catch (InterruptedException e) {
//swallow this
}
if (task == null) {
return;
}
try {
taskrun();
} catch (Throwable e) {
eprintStackTrace();
}
}
}
}
}
當然也可以直接使用concurrent的線程池代碼幾乎不用改變
package testncurrent;
import javaioIOException;
import ServerSocket;
import Socket;
import ncurrentExecutorService;
import ncurrentExecutors;
import testmutithreadserveroldTestMessage;
/**
* 簡單阻塞式多線程服務器(線程池處理)
*
* @author zhangjun
*
*/
public class Server {
private int port;
private ServerSocket serverSocket;
private ExecutorService threadPool;
private PortListenThread listener;
public Server(int port) {
thisport = port;
threadPool = ExecutorsnewFixedThreadPool();
}
public void start() {
try {
serverSocket = new ServerSocket(port);
listener = new PortListenThread();
listenerstart();
} catch (IOException e) {
eprintStackTrace();
}
}
public void shutdown() {
threadPoolshutdown();
listenerfinish();
}
private class PortListenThread extends Thread {
private Boolean finish = false;
@Override
public void run() {
while (!finish) {
try {
final Socket socket = serverSocketaccept();
threadPoolexecute(new Runnable() {
@Override
public void run() {
new TestMessage(socket)execute();
}
});
} catch (IOException e) {
eprintStackTrace();
}
}
}
public void finish() {
finish = true;
}
}
public static void main(String[] args) {
int port = ;
Systemoutprintln(server is listening on port: + port);
new Server(port)start();
}
}
裡邊我構造了一個Message接口
package testmutithreadserverold;
/**
* 通用消息接口
*
* @author zhangjun
*
*/
public interface Message {
void execute();
}
以及實現了一個測試消息類
package testmutithreadserverold;
import javaioBufferedReader;
import javaioIOException;
import javaioInputStreamReader;
import javaioPrintWriter;
import Socket;
/**
* 測試消息
*
* @author zhangjun
*
*/
public class TestMessage implements Message {
private Socket socket;
public TestMessage(Socket socket) {
thissocket = socket;
}
@Override
public void execute() {
try {
BufferedReader in = new BufferedReader(new InputStreamReader(socket
getInputStream()));
PrintWriter out = new PrintWriter(socketgetOutputStream() true);
String s;
while ((s = inreadLine()) != null) {
Systemoutprintln(received message: + s);
if (sequals(quit)) {
break;
}
outprintln(hello + s);
}
} catch (IOException e) {
eprintStackTrace();
} finally {
try {
if (!socketisClosed()) {
socketclose();
}
} catch (IOException e) {
}
}
}
}
代碼很簡單就不用多解釋什麼了下一步打算用nio在自己寫個非阻塞的服務器
From:http://tw.wingwit.com/Article/program/Java/gj/201311/27312.html