亚洲国产日韩欧美在线a乱码,国产精品路线1路线2路线,亚洲视频一区,精品国产自,www狠狠,国产情侣激情在线视频免费看,亚洲成年网站在线观看

如何解決Java Socket通信技術(shù)收發(fā)線程互斥

時間:2024-08-28 18:14:33 SUN認證 我要投稿
  • 相關(guān)推薦

如何解決Java Socket通信技術(shù)收發(fā)線程互斥

  Java Socket通信技術(shù)在很長的時間里都在使用,在不少的程序員眼中都有很多高的評價。那么下面我們就看看如何才能掌握這門復(fù)雜的編程語言,希望大家在今后的Java Socket通信技術(shù)使用中有所收獲。

  下面就是Java Socket通信技術(shù)在解決收發(fā)線程互斥的代碼介紹。

  1.package com.bill99.svr;

  2.import java.io.IOException;

  3.import java.io.InputStream;

  4.import java.io.OutputStream;

  5.import java.net.InetSocketAddress;

  6.import java.net.Socket;

  7.import java.net.SocketException;

  8.import java.net.SocketTimeoutException;

  9.import java.text.SimpleDateFormat;

  10.import java.util.Date;

  11.import java.util.Properties;

  12.import java.util.Timer;

  13.import java.util.TimerTask;

  14.import java.util.concurrent.ConcurrentHashMap;

  15.import java.util.concurrent.TimeUnit;

  16.import java.util.concurrent.locks.Condition;

  17.import java.util.concurrent.locks.ReentrantLock;

  18.import org.apache.log4j.Logger;

  19./**

  20.*

title: socket通信包裝類

 

  21.*

Description:

 

  22.*

CopyRight: CopyRight (c) 2009

 

  23.*

Company: 99bill.com

 

  24.*

Create date: 2009-10-14

 

  25.*author sunnylocus

  26. * v0.10 2009-10-14 初類

  27.* v0.11 2009-11-12 對命令收發(fā)邏輯及收發(fā)線程互斥機制進行了優(yōu)化,

  處理命令速度由原來8~16個/秒提高到25~32個/秒

  28.*/ public class SocketConnection {

  29.private volatile Socket socket;

  30.private int timeout = 1000*10; //超時時間,初始值10秒

  31.private boolean isLaunchHeartcheck = false;//是否已啟動心跳檢測

  32.private boolean isNetworkConnect = false; //網(wǎng)絡(luò)是否已連接

  33.private static String host = "";

  34.private static int port;

  35.static InputStream inStream = null;

  36.static OutputStream outStream = null;

  37.private static Logger log =Logger.getLogger

  (SocketConnection.class);

  38.private static SocketConnection socketConnection = null;

  39.private static java.util.Timer heartTimer=null;

  40.//private final Map recMsgMap= Collections.

  synchronizedMap(new HashMap());

  41.private final ConcurrentHashMap recMsgMap

  = new ConcurrentHashMap();

  42.private static Thread receiveThread = null;

  43.private final ReentrantLock lock = new ReentrantLock();

  44.private SocketConnection(){

  45.Properties conf = new Properties();

  46.try {

  47.conf.load(SocketConnection.class.getResourceAsStream

  ("test.conf"));

  48.this.timeout = Integer.valueOf(conf.getProperty("timeout"));

  49.init(conf.getProperty("ip"),Integer.valueOf

  (conf.getProperty("port")));

  50.} catch(IOException e) {

  51.log.fatal("socket初始化異常!",e);

  52.throw new RuntimeException("socket初始化異常,請檢查配置參數(shù)");

  53.}

  54.}

  55./**

  56.* 單態(tài)模式

  57.*/

  58.public static SocketConnection getInstance() {

  59.if(socketConnection==null) {

  60.synchronized(SocketConnection.class) {

  61.if(socketConnection==null) {

  62.socketConnection = new SocketConnection();

  63.return socketConnection;

  64.}

  65.}

  66.}

  67.return socketConnection;

  68.}

  69.private void init(String host,int port) throws IOException {

  70.InetSocketAddress addr = new InetSocketAddress(host,port);

  71.socket = new Socket();

  72.synchronized (this) {

  73.log.info("【準備與"+addr+"建立連接】");

  74.socket.connect(addr, timeout);

  75.log.info("【與"+addr+"連接已建立】");

  76.inStream = socket.getInputStream();

  77.outStream = socket.getOutputStream();

  78.socket.setTcpNoDelay(true);//數(shù)據(jù)不作緩沖,立即發(fā)送

  79.socket.setSoLinger(true, 0);//socket關(guān)閉時,立即釋放資源

  80.socket.setKeepAlive(true);

  81.socket.setTrafficClass(0x04|0x10);//高可靠性和最小延遲傳輸

  82.isNetworkConnect=true;

  83.receiveThread = new Thread(new ReceiveWorker());

  84.receiveThread.start();

  85.SocketConnection.host=host;

  86.SocketConnection.port=port;

  87.if(!isLaunchHeartcheck)

  88.launchHeartcheck();

  89.}

  90.}

  91./**

  92.* 心跳包檢測

  93.*/

  94.private void launchHeartcheck() {

  95.if(socket == null)

  96.throw new IllegalStateException("socket is not

  established!");

  97.heartTimer = new Timer();

  98.isLaunchHeartcheck = true;

  99.heartTimer.schedule(new TimerTask() {

  100.public void run() {

  101.String msgStreamNo = StreamNoGenerator.getStreamNo("kq");

  102.int mstType =9999;//999-心跳包請求

  103.SimpleDateFormat dateformate = new SimpleDateFormat

  ("yyyyMMddHHmmss");

  104.String msgDateTime = dateformate.format(new Date());

  105.int msgLength =38;//消息頭長度

  106.String commandstr = "00" +msgLength + mstType + msgStreamNo;

  107.log.info("心跳檢測包 -> IVR "+commandstr);

  108.int reconnCounter = 1;

  109.while(true) {

  110.String responseMsg =null;

  111.try {

  112.responseMsg = readReqMsg(commandstr);

  113.} catch (IOException e) {

  114.log.error("IO流異常",e);

  115.reconnCounter ++;

  116.}

  117.if(responseMsg!=null) {

  118.log.info("心跳響應(yīng)包 <- IVR "+responseMsg);

  119.reconnCounter = 1;

  120.break;

  121.} else {

  122.reconnCounter ++;

  123.}

  124.if(reconnCounter >3) {//重連次數(shù)已達三次,判定網(wǎng)絡(luò)連接中斷,

  重新建立連接。連接未被建立時不釋放鎖

  125.reConnectToCTCC(); break;

  126.}

  127.}

  128.}

  129.},1000 * 60*1,1000*60*2);

  130.}

  131./**

  132.* 重連與目標IP建立重連

  133.*/

  134.private void reConnectToCTCC() {

  135.new Thread(new Runnable(){

  136.public void run(){

  137.log.info("重新建立與"+host+":"+port+"的連接");

  138.//清理工作,中斷計時器,中斷接收線程,恢復(fù)初始變量

  139.heartTimer.cancel();

  140.isLaunchHeartcheck=false;

  141.isNetworkConnect = false;

  142.receiveThread.interrupt();

  143.try {

  144.socket.close();

  145.} catch (IOException e1) {log.error("重連時,關(guān)閉socket連

  接發(fā)生IO流異常",e1);}

  146.//----------------

  147.synchronized(this){

  148.for(; ;){

  149.try {

  150.Thread.currentThread();

  151.Thread.sleep(1000 * 1);

  152.init(host,port);

  153.this.notifyAll();

  154.break ;

  155.} catch (IOException e) {

  156.log.error("重新建立連接未成功",e);

  157.} catch (InterruptedException e){

  158.log.error("重連線程中斷",e);

  159.}

  160.}

  161.}

  162.}

  163.}).start();

  164.}

  165./**

  166.* 發(fā)送命令并接受響應(yīng)

  167.* @param requestMsg

  168.* @return

  169.* @throws SocketTimeoutException

  170.* @throws IOException

  171.*/

  172.public String readReqMsg(String requestMsg) throws IOException {

  173.if(requestMsg ==null) {

  174.return null;

  175.}

  176.if(!isNetworkConnect) {

  177.synchronized(this){

  178.try {

  179.this.wait(1000*5); //等待5秒,如果網(wǎng)絡(luò)還沒有恢復(fù),拋出IO流異常

  180.if(!isNetworkConnect) {

  181.throw new IOException("網(wǎng)絡(luò)連接中斷!");

  182.}

  183.} catch (InterruptedException e) {

  184.log.error("發(fā)送線程中斷",e);

  185.}

  186.}

  187.}

  188.String msgNo = requestMsg.substring(8, 8 + 24);//讀取流水號

  189.outStream = socket.getOutputStream();

  190.outStream.write(requestMsg.getBytes());

  191.outStream.flush();

  192.Condition msglock = lock.newCondition(); //消息鎖

  193.//注冊等待接收消息

  194.recMsgMap.put(msgNo, msglock);

  195.try {

  196.lock.lock();

  197.msglock.await(timeout,TimeUnit.MILLISECONDS);

  198.} catch (InterruptedException e) {

  199.log.error("發(fā)送線程中斷",e);

  200.} finally {

  201.lock.unlock();

  202.}

  203.Object respMsg = recMsgMap.remove(msgNo); //響應(yīng)信息

  204.if(respMsg!=null &&(respMsg != msglock)) {

  205.//已經(jīng)接收到消息,注銷等待,成功返回消息

  206.return (String) respMsg;

  207.} else {

  208.log.error(msgNo+" 超時,未收到響應(yīng)消息");

  209.throw new SocketTimeoutException(msgNo+" 超時,未收到響應(yīng)消息");

  210.}

  211.}

  212.public void finalize() {

  213.if (socket != null) {

  214.try {

  215.socket.close();

  216.} catch (IOException e) {

  217.e.printStackTrace();

  218.}

  219.}

  220.}

  221.//消息接收線程

  222.private class ReceiveWorker implements Runnable {

  223.String intStr= null;

  224.public void run() {

  225.while(!Thread.interrupted()){

  226.try {

  227.byte[] headBytes = new byte[4];

  228.if(inStream.read(headBytes)==-1){

  229.log.warn("讀到流未尾,對方已關(guān)閉流!");

  230.reConnectToCTCC();//讀到流未尾,對方已關(guān)閉流

  231.return;

  232.}

  233.byte[] tmp =new byte[4];

  234.tmp = headBytes;

  235.String tempStr = new String(tmp).trim();

  236.if(tempStr==null || tempStr.equals("")) {

  237.log.error("received message is null");

  238.continue;

  239.}

  240.intStr = new String(tmp);

  241.int totalLength =Integer.parseInt(intStr);

  242.//----------------

  243.byte[] msgBytes = new byte[totalLength-4];

  244.inStream.read(msgBytes);

  245.String resultMsg = new String(headBytes)+ new

  String(msgBytes);

  246.//抽出消息ID

  247.String msgNo = resultMsg.substring(8, 8 + 24);

  248.Condition msglock =(Condition) recMsgMap.get(msgNo);

  249.if(msglock ==null) {

  250.log.warn(msgNo+"序號可能已被注銷!響應(yīng)消息丟棄");

  251.recMsgMap.remove(msgNo);

  252.continue;

  253.}

  254.recMsgMap.put(msgNo, resultMsg);

  255.try{

  256.lock.lock();

  257.msglock.signalAll();

  258.}finally {

  259.lock.unlock();

  260.}

  261.}catch(SocketException e){

  262.log.error("服務(wù)端關(guān)閉socket",e);

  263.reConnectToCTCC();

  264.} catch(IOException e) {

  265.log.error("接收線程讀取響應(yīng)數(shù)據(jù)時發(fā)生IO流異常",e);

  266.} catch(NumberFormatException e){

  267.log.error("收到?jīng)]良心包,String轉(zhuǎn)int異常,異常字符:"+intStr);

  268.}

  269.}

  270.}

  271.}

  272.}

【如何解決Java Socket通信技術(shù)收發(fā)線程互斥】相關(guān)文章:

PHP中如何使用socket進行通信08-21

Java線程同步的方法10-25

Java多線程的實現(xiàn)方式07-08

java多線程面試題201710-03

2016年java多線程面試題及答案07-02

sun認證考試輔導(dǎo):java關(guān)于多線程的部分操作07-27

PHP socket的配置08-04

超線程技術(shù)是什么意思09-09

如何編譯java程序09-28

如何讓JAVA代碼更高效07-18