1. 程式人生 > >基於TCP/IP協議的socket通訊server

基於TCP/IP協議的socket通訊server

while resource err close 通訊 ice inpu utils 緩沖

思路:

socket必須要隨項目啟動時啟動,所以需用Spring自帶的監聽器,需要保持長連接,要用死循環,所以必須另外起線程,不能阻礙主線程運行

1.在項目的web.xml中配置listener

<!-- Spring's ContextLoaderListener, which loads the root web application context. -->
<!-- NOTE(review): presumably it must stay ahead of MyListener so the context exists before the worker threads start; confirm listener ordering for your container. -->
<listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  </listener>
<!-- Application listener that starts the socket server thread and the RabbitMQ consumer threads at startup. -->
<listener>
    <listener-class>com.ra.car.utils.MyListener</listener-class>
  </listener>

2.因為是一個獨立的線程,所以需要調用的註入類不能通過@Resource或@Autowired註入,需要通過應用上下文獲取

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:mvc="http://www.springframework.org/schema/mvc"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop" 
	xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
		http://www.springframework.org/schema/mvc 
		http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 
		http://www.springframework.org/schema/context 
		http://www.springframework.org/schema/context/spring-context-4.0.xsd 
		http://www.springframework.org/schema/aop 
		http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
		http://www.springframework.org/schema/tx 
		http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 
		http://www.springframework.org/schema/task 
		http://www.springframework.org/schema/task/spring-task-4.0.xsd">
		
	<!-- Scan the package and load the Service implementation classes -->
	<context:component-scan base-package="com.ra.*.service.impl"></context:component-scan>
	<!-- Beans declared with explicit ids: code running on the socket thread fetches them by id via getBean on the web application context (e.g. "DataCallBackService") instead of using annotation injection. -->
	 <bean id="DataCallBackService" class="com.ra.truck.service.impl.DataCallBackServiceImpl"/>
	 <bean id="RdTrackInfoService" class="com.ra.truck.service.impl.RdTrackInfoServiceImpl"/>
	 <bean id="OutInterfaceService" class="com.ra.truck.service.impl.OutInterfaceImpl"/>
	 <bean id="RdPhotoInfoService" class="com.ra.truck.service.impl.RdPhotoInfoServiceImpl"/>
	<bean id="MessagePackegerService" class="com.ra.truck.service.impl.MessagePackegerServiceImpl"/>
	 <!-- Redis client bean, currently disabled: -->
	 <!--<bean id="redis" class="com.ra.redis.service.impl.JedisClientCluster"/>-->
</beans>

  

3.創建listener監聽器類

package com.ra.car.utils;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ra.car.rabbitMQ.PBWRabbitMQCustomer;
import com.ra.car.rabbitMQ.RabbitMQCustomer;

/**
 * listener監聽器類
* */ public class MyListener implements ServletContextListener { protected static final Logger logge = LoggerFactory .getLogger(MyListener.class); @Override public void contextInitialized(ServletContextEvent arg0) { //必須單獨啟線程去跑listener Mythread myThread = new Mythread(); //創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程 // ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); // cachedThreadPool.execute(myThread); Thread thread = new Thread(myThread); thread.start(); //啟動MQTT // MQTTSubMsg client = new MQTTSubMsg(); // client.start(); RabbitMQCustomer customer=new RabbitMQCustomer(); Thread threadCustomer = new Thread(customer); threadCustomer.start(); PBWRabbitMQCustomer pbwcustomer=new PBWRabbitMQCustomer(); Thread pbwT = new Thread(pbwcustomer); pbwT.start(); } @Override public void contextDestroyed(ServletContextEvent arg0) { logge.info("進入ListenerUtil的contextDestroyed方法........."); } }
package com.ra.car.utils;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accept loop of the TCP server.
 *
 * <p>Binds a {@link ServerSocket} and hands every accepted connection to a
 * {@link JavaTCPServer}. Intended to run on its own thread because {@link #run()}
 * blocks for as long as the server is listening.
 */
public class Mythread implements Runnable{

    protected static final Logger logge = LoggerFactory
            .getLogger(Mythread.class);

    /** Port used when no port is given explicitly. */
    private static final int DEFAULT_PORT = 8888;

    /** Port the server socket binds to. */
    private final int port;

    /** Creates a server that listens on the default port (8888). */
    public Mythread() {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a server that listens on the given port.
     *
     * @param port TCP port to bind
     */
    public Mythread(int port) {
        this.port = port;
    }

    /**
     * Binds the server socket and accepts connections until the thread is interrupted
     * or the server socket fails. The server socket is always closed on exit.
     */
    @Override
    public void run() {
        logge.info("進入ListenerUtil的contextInitialized方法.........");
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);
            logge.info("socket通信服務端已啟動,等待客戶端連接.......");
            logge.info("我是111111111111111");
            while (!Thread.currentThread().isInterrupted()) {
                // Blocks until a client connects, then returns the connection's Socket.
                Socket socket = serverSocket.accept();
                try {
                    JavaTCPServer socketThread = new JavaTCPServer(socket);
                    socketThread.run();
                } catch (RuntimeException e) {
                    // One bad connection must not take the whole accept loop down.
                    logge.error("Failed to hand off client connection", e);
                    closeQuietly(socket);
                }
                try {
                    // Sleep 10 ms between accepts; per the original author's note, a
                    // stress test of 50000 connections ran without problems this way.
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the accept loop.
                    Thread.currentThread().interrupt();
                }
            }
        } catch (IOException e) {
            logge.error("通信服務器啟動失敗!", e);
        } finally {
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    logge.error("Failed to close server socket", e);
                }
            }
        }
    }

    /**
     * Converts a Unix timestamp in seconds to a formatted date-time string.
     *
     * @param s decimal string holding seconds since the epoch
     * @return the instant formatted as {@code yyyy-MM-dd HH:mm:ss} in the default time zone
     * @throws NumberFormatException if {@code s} is not a parsable {@code long}
     */
    public static String stampToDate(String s){
        long timestamp = Long.parseLong(s) * 1000;
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timestamp));
    }

    /** Closes a client socket, logging instead of propagating any failure. */
    private static void closeQuietly(Socket socket) {
        try {
            socket.close();
        } catch (IOException e) {
            logge.warn("Failed to close client socket", e);
        }
    }

}
package com.ra.car.utils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hands one accepted client connection over to a worker thread.
 *
 * <p>All connections share a single cached thread pool; the pool reuses idle threads
 * and creates new ones on demand.
 */
public class JavaTCPServer {
    protected static final Logger logger=LoggerFactory.getLogger(JavaTCPServer.class);

    /** Pool shared by every connection, created once instead of once per connection. */
    private static final ExecutorService CONNECTION_POOL = Executors.newCachedThreadPool();

    private Socket socket;

    /**
     * @param socket the accepted client connection to serve
     */
    public JavaTCPServer(Socket socket) {
        this.socket = socket;
    }

    /**
     * Creates the connection handler and submits it to the shared pool.
     * If the handler cannot be created, the socket is closed and nothing is submitted.
     */
    public void run() {
        MyThread2 handler;
        try {
            handler = new MyThread2(socket);
        } catch (IOException e) {
            logger.error("Failed to open streams for client connection", e);
            try {
                socket.close();
            } catch (IOException closeError) {
                logger.warn("Failed to close client socket", closeError);
            }
            return;
        }
        CONNECTION_POOL.execute(handler);
    }

}
package com.ra.car.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.ra.truck.model.RdDeviceCallBackDataDomain;
import com.ra.truck.service.DataCallBackService;
import com.ra.truck.service.RdPhotoInfoService;
import com.ra.truck.service.RdTrackInfoService;
import com.ra.truck.service.outInterface.OutInterfaceService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.context.ContextLoader;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.*;

public class MyThread2 implements Runnable {

    /** Logger shared by all handler instances. */
    protected static final Logger logger = LoggerFactory
            .getLogger(MyThread2.class);

    /** Client connection served by this handler. */
    private Socket socket;
    /** Byte stream read from the client. */
    private InputStream inputStream;
    /** Byte stream written to the client. */
    private OutputStream outputStream;
    /** Text writer wrapped around the output stream; used to send the JSON replies. */
    private PrintWriter printWriter;
    
    private int totalCount;  // total count

    private int adasCount; // number of ADAS signals transmitted
    private int gpsCount; // number of GPS signals transmitted
    /** Data-callback service, looked up from the web application context in the constructor. */
    private DataCallBackService dataCallBackService;
    /** Formatter for the timestamps written to the log; assigned in the constructor. */
    private SimpleDateFormat df;

    /**
     * Creates a handler for one client connection.
     *
     * <p>Opens the socket's streams, wraps the output in a UTF-8 writer and looks up the
     * {@code DataCallBackService} bean from the current web application context.
     *
     * @param socket the accepted client connection
     * @throws IOException if the socket's streams cannot be obtained
     */
    public MyThread2(Socket socket) throws IOException {
        this.socket = socket;
        inputStream = socket.getInputStream();
        outputStream = socket.getOutputStream();
        // Encode replies as UTF-8 so they match the charset used to decode incoming
        // data, instead of depending on the platform default charset.
        printWriter = new PrintWriter(new java.io.OutputStreamWriter(outputStream, "UTF-8"));

        dataCallBackService = (DataCallBackService)
                ContextLoader.getCurrentWebApplicationContext().getBean("DataCallBackService");
        df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    }

    /**
     * Serves the client connection until it goes idle, ends, or fails.
     *
     * <p>The loop polls the input stream. When bytes are pending it reads them, decodes
     * them as UTF-8, passes the text to {@code jsonStringToObject} and writes back a
     * one-line JSON reply ({@code resultCode}/{@code result}, plus {@code id} when the
     * parser returned a non-blank one). A processing error produces a failure reply and
     * the loop continues; an I/O error, end of stream or thread interruption ends it.
     * The connection is closed after more than 60 seconds without incoming data.
     * All streams and the socket are closed on exit.
     */
    @Override
    public void run() {
        final long idleLimitSeconds = 60;
        final long idlePollMillis = 50;
        long lastActivity = System.currentTimeMillis();
        boolean idleTimeout = false;
        try {
            while (true) {
                try {
                    if (inputStream.available() <= 0) {
                        long idleMillis = System.currentTimeMillis() - lastActivity;
                        if (idleMillis / 1000 > idleLimitSeconds) {
                            idleTimeout = true;
                            break;
                        }
                        // Pause between polls so an idle connection does not spin a CPU core.
                        Thread.sleep(idlePollMillis);
                        continue;
                    }
                    lastActivity = System.currentTimeMillis();
                    // NOTE(review): presumably this pause lets the rest of the message
                    // arrive before it is read in one piece — confirm with the protocol.
                    Thread.sleep(200);
                    byte[] payload = readPendingBytes();
                    if (payload == null) {
                        // The client closed its side of the connection mid-read.
                        break;
                    }
                    logger.info("**********當前服務器正在被連接**********");
                    logger.info("正在連接的客戶端IP為:"
                            + socket.getInetAddress().getHostAddress());

                    logger.info("當前時間為:" + df.format(new Date()));
                    String data = new String(payload, "utf-8");
                    logger.info("傳輸過來的info:" + data);
                    String id = jsonStringToObject(data);
                    Map<Object, Object> reply = new HashMap<Object, Object>();
                    // Heartbeats are JSON payloads without an id, so id is optional.
                    if (StringUtils.isNotBlank(id)) {
                        reply.put("id", id);
                    }
                    reply.put("resultCode", "1");
                    reply.put("result", "success");
                    printWriter.print(JSON.toJSONString(reply) + "\n");
                    printWriter.flush();
                } catch (InterruptedException e) {
                    logger.error("*****線程休眠出現異常*****", e);
                    // Restore the flag and stop: interruption means the worker should end.
                    Thread.currentThread().interrupt();
                    break;
                } catch (IOException e) {
                    // The connection itself is broken; retrying would only loop on errors.
                    logger.error("數據傳輸出現異常", e);
                    sendFailure();
                    break;
                } catch (Exception e) {
                    // A processing error affects only this message; keep the connection.
                    logger.error("數據傳輸出現異常", e);
                    sendFailure();
                }
            }
        } finally {
            closeConnection(idleTimeout);
        }
    }

    /**
     * Reads exactly the number of bytes reported as available on the input stream.
     *
     * @return the bytes read, or {@code null} if the stream ended before they were all read
     * @throws IOException if reading from the socket fails
     */
    private byte[] readPendingBytes() throws IOException {
        int expected = inputStream.available();
        byte[] buffer = new byte[expected];
        int filled = 0; // number of bytes read so far
        while (filled < expected) {
            int n = inputStream.read(buffer, filled, expected - filled);
            if (n < 0) {
                return null;
            }
            filled += n;
        }
        return buffer;
    }

    /** Sends the one-line JSON failure reply to the client. */
    private void sendFailure() {
        Map<Object, Object> reply = new HashMap<Object, Object>();
        reply.put("resultCode", "0");
        reply.put("result", "fail");
        printWriter.print(JSON.toJSONString(reply) + "\n");
        printWriter.flush();
    }

    /**
     * Logs the disconnect and closes the streams and the socket. The socket is closed
     * even if closing one of the streams fails.
     *
     * @param idleTimeout whether the connection is being closed because it was idle too long
     */
    private void closeConnection(boolean idleTimeout) {
        if (idleTimeout) {
            logger.info("60s沒有發送數據,服務端主動關閉連接");
        }
        logger.info("被斷開的客戶端IP為:"
                + socket.getInetAddress().getHostAddress());
        SimpleDateFormat closeTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        logger.info("被斷開的時間為:" + closeTimeFormat.format(new Date()));
        try {
            printWriter.close();
            outputStream.close();
            inputStream.close();
        } catch (IOException e) {
            logger.error("關閉socket出現異常", e);
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                logger.error("關閉socket出現異常", e);
            }
        }
    }

    /**
     * Parses one decoded client message.
     *
     * <p>{@code run()} passes the UTF-8 decoded payload here and treats the returned
     * string as the message id: a non-blank value is echoed back in the reply, and a
     * blank or {@code null} value means the reply carries no id.
     *
     * <p>NOTE(review): the body below is a placeholder — the parsing logic and the
     * return statement are not present in this listing and must be supplied.
     *
     * @param data the message text received from the client
     * @return the id extracted from the message (per the caller's use of the result)
     */
    private String jsonStringToObject(String data) {
        // Data-parsing logic goes here (elided): return xx;
    }
    /**
     * Converts a Unix timestamp expressed in seconds into a {@link Date}.
     *
     * @param s decimal string holding seconds since the epoch
     * @return the corresponding {@code Date}
     * @throws NumberFormatException if {@code s} is not a parsable {@code long}
     */
    public static Date stampToDate(String s){
        final long epochSeconds = Long.parseLong(s);
        // Date works in milliseconds, so scale the seconds up.
        return new Date(epochSeconds * 1000);
    }

基於TCP/IP協議的socket通訊server