Java 實現簡單的訊息佇列模型(BIO)
阿新 • • 發佈:2019-01-12
本例項主要模擬一個一對一的訊息佇列處理:
宣告佇列:
package com.bai.testbio;

import java.util.LinkedList;

/**
 * A minimal bounded, blocking, in-memory message queue shared by every caller in the JVM.
 *
 * <p>All state is static and guarded by the monitor of the backing list, so
 * {@link #produce(String)} and {@link #consume()} may be called from multiple threads.
 * Messages are delivered in FIFO order. Producers block while the queue holds
 * {@code MAX_SIZE} messages; consumers block while it is empty.
 */
public class JmsBuffer {

    /** Maximum number of messages the queue may hold at any time. */
    private final static int MAX_SIZE = 2;

    /** Backing FIFO store; its monitor is also used for wait/notify. */
    private static final LinkedList<String> jmsQueue = new LinkedList<String>();

    /** Lazily created shared instance handed out by {@link #getJmsBuffer()}. */
    static JmsBuffer buffer;

    /**
     * Appends a message to the tail of the queue, blocking while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is resumed and the
     * thread's interrupt status is restored before this method returns.
     *
     * @param str the message to enqueue
     */
    public static void produce(String str) {
        boolean interrupted = false;
        synchronized (jmsQueue) {
            // ">=" (not ">") so that the queue never holds more than MAX_SIZE messages.
            while (jmsQueue.size() >= MAX_SIZE) {
                System.out.println("你要生產的訊息為" + str + "\t【庫存量】:" + jmsQueue.size() + "\t暫時不能執行生產任務!");
                try {
                    // Releases the monitor until a consumer calls notifyAll().
                    jmsQueue.wait();
                } catch (InterruptedException e) {
                    // wait() cleared the flag; remember it and keep waiting for free space.
                    interrupted = true;
                }
            }
            jmsQueue.addLast(str);
            System.out.println("已經生產該訊息" + str + "\t【現倉儲量為】:" + jmsQueue.size());
            // Wake any consumers blocked on an empty queue.
            jmsQueue.notifyAll();
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Removes and returns the oldest message, blocking while the queue is empty.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is resumed and the
     * thread's interrupt status is restored before this method returns.
     *
     * @return the message at the head of the queue
     */
    public static String consume() {
        boolean interrupted = false;
        try {
            synchronized (jmsQueue) {
                // A consumer must wait for "empty", not for "over capacity".
                while (jmsQueue.isEmpty()) {
                    System.out.println("【訊息庫存量】:" + jmsQueue.size() + "\t暫時不能執行消費任務!");
                    try {
                        // Releases the monitor until a producer calls notifyAll().
                        jmsQueue.wait();
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
                // Head of the list is the oldest message, giving FIFO delivery.
                String str = jmsQueue.removeFirst();
                System.out.println("【已經消費該訊息】:" + str + "\t【現倉儲量為】:" + jmsQueue.size());
                // Wake any producers blocked on a full queue.
                jmsQueue.notifyAll();
                return str;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the shared {@code JmsBuffer} instance, creating it on first use.
     *
     * @return the single shared instance
     */
    public synchronized static JmsBuffer getJmsBuffer() {
        if (buffer == null) {
            // Cache the instance so later calls return the same object.
            buffer = new JmsBuffer();
        }
        return buffer;
    }
}
使用BIO實現一個服務端:
package com.bai.testbio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * Blocking-I/O server for the message queue: one thread per accepted connection.
 *
 * <p>Each connection is read line by line. A line containing {@code "produce"} is stored
 * via {@code JmsBuffer.produce}; otherwise a line containing {@code "consume"} makes the
 * server take one message via {@code JmsBuffer.consume} and write it back as a single
 * line. The line {@code "end"}, or the peer closing the connection, ends the session.
 */
public class Server extends Thread {

    /** The accepted client connection served by this thread. */
    private final Socket client;

    /**
     * @param c the accepted client socket this thread will serve
     */
    public Server(Socket c) {
        this.client = c;
    }

    /**
     * @return the client socket served by this thread
     */
    public Socket getClient() {
        return client;
    }

    /**
     * Serves the connection until the peer sends {@code "end"}, disconnects, or an I/O
     * error occurs. The socket is always closed on exit.
     */
    @Override
    public void run() {
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(
                    client.getInputStream()));
            PrintWriter out = new PrintWriter(client.getOutputStream());
            while (true) {
                String str = in.readLine();
                if (str == null) {
                    // readLine() returns null once the peer has closed the connection.
                    break;
                }
                if (str.contains("produce")) {
                    // "produce" marks a message to enqueue; the whole line is stored.
                    JmsBuffer.produce(str);
                } else if (str.contains("consume")) {
                    // "consume" requests one message; else-if keeps a produced payload
                    // that merely mentions "consume" from also triggering a dequeue.
                    String messag = JmsBuffer.consume();
                    out.println(messag);
                    out.flush();
                }
                System.out.println(str);
                if (str.equals("end")) {
                    break;
                }
            }
        } catch (IOException ex) {
            System.err.println("Connection " + client.getRemoteSocketAddress()
                    + " failed: " + ex);
        } finally {
            try {
                client.close();
            } catch (IOException ignored) {
                // Nothing useful can be done if closing the socket fails.
            }
        }
    }

    /**
     * Listens on port 5678 and starts one {@code Server} thread per accepted connection.
     *
     * @param args unused
     * @throws IOException if the listening socket cannot be opened or accepting fails
     */
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(5678);
        try {
            while (true) {
                Server mc = new Server(server.accept());
                mc.start();
            }
        } finally {
            // Reached only when accept() throws; release the listening port.
            server.close();
        }
    }
}
宣告一個客戶端:
package com.bai.testbio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;

/**
 * Blocking-I/O client for the message-queue server listening on local port 5678.
 *
 * <p>Call {@link #creaSockeet()} first to open the connection, then either
 * {@link #pub(String)} to send a message or {@link #sub(String)} to request one.
 */
public class Client {

    /** The current connection to the server; set by {@link #creaSockeet()}. */
    static Socket server;

    /**
     * Opens a new connection to port 5678 on the local host.
     *
     * @throws UnknownHostException if the local host address cannot be resolved
     * @throws IOException if the connection cannot be established
     */
    public static void creaSockeet() throws UnknownHostException, IOException {
        server = new Socket(InetAddress.getLocalHost(), 5678);
    }

    /**
     * Publishes one message line to the server and then closes the connection.
     *
     * <p>No reply is read: the server in this example sends nothing back for a produce
     * request, so waiting for a line here would block forever.
     *
     * @param message the line to send
     * @throws IllegalStateException if {@link #creaSockeet()} has not been called
     * @throws IOException if the message cannot be written
     */
    public static void pub(String message) throws UnknownHostException, IOException {
        Socket socket = requireConnection();
        try {
            PrintWriter out = new PrintWriter(socket.getOutputStream());
            out.println(message);
            // PrintWriter swallows I/O errors; checkError() flushes and reports them.
            if (out.checkError()) {
                throw new IOException("Failed to send message to the server");
            }
        } finally {
            socket.close();
        }
    }

    /**
     * Sends a consume request and returns the single line the server replies with.
     *
     * <p>The connection is left open after the call, as in the original listing.
     *
     * @param code the request line to send
     * @return the line read from the server, or {@code null} if the server closed the
     *         connection without replying
     * @throws IllegalStateException if {@link #creaSockeet()} has not been called
     * @throws IOException if sending the request or reading the reply fails
     */
    public static String sub(String code) throws UnknownHostException, IOException {
        Socket socket = requireConnection();
        BufferedReader in = new BufferedReader(new InputStreamReader(
                socket.getInputStream()));
        PrintWriter out = new PrintWriter(socket.getOutputStream());
        out.println(code);
        out.flush();
        // Blocks until the server writes a line or closes the connection.
        String str = in.readLine();
        System.out.println(str);
        System.out.println("獲取的訊息為:" + str);
        return str;
    }

    /** Returns the current connection, failing clearly when none has been opened. */
    private static Socket requireConnection() {
        if (server == null) {
            throw new IllegalStateException("Not connected: call creaSockeet() first");
        }
        return server;
    }
}
測試(以下兩個 main 方法需分別放在各自的測試類別中,一個負責生產訊息,一個負責消費訊息):
/**
 * Producer-side test: connects to the server and publishes one message.
 *
 * @param args unused
 * @throws Exception if connecting or sending fails
 */
public static void main(String[] args) throws Exception {
    Client.creaSockeet();
    // The server dispatches on the substring "produce"; the earlier marker "perduce"
    // never matched, so the message was silently dropped instead of being enqueued.
    StringBuilder builder = new StringBuilder("produce"); // message marker
    builder.append("content:你好嗎?");
    Client.pub(builder.toString());
}
/**
 * Consumer-side test: connects to the server, requests one message and prints it.
 *
 * @param args unused
 * @throws Exception if connecting, sending the request or reading the reply fails
 */
public static void main(String[] args) throws Exception {
    Client.creaSockeet();
    // Request marker understood by the server as a consume request.
    String request = "consumer";
    String reply = Client.sub(request);
    // Print the message that was fetched.
    System.out.println("獲取的訊息為:" + reply);
}