入門程序,hello world
RabbitMQ是消息代理。從本質上說,它接受來自生產者的信息,並將它們傳遞給消費者。在兩者之間,它可以根據你給它的路由,緩沖規則進行傳遞消息。
一、專業術語
1. 生產者:
在現實生活中就好比制造商品的工廠,他們是商品的生產者。生產者只意味著發送。發送消息的程序稱之為一個生產者。我們用“P”表示:
2. 隊列:
隊列就像存放商品的倉庫或者商店,是生產商品的工廠和購買商品的用戶之間的中轉站。隊列就像是一個倉庫或者流水線。在RabbitMQ中,信息流從你的應用程序出發,來到RabbitMQ的隊列,所有信息可以只存儲在一個隊列中。隊列可以存儲很多的消息,因為它基本上是一個無限制的緩沖區,前提是你的機器有足夠的存儲空間。多個生產者可以將消息發送到同一個隊列中,多個消費者也可以只從同一個隊列接收數據。這就是隊列的特性。隊列用下面的圖表示,圖上面是隊列的名字:
3. 消費者
消費者就好比是從商店購買或從倉庫取走商品的人,消費的意思就是接收。消費者是一個程序,主要是等待接收消息。我們的用“C”表示
註意:
生產者,消費者和隊列(RabbitMQ)不必部署在同一臺機器上。實際在生產環境的大多數應用中,他們都是分開部署的。
二、“Hello World”
1. 說明
在本教程中,我們我們通過2個java程序,一個發送消息的生產者,和一個接收信息並打印的消費者。想要了解rabbitmq,必須了解一些最基礎的內容,我們先從一些代碼片段來了解產生信息和接收消息的流程和方法。在編寫代碼前,我們先來用戶一張圖表示要做的事,在下圖中,“P”是我們的生產者,“C”是我們的消費者。在中間紅色框是代表RabbitMQ中的一個消息隊列。箭頭指向表示信息流的方向。
2.maven依賴
1 <dependency> 2 <groupId>com.rabbitmq</groupId> 3 <artifactId>amqp-client</artifactId> 4 <version>3.5.7</version> 5 </dependency>
3. 消息生產者
1 package com.luchao.rabbitMQ.helloworld;2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 /** 11 * 生產者 12 * Created by andpay on 17/5/31. 13 */ 14 public class Producer { 15 16 private static final String QUEUE_NAME = "hello"; 17 18 public static void main(String[] args) throws IOException, TimeoutException { 19 //創建連接工廠 20 ConnectionFactory connectionFactory = new ConnectionFactory(); 21 //設置RabbitMQ地址 22 connectionFactory.setHost("localhost"); 23 //創建一個新的連接 24 Connection connection = connectionFactory.newConnection(); 25 //創建一個頻道 26 Channel channel = connection.createChannel(); 27 //聲明一個隊列 -- 在RabbitMQ中,隊列聲明是冪等性的(一個冪等操作的特點是其任意多次執行所產生的影響 28 // 均與一次執行的影響相同),也就是說,如果不存在,就創建,如果存在,不會對已經存在的隊列產生任何影響。 29 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 30 String message = "Hello World!"; 31 //發送消息到隊列中 32 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8")); 33 System.out.println("P [x] Sent ‘" + message + "‘"); 34 //關閉頻道、關閉連接 35 channel.close(); 36 connection.close(); 37 } 38 }
4. 消息消費者
1 package com.luchao.rabbitMQ.helloworld; 2 3 import com.rabbitmq.client.*; 4 5 import java.io.IOException; 6 import java.util.concurrent.TimeoutException; 7 8 /** 9 * 消費者 10 * Created by andpay on 17/5/31. 11 */ 12 public class Consumer { 13 14 private static final String QUEUE_NAME = "hello"; 15 16 public static void main(String[] args) throws IOException, TimeoutException { 17 //創建連接工廠 18 ConnectionFactory connectionFactory = new ConnectionFactory(); 19 //設置設置RabbitMQ地址 20 connectionFactory.setHost("localhost"); 21 //創建一個新的連接 22 Connection connection = connectionFactory.newConnection(); 23 //創建頻道 24 Channel channel = connection.createChannel(); 25 //聲明要關註的隊列 -- 在RabbitMQ中,隊列聲明是冪等性的(一個冪等操作的特點是其任意多次執行所產生的影響均與一 26 // 次執行的影響相同),也就是說,如果不存在,就創建,如果存在,不會對已經存在的隊列產生任何影響。 27 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 28 System.out.println("C [*] Waiting for messages. To exit press CTRL+C"); 29 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){ 30 @Override 31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 32 String message = new String(body, "utf-8"); 33 System.out.println("C [x] Received ‘" + message + "‘"); 34 } 35 }; 36 //自動回復隊列應答 -- RabbitMQ中的消息確認機制 37 channel.basicConsume(QUEUE_NAME, true, consumer); 38 } 39 }
5. 運行測試
啟動rabbitMQ,運行生產者程序。運行結果。
C [*] Waiting for messages. To exit press CTRL+C
運行消費者程序,運行結果。
C [*] Waiting for messages. To exit press CTRL+C C [x] Received ‘Hello World!‘
打開rabbitMQ的web控制臺看到:
Connection
channels
exchanges
以amp開頭的是rabbitMQ自己的exchange,我們用的是默認的exchange,在代碼中沒有指定exchange。
queues
我們指定的queue為hello
入門程序,hello world