熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> Java編程 >> Java高級技術 >> 正文

簡單多線程服務器實現

2022-06-13   來源: Java高級技術 

  閒來沒事,本來是在學習 NIO 框架的,突然發現對最原始的多線程服務器都不是很了解,遂自己寫了個簡單的例子:

   package test.mutithreadserver.old;

   import java.io.IOException;
   import java.net.ServerSocket;
   import java.net.Socket;

   import test.mutithreadserver.old.threadpool.ThreadPool;

  

   /**

    * 簡單阻塞式多線程服務器(線程池處理)

    *

    * @author zhangjun

    *

    */

   public class Server {

  

       private int port;

  

       private ServerSocket serverSocket;

  

       private ThreadPool threadPool;

  

       private PortListenThread listener;

  

       public Server(int port) {

           thisport = port;

           threadPool = new ThreadPool();

       }

  

       public void start() {

           try {

               serverSocket = new ServerSocket(port);

               listener = new PortListenThread();

               listenerstart();

           } catch (IOException e) {

               eprintStackTrace();

           }

       }

  

       public void shutdown() {

           threadPoolshutdown();

           listenerfinish();

       }

  

       private class PortListenThread extends Thread {

  

           private Boolean finish = false;

  

           @Override

           public void run() {

               while (!finish) {

                   try {

                       final Socket socket = serverSocketaccept();

                       threadPoolexecute(new Runnable() {

  

                           @Override

                           public void run() {

                               new TestMessage(socket)execute();

                           }

                       });

                   } catch (IOException e) {

                       eprintStackTrace();

                   }

  

               }

           }

  

           public void finish() {

               finish = true;

           }

  

       }

  

       public static void main(String[] args) {

           int port = ;

           Systemoutprintln(server is listening on port: + port);

           new Server(port)start();

       }

  

   }

  

  這個 Server 調用的是自己實現的一個基於任務隊列的簡單線程池:

   package test.mutithreadserver.old.threadpool;

   import java.util.LinkedList;

  

   /**
    * Simple thread pool: a fixed set of worker threads that take tasks from a
    * shared FIFO queue. The queue and the closed flag are guarded by this
    * object's monitor.
    *
    * @author zhangjun
    */
   public class ThreadPool extends ThreadGroup {

       private final static String THREADPOOL = "thread pool";

       private final static String WORKTHREAD = "work thread ";

       // NOTE(review): the addend is missing from the published listing; 1 is assumed.
       private final static int DEFAULTSIZE =
               Runtime.getRuntime().availableProcessors() + 1;

       private final LinkedList<Runnable> taskQueue;

       // Explicit record of the workers, so shutdown does not have to rely on the
       // activeCount()/enumerate() estimates of the thread group.
       private final Thread[] workers;

       private boolean isPoolClose = false;

       /** Creates a pool with the default number of workers. */
       public ThreadPool() {
           this(DEFAULTSIZE);
       }

       /**
        * Creates a pool and starts its workers.
        *
        * @param size number of worker threads, at least 1
        * @throws IllegalArgumentException if {@code size} is smaller than 1
        */
       public ThreadPool(int size) {
           super(THREADPOOL);
           if (size < 1) {
               throw new IllegalArgumentException("size must be at least 1: " + size);
           }
           setDaemon(true);
           taskQueue = new LinkedList<Runnable>();
           workers = new Thread[size];
           initWorkThread(size);
       }

       /**
        * Creates and starts the workers. No start-up pause is needed: a task
        * queued before a worker runs is found when that worker first checks the
        * queue in {@link #getTask()}.
        */
       private void initWorkThread(int size) {
           for (int i = 0; i < size; i++) {
               workers[i] = new WorkThread(WORKTHREAD + i);
               workers[i].start();
           }
       }

       /**
        * Queues a task for execution by one of the workers. A {@code null} task
        * is ignored.
        *
        * @param task the work to run
        * @throws IllegalStateException if the pool has been shut down
        */
       public synchronized void execute(Runnable task) {
           if (isPoolClose) {
               throw new IllegalStateException();
           }
           if (task != null) {
               taskQueue.add(task);
               notify();
           }
       }

       /**
        * Blocks until a task is available.
        *
        * @return the next task, or {@code null} once the pool is closed and the
        *         queue is empty
        * @throws InterruptedException if the worker is interrupted while waiting
        */
       private synchronized Runnable getTask() throws InterruptedException {
           // Loop, not "if": a woken worker may find the queue empty again because
           // another worker took the task first, or because the wake-up was
           // spurious. Returning null in that case would end the worker for good.
           while (taskQueue.isEmpty()) {
               if (isPoolClose) {
                   return null;
               }
               wait();
           }
           return taskQueue.removeFirst();
       }

       /**
        * Closes the pool: no new tasks are accepted, already queued tasks are run,
        * and the call waits for the workers to end. Afterwards the group is
        * interrupted and whatever is still queued is discarded (only possible when
        * the wait itself was interrupted).
        */
       public void shutdown() {
           waitFinish();
           synchronized (this) {
               isPoolClose = true;
               interrupt();
               taskQueue.clear();
           }
       }

       /** Marks the pool closed, wakes idle workers and joins all workers. */
       private void waitFinish() {
           synchronized (this) {
               isPoolClose = true;
               notifyAll();
           }
           Thread self = Thread.currentThread();
           try {
               for (Thread worker : workers) {
                   // A task may call shutdown() itself; joining the current
                   // thread would never return.
                   if (worker != self) {
                       worker.join();
                   }
               }
           } catch (InterruptedException e) {
               // Keep the interrupt visible to the caller; shutdown() goes on to
               // interrupt the workers instead of waiting any longer.
               self.interrupt();
           }
       }

       /** Worker: runs queued tasks until interrupted or until the pool is drained and closed. */
       private class WorkThread extends Thread {

           public WorkThread(String name) {
               super(ThreadPool.this, name);
           }

           @Override
           public void run() {
               while (!isInterrupted()) {
                   Runnable task;
                   try {
                       task = getTask();
                   } catch (InterruptedException e) {
                       // Interrupted while idle: the pool is being shut down.
                       return;
                   }
                   if (task == null) {
                       return;
                   }
                   try {
                       task.run();
                   } catch (Throwable e) {
                       // A failing task must not take the worker down with it.
                       e.printStackTrace();
                   }
               }
           }

       }

   }

  

  當然,也可以直接使用 java.util.concurrent 的線程池,代碼幾乎不用改變:

   package test.mutithreadserver.concurrent;

   import java.io.IOException;
   import java.net.ServerSocket;
   import java.net.Socket;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   import test.mutithreadserver.old.TestMessage;

  

   /**

    * 簡單阻塞式多線程服務器(線程池處理)

    *

    * @author zhangjun

    *

    */

   public class Server {

  

       private int port;

  

       private ServerSocket serverSocket;

  

       private ExecutorService threadPool;

  

       private PortListenThread listener;

  

       public Server(int port) {

           thisport = port;

           threadPool = ExecutorsnewFixedThreadPool();

       }

  

       public void start() {

           try {

               serverSocket = new ServerSocket(port);

               listener = new PortListenThread();

               listenerstart();

           } catch (IOException e) {

               eprintStackTrace();

           }

       }

  

       public void shutdown() {

           threadPoolshutdown();

           listenerfinish();

       }

  

       private class PortListenThread extends Thread {

  

           private Boolean finish = false;

  

           @Override

           public void run() {

               while (!finish) {

                   try {

                       final Socket socket = serverSocketaccept();

                       threadPoolexecute(new Runnable() {

  

                           @Override

                           public void run() {

                               new TestMessage(socket)execute();

                           }

                       });

                   } catch (IOException e) {

                       eprintStackTrace();

                   }

  

               }

           }

  

           public void finish() {

               finish = true;

           }

  

       }

  

       public static void main(String[] args) {

           int port = ;

           Systemoutprintln(server is listening on port: + port);

           new Server(port)start();

       }

   }

  

  裡邊我構造了一個 Message 接口:

   package test.mutithreadserver.old;

  

   /**
    * General-purpose message contract: a unit of work that is carried out
    * through a single {@link #execute()} call.
    *
    * @author zhangjun
    */
   public interface Message {

       /** Performs the work this message stands for. */
       void execute();

   }

  

  以及實現了一個測試消息類:

   package test.mutithreadserver.old;

   import java.io.BufferedReader;
   import java.io.IOException;
   import java.io.InputStreamReader;
   import java.io.PrintWriter;
   import java.net.Socket;

  

   /**

    * 測試消息

    *

    * @author zhangjun

    *

    */

   public class TestMessage implements Message {

  

       private Socket socket;

  

       public TestMessage(Socket socket) {

           thissocket = socket;

       }

  

       @Override

       public void execute() {

           try {

               BufferedReader in = new BufferedReader(new InputStreamReader(socket

                       getInputStream()));

               PrintWriter out = new PrintWriter(socketgetOutputStream() true);

               String s;

               while ((s = inreadLine()) != null) {

                   Systemoutprintln(received message: + s);

                   if (sequals(quit)) {

                       break;

                   }

                   outprintln(hello + s);

               }

           } catch (IOException e) {

               eprintStackTrace();

           } finally {

               try {

                   if (!socketisClosed()) {

                       socketclose();

                   }

               } catch (IOException e) {

               }

           }

       }

  

   }

  

  代碼很簡單,就不用多解釋什麼了。下一步打算用 NIO 再自己寫個非阻塞的服務器。


From:http://tw.wingwit.com/Article/program/Java/gj/201311/27312.html
    推薦文章
    Copyright © 2005-2022 電腦知識網 Computer Knowledge   All rights reserved.