訊息中介軟體rabbitmq(2)
阿新 • • 發佈:2018-12-03
介紹
主要演示生產者和消費如何傳遞訊息的
生產者
public static void main(String[] args) throws IOException, TimeoutException { // TODO Auto-generated method stub ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //queueDeclare第一個引數表示佇列名稱、第二個引數為是否持久化(true表示是,佇列將在伺服器重啟時生存)、第三個引數為是否是獨佔佇列(建立者可以使用的私有佇列,斷開後自動刪除)、第四個引數為當所有消費者客戶端連線斷開時是否自動刪除佇列、第五個引數為佇列的其他引數//basicPublish第一個引數為交換機名稱、第二個引數為佇列對映的路由key、第三個引數為訊息的其他屬性、第四個引數為傳送資訊的主體 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello RabbitMQ"; //傳送訊息到佇列中 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("Producer Send +'" + message + "'"); //關閉通道和連線 channel.close(); connection.close(); }
消費者
public static void main(String[] args) throws IOException, TimeoutException { // TODO Auto-generated method stub // 建立連線工廠 ConnectionFactory factory = new ConnectionFactory(); //設定RabbitMQ地址 factory.setHost("localhost"); factory.setUsername("admin"); factory.setPassword("123456"); //建立一個新的連線 Connection connection = factory.newConnection(); //建立一個通道 Channel channel = connection.createChannel(); //宣告要關注的佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Customer Waiting Received messages"); //DefaultConsumer類實現了Consumer介面,通過傳入一個頻道, // 告訴伺服器我們需要那個頻道的訊息,如果頻道中有訊息,就會執行回撥函式handleDelivery Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Customer Received '" + message + "'"); //throw new RuntimeException(); } }; //自動回覆佇列應答 -- RabbitMQ中的訊息確認機制 ack機制,ack = false時,如果程式異常會重新消費 channel.basicConsume(QUEUE_NAME, true, consumer); }
注:在消費者最後設定channel.basicConsume時,第二個引數代表是否需要自動回覆,即如果是true,不論消費成功還是失敗,即使丟擲異常,該條訊息也會從訊息佇列中刪除,所以對於訊息準確性要求教高的業務場景,可以設定為false,只有消費成功後,自己返回才算消費成功。