RabbitMQ實戰之Hello World(三)
阿新 • • 發佈:2018-12-01
Producer
package base; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) { foo(); }private static void foo() { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); //宣告一個我們將要傳送資料進去的queue,然後向其傳送資料。queue的宣告是冪等的-只有不存在的話才會實際去建立//資料內容是byte陣列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 5; i++) { String message = "hello world"+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("Sent:"+message); } channel.close(); connection.close(); }catch (Exception e) { e.printStackTrace(); } } }
Consumer
package base; import java.io.UnsupportedEncodingException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) { foo(); } private static void foo() { //向本地localhost建立一個和物理連線 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { //tcp物理連線的一個抽象,關注協議版本和認證 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //這裡同樣宣告queue,因為Producer可能先於Consumer執行,所以需要保證queue存在 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //定義一個消費者實現類來處理訊息的上報 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body,"UTF-8"); System.out.println("Received:"+message); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } catch (Exception e) { e.printStackTrace(); } } }
執行結果
Producer
Sent:hello world0
Sent:hello world1
Sent:hello world2
Sent:hello world3
Sent:hello world4
Consumer
Received:hello world0
Received:hello world1
Received:hello world2
Received:hello world3
Received:hello world4
工程結構如下圖:
主要引入了下面三個jar包,log4j和slf4j是為了解決編譯問題