熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> Java編程 >> Java核心技術 >> 正文

Java Socket通信技術收發線程互斥的解決方法

2022-06-13   來源: Java核心技術 

  Java Socket通信技術在很長的時間裡都在使用在不少的程序員眼中都有很多高的評價那麼下面我們就看看如何才能掌握這門復雜的編程語言希望大家在今後的Java <> Socket通信技術使用中有所收獲

  下面就是Java Socket通信技術在解決收發線程互斥的代碼介紹

  package combillsvr;

  import javaioIOException;

  import javaioInputStream;

  import javaioOutputStream;

  import InetSocketAddress;

  import Socket;

  import SocketException;

  import SocketTimeoutException;

  import javatextSimpleDateFormat;

  import javautilDate;

  import javautilProperties;

  import javautilTimer;

  import javautilTimerTask;

  import ncurrentConcurrentHashMap;

  import ncurrentTimeUnit;

  import ncurrentlocksCondition;

  import ncurrentlocksReentrantLock;

  import orgapachelogjLogger;

  /**

  *<p>title: socket通信包裝類</p>

  *<p>Description: </p>

  *<p>CopyRight: CopyRight (c) </p>

  *<p>Company: </p>

  *<p>Create date: </P>

  *author sunnylocus<A mailto:>

  </A> * v 初類

  * v 對命令收發邏輯及收發線程互斥機制進行了優化

  處理命令速度由原來~個/秒提高到~個/秒

  */ public class SocketConnection {

  private volatile Socket socket;

  private int timeout = *; //超時時間初始值

  private boolean isLaunchHeartcheck = false;//是否已啟動心跳檢測

  private boolean isNetworkConnect = false; //網絡是否已連接

  private static String host = ;

  private static int port;

  static InputStream inStream = null;

  static OutputStream outStream = null;

  private static Logger log =LoggergetLogger

  (SocketConnectionclass);

  private static SocketConnection socketConnection = null;

  private static javautilTimer heartTimer=null;

  //private final Map<String Object> recMsgMap= Collections

  synchronizedMap(new HashMap<String Object>());

  private final ConcurrentHashMap<String Object> recMsgMap

  = new ConcurrentHashMap<String Object>();

  private static Thread receiveThread = null;

  private final ReentrantLock lock = new ReentrantLock();

  private SocketConnection(){

  Properties conf = new Properties();

  try {

  nfload(SocketConnectionclassgetResourceAsStream

  (nf));

  thistimeout = IntegervalueOf(confgetProperty(timeout));

  init(confgetProperty(ip)IntegervalueOf

  (confgetProperty(port)));

  } catch(IOException e) {

  logfatal(socket初始化異常!e);

  throw new RuntimeException(socket初始化異常請檢查配置參數);

  }

  }

  /**

  * 單態模式

  */

  public static SocketConnection getInstance() {

  if(socketConnection==null) {

  synchronized(SocketConnectionclass) {

  if(socketConnection==null) {

  socketConnection = new SocketConnection();

  return socketConnection;

  }

  }

  }

  return socketConnection;

  }

  private void init(String hostint port) throws IOException {

  InetSocketAddress addr = new InetSocketAddress(hostport);

  socket = new Socket();

  synchronized (this) {

  (【准備與+addr+建立連接】);

  nnect(addr timeout);

  (【與+addr+連接已建立】);

  inStream = socketgetInputStream();

  outStream = socketgetOutputStream();

  socketsetTcpNoDelay(true);//數據不作緩沖立即發送

  socketsetSoLinger(true );//socket關閉時立即釋放資源

  socketsetKeepAlive(true);

  socketsetTrafficClass(x|x);//高可靠性和最小延遲傳輸

  isNetworkConnect=true;

  receiveThread = new Thread(new ReceiveWorker());

  receiveThreadstart();

  SocketConnectionhost=host;

  SocketConnectionport=port;

  if(!isLaunchHeartcheck)

  launchHeartcheck();

  }

  }

  /**

  * 心跳包檢測

  */

  private void launchHeartcheck() {

  if(socket == null)

  throw new IllegalStateException(socket is not

  established!);

  heartTimer = new Timer();

  isLaunchHeartcheck = true;

  heartTimerschedule(new TimerTask() {

  public void run() {

  String msgStreamNo = StreamNoGeneratorgetStreamNo(kq);

  int mstType =;//心跳包請求

  SimpleDateFormat dateformate = new SimpleDateFormat

  (yyyyMMddHHmmss);

  String msgDateTime = dateformateformat(new Date());

  int msgLength =;//消息頭長度

  String commandstr = +msgLength + mstType + msgStreamNo;

  (心跳檢測包 > IVR +commandstr);

  int reconnCounter = ;

  while(true) {

  String responseMsg =null;

  try {

  responseMsg = readReqMsg(commandstr);

  } catch (IOException e) {

  logerror(IO流異常e);

  reconnCounter ++;

  }

  if(responseMsg!=null) {

  (心跳響應包 < IVR +responseMsg);

  reconnCounter = ;

  break;

  } else {

  reconnCounter ++;

  }

  if(reconnCounter >) {//重連次數已達三次判定網絡連接中斷

  重新建立連接連接未被建立時不釋放鎖

  reConnectToCTCC(); break;

  }

  }

  }

  } * ***);

  }

  /**

  * 重連與目標IP建立重連

  */

  private void reConnectToCTCC() {

  new Thread(new Runnable(){

  public void run(){

  (重新建立與+host+:+port+的連接);

  //清理工作中斷計時器中斷接收線程恢復初始變量

  heartTimercancel();

  isLaunchHeartcheck=false;

  isNetworkConnect = false;

  receiveThreadinterrupt();

  try {

  socketclose();

  } catch (IOException e) {logerror(重連時關閉socket連

  接發生IO流異常e);}

  //

  synchronized(this){

  for(; ;){

  try {

  ThreadcurrentThread();

  Threadsleep( * );

  init(hostport);

  thisnotifyAll();

  break ;

  } catch (IOException e) {

  logerror(重新建立連接未成功e);

  } catch (InterruptedException e){

  logerror(重連線程中斷e);

  }

  }

  }

  }

  })start();

  }

  /**

  * 發送命令並接受響應

  * @param requestMsg

  * @return

  * @throws SocketTimeoutException

  * @throws IOException

  */

  public String readReqMsg(String requestMsg) throws IOException {

  if(requestMsg ==null) {

  return null;

  }

  if(!isNetworkConnect) {

  synchronized(this){

  try {

  thiswait(*); //等待如果網絡還沒有恢復拋出IO流異常

  if(!isNetworkConnect) {

  throw new IOException(網絡連接中斷!);

  }

  } catch (InterruptedException e) {

  logerror(發送線程中斷e);

  }

  }

  }

  String msgNo = requestMsgsubstring( + );//讀取流水號

  outStream = socketgetOutputStream();

  outStreamwrite(requestMsggetBytes());

  outStreamflush();

  Condition msglock = locknewCondition(); //消息鎖

  //注冊等待接收消息

  recMsgMapput(msgNo msglock);

  try {

  locklock();

  msglockawait(timeoutTimeUnitMILLISECONDS);

  } catch (InterruptedException e) {

  logerror(發送線程中斷e);

  } finally {

  lockunlock();

  }

  Object respMsg = recMsgMapremove(msgNo); //響應信息

  if(respMsg!=null &&(respMsg != msglock)) {

  //已經接收到消息注銷等待成功返回消息

  return (String) respMsg;

  } else {

  logerror(msgNo+ 超時未收到響應消息);

  throw new SocketTimeoutException(msgNo+ 超時未收到響應消息);

  }

  }

  public void finalize() {

  if (socket != null) {

  try {

  socketclose();

  } catch (IOException e) {

  eprintStackTrace();

  }

  }

  }

  //消息接收線程

  private class ReceiveWorker implements Runnable {

  String intStr= null;

  public void run() {

  while(!Threadinterrupted()){

  try {

  byte[] headBytes = new byte[];

  if(inStreamread(headBytes)==){

  logwarn(讀到流未尾對方已關閉流!);

  reConnectToCTCC();//讀到流未尾對方已關閉流

  return;

  }

  byte[] tmp =new byte[];

  tmp = headBytes;

  String tempStr = new String(tmp)trim();

  if(tempStr==null || tempStrequals()) {

  logerror(received message is null);

  ntinue;

  }

  intStr = new String(tmp);

  int totalLength =IntegerparseInt(intStr);

  //

  byte[] msgBytes = new byte[totalLength];

  inStreamread(msgBytes);

  String resultMsg = new String(headBytes)+ new

  String(msgBytes);

  //抽出消息ID

  String msgNo = resultMsgsubstring( + );

  Condition msglock =(Condition) recMsgMapget(msgNo);

  if(msglock ==null) {

  logwarn(msgNo+序號可能已被注銷!響應消息丟棄);

  recMsgMapremove(msgNo);

  ntinue;

  }

  recMsgMapput(msgNo resultMsg);

  try{

  locklock();

  msglocksignalAll();

  }finally {

  lockunlock();

  }

  }catch(SocketException e){

  logerror(服務端關閉sockete);

  reConnectToCTCC();

  } catch(IOException e) {

  logerror(接收線程讀取響應數據時發生IO流異常e);

  } catch(NumberFormatException e){

  logerror(收到沒良心包String轉int異常異常字符:+intStr);

  }

  }

  }

  }

  }

  以上就是對Java Socket通信技術中收發線程互斥的詳細解決方法希望大家有所領悟


From:http://tw.wingwit.com/Article/program/Java/hx/201311/25550.html
    推薦文章
    Copyright © 2005-2022 電腦知識網 Computer Knowledge   All rights reserved.