1. 程式人生 > java實現簡單的訊息佇列模型(BIO)

java實現簡單的訊息佇列模型(BIO)

本實例主要模擬一個一對一的訊息佇列處理(一個生產者、一個消費者):

宣告佇列:

package com.bai.testbio;

import java.util.LinkedList;

/**
 * A small bounded, blocking, first-in-first-out message buffer held in static state.
 *
 * <p>Producers block while the buffer already holds {@code MAX_SIZE} messages and
 * consumers block while it is empty. All access is guarded by the monitor of the
 * backing list, so {@link #produce(String)} and {@link #consume()} may be called
 * from different threads.
 */
public class JmsBuffer {
	// Maximum number of messages the queue may hold at any time.
	private final static int  MAX_SIZE = 2;
	// Backing queue; its monitor is also the lock/condition used by wait/notifyAll,
	// so the reference must never change.
	private static final LinkedList<String> jmsQueue=new LinkedList<String>();
	// Lazily created shared instance returned by getJmsBuffer().
	static JmsBuffer buffer;

	/**
	 * Appends a message to the tail of the queue, blocking while the queue is full.
	 *
	 * <p>If the calling thread is interrupted while waiting, the method keeps waiting
	 * (as the original did) but restores the thread's interrupt status before returning.
	 *
	 * @param str the message to enqueue
	 */
    public static void produce(String str) {
        boolean interrupted = false;
        try {
            synchronized (jmsQueue) {
                // Block while there is no free slot. ">=" (not ">") keeps the queue
                // from growing to MAX_SIZE + 1 elements.
                while (jmsQueue.size() >= MAX_SIZE) {
                    System.out.println("你要生產的訊息為" + "\t【庫存量】:"
                            + jmsQueue.size() + "\t暫時不能執行生產任務!");
                    try {
                        jmsQueue.wait();
                    } catch (InterruptedException e) {
                        // Remember the interrupt; the flag is restored in the finally block.
                        interrupted = true;
                    }
                }

                jmsQueue.addLast(str);

                System.out.println("已經生產該訊息" + str + "\t【現倉儲量為】:" + jmsQueue.size());

                // Wake any consumer waiting for a message.
                jmsQueue.notifyAll();
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

	/**
	 * Removes and returns the oldest message, blocking while the queue is empty.
	 *
	 * <p>If the calling thread is interrupted while waiting, the method keeps waiting
	 * but restores the thread's interrupt status before returning.
	 *
	 * @return the message at the head of the queue
	 */
    public static String consume() {
        boolean interrupted = false;
        try {
            synchronized (jmsQueue) {
                // Block while there is nothing to consume; removing from an empty
                // LinkedList would throw NoSuchElementException.
                while (jmsQueue.isEmpty()) {
                    System.out.println("【訊息庫存量】:"
                            + jmsQueue.size() + "\t暫時不能執行消費任務!");
                    try {
                        jmsQueue.wait();
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }

                // Take from the head so messages leave in the order they arrived.
                String str = jmsQueue.removeFirst();

                System.out.println("【已經消費該訊息】:" + str+ "\t【現倉儲量為】:" + jmsQueue.size());

                // Wake any producer waiting for a free slot.
                jmsQueue.notifyAll();
                return str;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

	/**
	 * Returns the shared {@code JmsBuffer} instance, creating it on first use.
	 *
	 * @return the same instance on every call
	 */
    public synchronized static JmsBuffer getJmsBuffer(){
    	if(buffer==null){
    		// Store the instance; otherwise every call would hand out a new object.
    		buffer = new JmsBuffer();
    	}
    	return buffer;
    }
}

使用BIO實現一個服務端:

package com.bai.testbio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * Blocking-I/O server: one {@code Server} thread is started per accepted connection
 * and translates text lines from the client into {@link JmsBuffer} operations.
 *
 * <p>Line protocol as implemented by {@link #run()}:
 * <ul>
 *   <li>a line containing {@code "produce"} is stored as a message;</li>
 *   <li>a line containing {@code "consume"} is answered with one message from the buffer;</li>
 *   <li>the exact line {@code "end"} terminates the session.</li>
 * </ul>
 */
public class Server extends Thread {
	// Connection served by this thread.
	private final Socket client;

	/**
	 * @param c the accepted client connection this thread will serve
	 */
	public Server(Socket c) {
		this.client = c;
	}

	/**
	 * @return the client connection served by this thread
	 */
	public Socket getClient() {
		return client;
	}

	/**
	 * Reads lines from the client until it disconnects or sends {@code "end"},
	 * dispatching each line to the buffer, then closes the connection.
	 */
	@Override
	public void run() {
		try {
			BufferedReader in = new BufferedReader(new InputStreamReader(
					client.getInputStream()));
			PrintWriter out = new PrintWriter(client.getOutputStream());
			String str;
			// readLine() returns null once the client closes its side of the
			// connection; stop instead of dereferencing null.
			while ((str = in.readLine()) != null) {
				if (str.contains("produce")) {
					// The whole line (marker included) is stored as the message.
					JmsBuffer.produce(str);
				}
				if (str.contains("consume")) {
					// May block until a message becomes available.
					String message = JmsBuffer.consume();
					out.println(message);
					out.flush();
				}
				System.out.println(str);
				out.flush();
				if (str.equals("end")) {
					break;
				}
			}
		} catch (IOException ex) {
			// Report the failure instead of swallowing it silently.
			System.err.println("Connection to " + client.getRemoteSocketAddress()
					+ " failed: " + ex);
		} finally {
			// Always release the socket, also when the loop exits with an exception.
			try {
				client.close();
			} catch (IOException ignored) {
				// Nothing useful can be done if closing fails.
			}
		}
	}

	/**
	 * Listens on port 5678 and starts one {@code Server} thread per accepted connection.
	 *
	 * @param args unused
	 * @throws IOException if the listening socket cannot be opened or accepting fails
	 */
	public static void main(String[] args) throws IOException {
		ServerSocket server = new ServerSocket(5678);
		try {
			while (true) {
				Server mc = new Server(server.accept());
				mc.start();
			}
		} finally {
			// Reached only when accept() throws; release the listening port.
			server.close();
		}
	}
}

宣告一個客戶端:

package com.bai.testbio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;

/**
 * Minimal client for the line-based message server listening on local port 5678.
 *
 * <p>All methods share the single static {@link #server} socket, which must be opened
 * with {@link #creaSockeet()} before {@link #pub(String)} or {@link #sub(String)} is called.
 */
public class Client {
	// Shared connection to the server; null until creaSockeet() has been called.
	static Socket server;

	/**
	 * Opens the shared connection to the server on the local host, port 5678.
	 *
	 * @throws UnknownHostException if the local host address cannot be resolved
	 * @throws IOException if the connection cannot be established
	 */
	public static void creaSockeet() throws UnknownHostException, IOException{
		server = new Socket(InetAddress.getLocalHost(), 5678);
	}

	/**
	 * Publishes one message: sends {@code message} as a single line and then closes
	 * the shared connection.
	 *
	 * <p>No reply is read. The server in this example sends nothing back for a
	 * published message, so waiting on {@code readLine()} here would block forever.
	 *
	 * @param message the line to send; must contain the server's produce marker to be stored
	 * @throws IllegalStateException if {@link #creaSockeet()} has not been called
	 * @throws IOException if the message cannot be written or the socket cannot be closed
	 */
    public static void pub(String message) throws UnknownHostException, IOException{
         Socket socket = connection();
         try {
             PrintWriter out = new PrintWriter(socket.getOutputStream());
             out.println(message);
             // PrintWriter swallows I/O errors; checkError() flushes and reports them.
             if (out.checkError()) {
                 throw new IOException("Failed to send message to " + socket.getRemoteSocketAddress());
             }
         } finally {
             // Release the connection even when sending failed.
             socket.close();
         }
    }

	/**
	 * Subscribes for one message: sends {@code code} as a single line and waits for
	 * the server's one-line reply. The shared connection is left open.
	 *
	 * @param code the request line; must contain the server's consume marker
	 * @return the line returned by the server, or {@code null} if the server closed the connection
	 * @throws IllegalStateException if {@link #creaSockeet()} has not been called
	 * @throws IOException if the request cannot be written or the reply cannot be read
	 */
    public static String sub(String code) throws UnknownHostException, IOException{
         Socket socket = connection();
         BufferedReader in = new BufferedReader(new InputStreamReader(
                 socket.getInputStream()));
         PrintWriter out = new PrintWriter(socket.getOutputStream());
         out.println(code);
         if (out.checkError()) {
             throw new IOException("Failed to send request to " + socket.getRemoteSocketAddress());
         }
         // Blocks until the server has a message to hand out.
         String str = in.readLine();
         System.out.println(str);
         System.out.println("獲取的訊息為:"+str);
         return str;
    }

	// Returns the shared socket, failing with a clear message if it was never opened.
    private static Socket connection() {
         if (server == null) {
             throw new IllegalStateException("creaSockeet() must be called before pub()/sub()");
         }
         return server;
    }
}

測試:

/**
 * Producer test: connects to the server and publishes one message.
 *
 * @param args unused
 * @throws Exception if connecting or sending fails
 */
public static void main(String[] args) throws Exception {
		Client.creaSockeet();
		// The marker must contain "produce"; the server only stores lines that
		// contain that word, so the former "perduce" was silently ignored.
		StringBuilder builder=new StringBuilder("produce");
		builder.append("content:你好嗎?");
		Client.pub(builder.toString());
}
/**
 * Consumer test: connects to the server, requests one message and prints it.
 *
 * @param args unused
 * @throws Exception if connecting or the request/reply exchange fails
 */
public static void main(String[] args) throws Exception {
		Client.creaSockeet();
		// "consumer" contains the "consume" marker the server looks for.
		final String request = "consumer";
		final String received = Client.sub(request);
		System.out.println("獲取的訊息為:" + received);
}