1. 程式人生 > >基於TCP協議的項目架構之Socket流傳輸的實現

基於TCP協議的項目架構之Socket流傳輸的實現

hand == buffered 可靠的 客戶端請求 utf 信息 color hdf

項目背景

   某銀行的影像平臺由於使用時間長,服務器等配置原因,老影像系統滿足不了現在日益增長的數據量的需求,所以急需要升級改造。傳統的影像平臺使用的是Oracle數據庫和簡單的架構來存儲數據(視頻、圖片等),所以很難滿足現在的業務需求,其中最主要的就是存儲下載等速度的影響,綜合考慮之後,我們給出了升級改造方案,基於Http協議的數據傳輸和基於TCP協議的數據傳輸,按照行方的要求需要用TCP協議,我們最終采用的是Socket網絡傳輸的方案。

TCP協議介紹

  TCP是一種面向連接的、可靠的、基於字節流的傳輸層通信協議,在簡化的計算機網絡OSI模型中,它完成第四層傳輸層所指定的功能,TCP層位於IP層之上,應用層之下的中間層,不同主機的應用層之間經常需要可靠的、像管道一樣的連接,但IP層不需要提供這樣的機制,而是提供不可靠的包交換。當應用層向TCP層發送用於網間傳輸的、用8位字節標示的數據流,TCP會把數據流分割為適當長度的報文段,之後TCP把數據包傳給IP層,由它來通過網絡將包傳輸給接收端的實體TCP層。TCP是因特網中的傳輸層協議,使用3次握手協議建立連接。

Socket

TCP通信是嚴格區分客戶端與服務端的,在通信時,必須先由客戶端去連接服務器才能實現通信,服務器端不可以主動連接客戶端,並且服務器要事先啟動,等待客戶端的連接。

在JDK中提供了兩個類用於實現TCP程序,一個是ServerSocket類,用於表示服務器,一個是Socket類,用於表示客戶端。通信時,首先創建代表服務器端的ServerSocket對象,該對象相當於開啟一個服務,並等待客戶端的連接,然後創建代表客戶端的Socket對象向服務器端發出連接請求,服務器端響應請求,兩者建立連接,開始通信。

下面是我們通過Socket建立的模型,是多線程的,首先看服務端代碼:

/**
 * socket服務端
 * @author 我心自在
 *
 */
public class Main {
    private static Log logger=null;
    //線程池
    public static ExecutorService executor =null;
    /**
     * 靜態塊,初始化
     */
    static{
        logger=LogFactory.getLog(SocketMain.class);
        //線程池
        executor = Executors.newFixedThreadPool(Integer.parseInt(new
PropertiesUtil(). getProperties("config.properties").getProperty("UserThreads"))); } /** * 主程序入口 * @param args */ public static void main(String[] args) { ServerSocket server = null; Socket socket = null; PropertiesUtil PropertiesUtil = new PropertiesUtil(); try { //啟動Socket服務端 server=newServerSocket(Integer.parseInt(PropertiesUtil.getProperties("config.properties") .getProperty("port"))); while (true) { //多線程接收客戶端請求 socket = server.accept(); if (socket != null) { executor.execute(new Controller(socket)); } } } catch (IOException e) { logger.error("Main IO異常:"+e.getMessage(),e); } finally { try { if (server != null) { server.close(); } if (socket != null) { socket.close(); } } catch (IOException e) { logger.error("Main流關閉異常:"+e.getMessage(),e); } } } }

下面是業務處理類,支持多線程,主要用來處理業務

public class SocketController implements Runnable {
    private Socket socket;
    private static Log logger=LogFactory.getLog(SocketController.class);
public SocketController(Socket socket) {
        super();
        this.socket = socket;
    }
public void run() {
        //讀取文件流
        String requestMethod=null;
        Map<String,Object> getMethAndNumMap=null;
        Map<String,Object> jsonMap=null;
        BufferedInputStream  bis =null; 
        OutputStream ops = null;
        BufferedWriter bw = null;
        BufferedOutputStream bos = null;
        Document doc=null;
        String XMLString=null;
        try {
            bis= new BufferedInputStream (socket.getInputStream());
            //獲取輸出流
            ops = socket.getOutputStream();
            bos = new BufferedOutputStream(ops);
            bw = new BufferedWriter(new OutputStreamWriter(ops));
            byte[] fileinfo=new byte[256];
            try {
                bis.read(fileinfo);
            } catch (IOException e1){
                logger.error("流讀取異常...."+e1.getMessage(),e1);
            }
            if(fileinfo!=null){
                fileInfoString=new String(fileinfo).trim();
            }
            jsonMap=JSONUtil.jsonToMap(fileInfoString);
            requestMethod=(String) jsonMap.get("requestMethod");
                if (!(requestMethod==null||"".equals(requestMethod))) {
                    switch (requestMethod){ 
                    case "login":
                        Login.login(XMLString,bw,doc);//登錄接口
                        break;
                    case "XXXX":
                        XXX.xxx();
                        break;
                    default:
                //請求方法錯誤

                    }
                }
        } catch (UnknownHostException e) {
            logger.error("Socket未知端口:"+e.getMessage(),e);
        } catch (IOException e) {
            logger.error("Controller讀流流異常:"+e.getMessage(),e);
        }finally{
            try {
                if (bw!=null) {
                    bw.flush();
                    bw.close();
                }
                if(ops!=null){
                    ops.close();
                }
                if(bos!=null){
                    bos.close();
                }
                bis.close();
                socket.close();
            } catch (IOException e) {
                logger.error("socket關閉異常:"+e.getMessage(),e);
            }
        }
    }
        

  這裏只貼出了部分代碼,做個參考,基本思想就是,通過輸入流接到客戶端發送過來的報文,然後進行解析,為什麽統一用字節流接受呢,因為我們的文件上傳分兩部分,一部分是文件信息,一部分是文件流,所以為了方便,統一使用字節流接收,根據字節流中的請求接口方法,調用對應的接口方法,完成業務處理。因為客戶端的報文有兩種,一種是XML類型的報文,另外一種是json格式的報文,這裏只貼出了部分json格式的代碼。差的只是一個XML的解析,解析方式很多,就不再贅述。

下面是客戶端代碼,以登錄為例:

public class LoginTest {
    public static void main(String[] args) {
        String xml = 
                "<?xml version=\"1.0\" encoding=\"utf-8\"?>"
                    + "<root>"
                        + "<requestMethod>login</requestMethod>"
                        + "<userinfo>"
                            + "<loginname>test</loginname>"    
                            + "<passwd>dmd1dnZndXY=</passwd>"
                        + "</userinfo>"    
                    + "</root>";
        Socket socket=null;
        BufferedWriter bw=null;
        BufferedReader br=null;
        try {
            socket=new Socket(new ConfigUtil().getValue("ipAddress"), 
                    Integer.parseInt(new ConfigUtil().getValue("port")));
            bw=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bw.write(xml);
            bw.newLine();
            bw.flush();
            br=new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println("服務器返回報文:"+br.readLine());
            bw.close();
            br.close();
            socket.close();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }    

  客戶端代碼非常簡單,基本就是以流的形式發送報文,發送到客戶端,完成請求,收到服務器響應後,關閉連接,完成一次請求。

  以上只是一個簡單的Socket通信模型,具體項目中涉及到很多內容,這裏無法一一列舉。大致思路我們就是通過Socket通信,獲取客戶端發送過來的報文,然後對報文進行解析,根據請求方法,調用不同的業務接口,處理不同的業務,我們的業務包括對影像資料的增刪改查,文件根據不同的大小存在大數據集群中,小文件存入Hbase,大文件存入HDFS,然後根據不同的場景去查詢刪除或者更新。這裏邊涉及到的存索引,用到了Elasticsearch5.4,用來做高級檢索查詢。

基於TCP協議的項目架構之Socket流傳輸的實現