1. 程式人生 > >【Java TCP/IP Socket程式設計】----傳送和接收資料----構建和解析協議訊息

【Java TCP/IP Socket程式設計】----傳送和接收資料----構建和解析協議訊息

--------筆記來自於書籍《Java TCP/IP Socket程式設計》。

簡介

使用套接字時,通常要麼是需要同時建立通訊通道兩端的程式,要麼實現一個給定的協議進行通訊。如果知道通訊雙方都使用java實現,且擁有對協議的完全控制權,那麼就可以使用Java的內建工具如Serialiable介面或者遠端方法呼叫(RMI)工具,但是由於某些原因導致,不是最好的解決方法。首先,它們是比較籠統的工具,在通訊開銷上不能做到最高效。其次自己的類要實現序列化介面,這個很容易出錯。所以實現自己的方法,可能更簡單、容量已或者更有效。

案例裡面的程式主要是實現了“投票”協議,客戶端向伺服器端傳送請求,訊息中包含一個候選人ID,範圍是0~1000,程式支援了兩種請求。

      1)查詢請求,即向伺服器詢問給定候選人當前獲得的投票總數。伺服器發回一個響應訊息,包含了原來的候選人的ID和該候選人的獲取的選票總數。

      2)投票請求,即向指定候選人投一票,伺服器對這種請求也發回響應訊息,包含了候選人ID和其獲取的選票數。

實現協議時,專門一個類用來存放訊息中的所包含的資訊,提供操作訊息中的欄位的方法,同時維護不同欄位之間的不變數。而且客戶端和伺服器端的訊息比較簡單,直接可以使用一個類包含客戶端和伺服器端的訊息。

案例

1.首先定義一個訊息類VoteMsg.java,此類包含了客戶端和伺服器端的訊息

。對於欄位的基本介紹如下:

  • 布林值isInquiry,其值為true時表示該訊息是查詢請求(false表示該訊息是投票資訊)。
  • 布林值isResponse,表示該訊息是響應(伺服器端傳送)還是請求。
  • 整型變數candidateID表示的是候選人的ID。
  • 長整型的變數voteCount表示的是所查詢的候選人獲得總票數。

 此類維護一下欄位間的不變數。

  • candidateID的範圍是0~1000.
  • voteCount在響應訊息中只能是一個非零值(isResponse為true)。
  • voteCount不能是負數。
public class VoteMsg {
  private boolean isInquiry;
  private boolean isResponse;
  private int candidateID;
  private long voteCount;
  private static final int MAX_CANDIDATE_ID = 1000;

  public VoteMsg(boolean isResponse, boolean isInquiry, int candidateID, long voteCount) {
    if (voteCount != 0 && !isResponse) {
      throw new IllegalArgumentException("Request vote count must be zero");
    }
    if (candidateID < 0 || candidateID > MAX_CANDIDATE_ID) {
      throw new IllegalArgumentException("Bad Candidate ID:" + candidateID);
    }
    if (voteCount < 0) {
      throw new IllegalArgumentException("total vote count must >=0");
    }
    this.isInquiry = isInquiry;
    this.isResponse = isResponse;
    this.candidateID = candidateID;
    this.voteCount = voteCount;
  }

  public boolean isInquiry() {
    return isInquiry;
  }

  public void setInquiry(boolean isInquiry) {
    this.isInquiry = isInquiry;
  }

  public boolean isResponse() {
    return isResponse;
  }

  public void setResponse(boolean isResponse) {
    this.isResponse = isResponse;
  }

  public int getCandidateID() {
    return candidateID;
  }

  public void setCandidateID(int candidateID) {
    if (candidateID < 0 || candidateID > MAX_CANDIDATE_ID) {
      throw new IllegalArgumentException("Bad Candidate ID:" + candidateID);
    }
    this.candidateID = candidateID;
  }

  public long getVoteCount() {
    return voteCount;
  }

  public void setVoteCount(long voteCount) {
    if ((voteCount != 0 && !isResponse) || voteCount < 0) {
      throw new IllegalArgumentException("Bad vote count");
    }
    this.voteCount = voteCount;
  }

  public String toString() {
    String res = (isInquiry ? "inquiry" : "vote") + " for candidate " + candidateID;
    if (isResponse) {
      res = "response to " + res + " who now has " + voteCount + " vote(s)";
    }
    return res;
  }
}

介面VoteMsgCoder定義了兩個方法,toWire()方法根據特定的協議將投票資訊轉換成在位元組序列。fromWire()方法是根據協議將位元組序列進行解析。 

public interface VoteMsgCoder {
  //訊息的序列化
  byte[] toWire(VoteMsg msg) throws IOException;
  //訊息的反序列化
  VoteMsg fromWire(byte[] input) throws IOException;
}

2.基於文字方式對訊息編碼

基於文字方式對投票資訊進行編碼,協議使用了ASCII字符集對文字進行編碼。訊息的開頭是“魔術字串”,即位元組序列,用於接收者快速將投票協議的訊息和網路中隨機到來的垃圾訊息區分開。投票/查詢布林值被編碼程字元形式,‘v’表示的是投票訊息,而‘i’表示的是查詢訊息。訊息的狀態,即是否為伺服器的響應,由字元‘R’指示,狀態標記後面是候選人ID,其後面跟的是選票總數,它們都能編碼程十進位制字串。

由於傳送的訊息是空格隔開的,Scanner類根據空白符一個一個獲取所有的欄位。

public class VoteMsgTextCoder implements VoteMsgCoder {
  /**
   * Wire format "VOTEPROTO" <"v"|"i">[<RESPFLAG>][<CANDIDATE>][<VOTECNT>] Charset is fixed by wire format
   */
  public static final String MAGIC = "Voting";
  public static final String VOTESTR = "v";
  public static final String INQSTR = "i";
  public static final String RESPONSETER = "R";
  public static final String CHARSETNAME = "US-ASCII";
  public static final String DELIMETER = " ";
  public static final int MAX_WIRE_LENGTH = 2000;

  @Override
  public byte[] toWire(VoteMsg msg) throws IOException {
    String msgString = MAGIC + DELIMETER + (msg.isInquiry() ? INQSTR : VOTESTR) + DELIMETER
        + (msg.isResponse() ? RESPONSETER + DELIMETER : "") + Integer.toString(msg.getCandidateID()) + DELIMETER
        + Long.toString(msg.getVoteCount());
    byte data[] = msgString.getBytes(CHARSETNAME);
    return data;
  }

  @Override
  public VoteMsg fromWire(byte[] message) throws IOException {
    ByteArrayInputStream msgStream = new ByteArrayInputStream(message);
    Scanner s = new Scanner(new InputStreamReader(msgStream, CHARSETNAME));
    boolean isInquiry;
    boolean isResponse;
    int candidateID;
    long voteCount;
    String token;
    try {
      token = s.next();
      if (!token.equals(MAGIC)) {
        throw new IOException("Bad magic string:" + token);
      }
      token = s.next();
      if (token.equals(VOTESTR)) {
        isInquiry = false;
      } else if (!token.equals(INQSTR)) {
        throw new IOException("Bad vote/inq indicator: " + token);
      } else {
        isInquiry = true;
      }
      token = s.next();
      if (token.equals(RESPONSETER)) {
        isResponse = true;
        token = s.next();
      } else {
        isResponse = false;
      }
      candidateID = Integer.parseInt(token);
      if (isResponse) {
        token = s.next();
        voteCount = Long.parseLong(token);
      } else {
        voteCount = 0;
      }
    } catch (IOException e) {
      throw new IOException("Parse error");
    }
    return new VoteMsg(isResponse, isInquiry, candidateID, voteCount);
  }
}

3.基於二進位制方式對訊息進行編碼

二進位制格式使用固定大小的訊息,每條訊息都由一個特殊位元組開始,該位元組的最高六位為一個“魔術值”010101,可以篩選對應的來源的資料,該位元組的最低兩位對兩個布林值進行了編碼(isResponse,isInquiry),訊息的第二個自己總是0.第三,第四個位元組包含了candidateID值,只有響應訊息的最後8個位元組才包含了選票總數資訊。

public class VoteMsgBinCoder implements VoteMsgCoder {

  public static final int MIN_WIRE_LENGTH = 4;
  public static final int MAX_WIRE_LENGTH = 16;
  public static final int MAGIC = 0x5400;// 0101 0100 0000 0000
  public static final int MAGIC_MASK = 0xfc00;// 1111 1100 0000 0000
  public static final int MAGIC_SHIFT = 8;
  public static final int RESPONSE_FLAG = 0x0200;// 10 0000 0000
  public static final int INQUIRE_FLAG = 0x0100;// 1 0000 0000

  @Override
  public byte[] toWire(VoteMsg msg) throws IOException {
    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(byteStream);
    short magicAndFlags = MAGIC;
    if (msg.isInquiry()) {
      magicAndFlags |= INQUIRE_FLAG;
    }
    if (msg.isResponse()) {
      magicAndFlags |= RESPONSE_FLAG;
    }
    out.writeShort(magicAndFlags);
    // candidate ID will fit in a short :its>&&<1000;
    out.writeShort((short) msg.getCandidateID());
    if (msg.isResponse()) {
      out.writeLong(msg.getVoteCount());
    }
    out.flush();
    byte[] data = byteStream.toByteArray();
    return data;
  }

  @Override
  public VoteMsg fromWire(byte[] input) throws IOException {
    if (input.length < MIN_WIRE_LENGTH) {
      throw new IOException("Runt mssage");
    }
    ByteArrayInputStream bs = new ByteArrayInputStream(input);
    DataInputStream in = new DataInputStream(bs);
    int magic = in.readShort();
    if ((magic & MAGIC_MASK) != MAGIC) {
      throw new IOException("Bad Magic #:" + ((magic & MAGIC_MASK) >> MAGIC_SHIFT));
    }
    boolean resp = ((magic & RESPONSE_FLAG) != 0);
    boolean inq = ((magic & INQUIRE_FLAG) != 0);
    int candidateID = in.readShort();
    if (candidateID < 0 || candidateID > 1000) {
      throw new IOException("Bad candidate ID: " + candidateID);
    }
    long count = 0;
    if (resp) {
      count = in.readLong();
      if (count < 0) {
        throw new IOException("Bad vote count: " + count);
      }
    }
    return new VoteMsg(resp, inq, candidateID, count);
  }
}

************************************【注意】以下的案例中的Framer介面,LengthFramer類參考另外一篇筆記

【Java TCP/IP Socket程式設計】----傳送和接收資料----訊息成幀與解析

4..傳送和接收

1)基於TCP套接字傳送投票資訊和接收資訊,其中訊息是使用的二進位制方式進行編碼。

public class VoteClientTCP {
  public static final int CANDIDATEID = 888;

  public static void main(String[] args) throws Exception {
    Socket socket = new Socket("127.0.0.1", 1234);
    OutputStream out = socket.getOutputStream();
    VoteMsgCoder coder = new VoteMsgBinCoder();
    Framer framer = new LengthFramer(socket.getInputStream());
    VoteMsg msg = new VoteMsg(false, true, CANDIDATEID, 0);
    byte[] encodedMsg = coder.toWire(msg);
    System.out.println("sending Inquiry (" + encodedMsg.length + "bytes):");
    System.out.println(msg);
    framer.frameMsg(encodedMsg, out);
    //傳送投票請求
    msg.setInquiry(false);
    encodedMsg = coder.toWire(msg);
    System.out.println("Sending Vote (" + encodedMsg.length + "bytes):");
    framer.frameMsg(encodedMsg, out);
    //查詢資訊響應
    encodedMsg = framer.nextMsg();
    msg = coder.fromWire(encodedMsg);
    System.out.println("Received Response (" + encodedMsg.length + "bytes):");
    System.out.println(msg);
    //投票資訊響應
    msg = coder.fromWire(framer.nextMsg());
    System.out.println("Received Response (" + encodedMsg.length + "bytes):");
    System.out.println(msg);
    socket.close();
  }
}
public class VoteServerTCP {
  public static void main(String[] args) throws Exception {
    ServerSocket serverSocket = new ServerSocket(1234);
    VoteMsgCoder coder = new VoteMsgBinCoder();
    VoteService service = new VoteService();
    while (true) {
      Socket clientSocket = serverSocket.accept();
      System.out.println("Handling client at :" + clientSocket.getRemoteSocketAddress());
      Framer framer = new LengthFramer(clientSocket.getInputStream());
      try {
        byte[] req;
        while ((req = framer.nextMsg()) != null) {
          System.out.println("Received message(" + req.length + "bytes)");
          VoteMsg responseMsg = service.handleRequest(coder.fromWire(req));
          framer.frameMsg(coder.toWire(responseMsg), clientSocket.getOutputStream());
        }
      } catch (IOException e) {
        System.out.println("Error handling client:" + e.getMessage());
      } finally {
        System.out.println("Closing connection");
        clientSocket.close();
      }
    }
  }
}

2)基於UDP套接字傳送投票資訊和接收資訊,其中訊息是使用文字形式進行的編碼。

public class VoteServiceUDP {
  public static void main(String[] args) throws IOException {
    DatagramSocket socket = new DatagramSocket(1234);
    byte[] buffer = new byte[VoteMsgTextCoder.MAX_WIRE_LENGTH];
    VoteMsgCoder coder = new VoteMsgTextCoder();
    VoteService service = new VoteService();
    while(true) {
      DatagramPacket packet = new DatagramPacket(buffer,buffer.length);
      socket.receive(packet);
      byte[] encodedMsg = Arrays.copyOfRange(packet.getData(), 0, packet.getLength());
      System.out.println("Handling request from "+packet.getSocketAddress()+"("+encodedMsg.length+"bytes)");
      try {
        VoteMsg msg = coder.fromWire(encodedMsg);
        msg = service .handleRequest(msg);
        packet.setData(coder.toWire(msg));
        System.out.println("Sending response ("+packet.getLength()+"bytes):");
        System.out.println(msg);
        socket.send(packet);
      }catch(IOException e) {
        System.out.println("Parse error in message:"+e.getMessage());
      }
    }
  }
}

public class VoteClientTCP {
  public static final int CANDIDATEID = 888;

  public static void main(String[] args) throws Exception {
    Socket socket = new Socket("127.0.0.1", 1234);
    OutputStream out = socket.getOutputStream();
    VoteMsgCoder coder = new VoteMsgBinCoder();
    Framer framer = new LengthFramer(socket.getInputStream());
    VoteMsg msg = new VoteMsg(false, true, CANDIDATEID, 0);
    byte[] encodedMsg = coder.toWire(msg);
    System.out.println("sending Inquiry (" + encodedMsg.length + "bytes):");
    System.out.println(msg);
    framer.frameMsg(encodedMsg, out);
    //傳送投票請求
    msg.setInquiry(false);
    encodedMsg = coder.toWire(msg);
    System.out.println("Sending Vote (" + encodedMsg.length + "bytes):");
    framer.frameMsg(encodedMsg, out);
    //查詢資訊響應
    encodedMsg = framer.nextMsg();
    msg = coder.fromWire(encodedMsg);
    System.out.println("Received Response (" + encodedMsg.length + "bytes):");
    System.out.println(msg);
    //投票資訊響應
    msg = coder.fromWire(framer.nextMsg());
    System.out.println("Received Response (" + encodedMsg.length + "bytes):");
    System.out.println(msg);
    socket.close();
  }
}