RabbitMQ的五種工作模式的程式碼實現
阿新 • • 發佈:2018-11-19
首先是一個SpringBoot專案,在專案中新增如下依賴:
1、簡單模式
package com.jt.test.rabbitmq; import org.junit.Before; import org.junit.Test; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; /** * 本類生成2個test方法,其中一個是生產者 * 另一個是消費者 * @author DGHxj * */ public class SimpleTest { private Connection conn; @Before//Test執行之前都要執行這個方法 public void getResource() throws Exception{ /* * 建立連線工程 * 獲取場長連結 */ ConnectionFactory factory=new ConnectionFactory(); //com.rabbitmq.client //建立連線,獲取登入資訊,ip,埠 factory.setHost("10.42.39.48"); factory.setPort(5672); factory.setUsername("easymall"); factory.setPassword("123456"); factory.setVirtualHost("/easymall"); //從工廠中獲取連線物件 conn=factory.newConnection(); } @Test public void productor() throws Exception{ /* *長連結建立短連線 *宣告繫結佇列queue *傳送訊息 */ Channel channel=conn.createChannel();//到此為止可以連線了 //rabbitmq //準備一個佇列名稱 String queue="simple"; //宣告佇列,消費端和生產端呼叫宣告佇列方法,無則建立有則直接連線使用 /* * queue String 佇列名稱 * durable boolean 是否持久化 * exclusive boolean 是否專屬,一個連線建立的所有channel宣告的queue 是否只有當前連結可以使用 false *autoDelete boolean 是否自動刪除 false 沒有channel連線queue時 queue自動消失 *arguments map型別,其他引數,例如佇列多長(資料量) */ channel.queueDeclare(queue, false, false, false, null); //傳送訊息 String msg="hello simple mode rabbitmq"; /* * exchange string 交換機名稱 "" 預設建立的AMQP default(direct) * routingKey string 繫結交換機的路由key 簡單模式使用queue名稱 * props BasicProperties 屬性類,訊息也有各種屬性 * 例如 deliveryMode 0持久化 1不持久化 * body byte[] 訊息的二進位制 */ channel.basicPublish("", queue, null, msg.getBytes()); } //消費端程式碼 @Test public void consumer() throws Exception{ //獲取連線 Channel channel=conn.createChannel(); //生產端已經聲明瞭佇列,就無需再次宣告 String queue="simple"; channel.queueDeclare(queue,false,false,false,null); //消費者物件,建立 QueueingConsumer consumer=new QueueingConsumer(channel); //利用繫結channel的消費者物件,繫結消費佇列 /* * queue string 繫結的佇列名 * autoAck boolean 是否自動確認 * 確認邏輯在佇列髒哦能舉足輕重 true表示自動確認,false表示手動確認 * callback繫結的消費物件 */ channel.basicConsume(queue, true,consumer); //編寫監聽邏輯,NIO非阻塞執行緒的程式碼 while(true){ //一旦有訊息生成,建立接受物件Delivery Delivery delivery=consumer.nextDelivery(); //從對向獲取訊息 String msg=new String(delivery.getBody()); System.out.println("消費者獲取訊息:"+msg); } } }
二、工作模式(資源爭搶)
package com.jt.test.rabbitmq; import org.junit.Before; import org.junit.Test; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; /** * 測試利用消費邏輯,完成爭搶的效果 * 一個模擬忙,搶的少 * 一個模擬空閒,搶得多 * @author DGHxj * */ public class WorkTest { private Connection conn; @Before public void getResource() throws Exception{ ConnectionFactory factory=new ConnectionFactory(); factory.setHost("10.42.39.48"); factory.setPort(5672); factory.setUsername("easymall"); factory.setPassword("123456"); factory.setVirtualHost("/easymall"); conn=factory.newConnection(); } //生產者 @Test public void productor() throws Exception{ Channel channel=conn.createChannel(); String queue="work"; channel.queueDeclare(queue, false, false, false, null); for(int i=0;i<100;i++){ String msg="hello work "+i; channel.basicPublish("",queue, null,msg.getBytes()); System.out.println("生產者傳送成功第"+i+"條"); } } //消費者1 @Test public void consumer01() throws Exception{ Channel channel=conn.createChannel(); String queue="work"; channel.queueDeclare(queue, false, false, false, null); QueueingConsumer consumer=new QueueingConsumer(channel); channel.basicQos(1);//消費者不執行回執確認,queue只最多傳送一條訊息 channel.basicConsume(queue, false,consumer); while(true){ Delivery delivery=consumer.nextDelivery(); System.out.println("消費者1接收到訊息:"+new String(delivery.getBody())); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } //消費者2 @Test public void consumer02() throws Exception{ Channel channel=conn.createChannel(); String queue="work"; channel.queueDeclare(queue, false, false, false, null); QueueingConsumer consumer=new QueueingConsumer(channel); channel.basicQos(1);//消費者不執行回執確認,queue只最多傳送一條訊息 channel.basicConsume(queue, false,consumer); while(true){ Delivery delivery=consumer.nextDelivery(); System.out.println("消費者2接收到訊息:"+new String(delivery.getBody())); Thread.sleep(100); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
三、釋出訂閱(publish/fanout)
package com.jt.test.rabbitmq; import org.junit.Before; import org.junit.Test; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; /** * 一個生產者,傳送到釋出訂閱型別的交換機訊息 * 同步複製到所有與該交換機連線繫結的佇列+ * @author DGHxj * */ public class PublishTest { private Connection conn; @Before//Test執行之前,@Before的方法自動換行一次 public void getResource() throws Exception{ /* * 建立連線工程 * 獲取長連結 */ ConnectionFactory factory=new ConnectionFactory(); //com.rabbitmq.client //建立連線,獲取登入資訊 factory.setHost("10.42.39.48"); factory.setPort(5672); factory.setUsername("easymall"); factory.setPassword("123456"); factory.setVirtualHost("/easymall"); //從工廠中獲取連線物件 conn=factory.newConnection(); } private static final String exchange="fanout1807"; private static final String queue01="fanoutQ01"; private static final String queue02="fanoutQ02"; //生產端程式碼 @Test public void productor() throws Exception{ //自定義完成一個釋出訂閱型別的交換機 Channel channel=conn.createChannel(); //宣告佇列可以在生產端也可以在消費端,交換機也是如此 channel.exchangeDeclare(exchange, "fanout");//名稱,type //topic direct fanout headers //傳送訊息 for(int i=0;i<100;i++){ String msg="hello fanout:"+i; channel.basicPublish(exchange, "", null, msg.getBytes()); } } //消費端01 @Test public void consumer01() throws Exception{ Channel channel=conn.createChannel(); //宣告佇列 channel.queueDeclare(queue01, false, false, false, null); //宣告交換機 channel.exchangeDeclare(exchange, "fanout"); //繫結佇列到交換機 channel.queueBind(queue01, exchange, ""); //消費者物件 QueueingConsumer consumer=new QueueingConsumer(channel); //繫結消費者佇列 channel.basicConsume(queue01, true, consumer); while(true){ Delivery delivery=consumer.nextDelivery(); System.out.println("消費者01接收到:"+new String(delivery.getBody())); } } //消費端02 @Test public void consumer02() throws Exception{ Channel channel=conn.createChannel(); //宣告佇列 channel.queueDeclare(queue02, false, false, false, null); //宣告交換機 channel.exchangeDeclare(exchange, "fanout"); //繫結佇列到交換機 channel.queueBind(queue02, exchange, ""); //消費者物件 QueueingConsumer consumer=new QueueingConsumer(channel); //繫結消費者佇列 channel.basicConsume(queue02, true, consumer); while(true){ Delivery delivery=consumer.nextDelivery(); System.out.println("消費者02接收到:"+new String(delivery.getBody())); } } }
四、路由模式(routing/direct)
package com.jt.test.rabbitmq;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import ch.qos.logback.core.net.SyslogOutputStream;
/**
* 測試路由模式
* 訊息攜帶具體的路由key
* 繫結到交換機的不同佇列使用不同的路由key
* 根據匹配的訊息會發送到目的queue
* @author DGHxj
*
*/
public class RoutingTest {
private Connection conn;
@Before
public void getResource() throws Exception{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("10.42.39.48");
factory.setPort(5672);
factory.setUsername("easymall");
factory.setPassword("123456");
factory.setVirtualHost("/easymall");
conn=factory.newConnection();
}
private static final String exchange="direct1807";
private static final String queue01="directQ01";
private static final String queue02="directQ02";
//生產者程式碼
@Test
public void productor() throws Exception{
Channel channel=conn.createChannel();
channel.exchangeDeclare(exchange, "direct");
for(int i=0;i<50;i++){
String msg="hello direct update:"+i;
channel.basicPublish(exchange, "item.update", null, msg.getBytes());
String msg2="hello direct add:"+i;
channel.basicPublish(exchange, "item.add", null, msg2.getBytes());
}
}
//消費者程式碼
@Test
public void consumer01() throws Exception{
//通過長連結建立短連線
Channel channel=conn.createChannel();
//宣告佇列
channel.queueDeclare(queue01, false, false, false, null);
//宣告交換機
channel.exchangeDeclare(exchange, "direct");
//繫結佇列和交換機
channel.queueBind(queue01, exchange, "item.update");
//消費者物件
QueueingConsumer consumer=new QueueingConsumer(channel);
//繫結消費者與佇列
channel.basicConsume(queue01, true, consumer);
//實時監控生產者
while(true){
//建立接受物件delivery
Delivery delivery=consumer.nextDelivery();
System.out.println("消費者01接收到:"+new String(delivery.getBody()));
}
}
//消費者程式碼
@Test
public void consumer02() throws Exception{
//通過長連結建立短連線
Channel channel=conn.createChannel();
//宣告佇列
channel.queueDeclare(queue02, false, false, false, null);
//宣告交換機
channel.exchangeDeclare(exchange, "direct");
//繫結交換機和佇列
channel.queueBind(queue02, exchange, "item.add");
//消費者物件
QueueingConsumer consumer=new QueueingConsumer(channel);
//繫結佇列和消費者物件
channel.basicConsume(queue02, true, consumer);
//實時監聽
while(true){
//建立接收物件
Delivery delivery=consumer.nextDelivery();
System.out.println("消費者02接收到:"+new String(delivery.getBody()));
}
}
}
五、topic主題模式
package com.jt.test.rabbitmq;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* 測試topic主題模式 範圍的轉發
*
* @author DGHxj
*
*/
public class TopicTest {
private Connection conn;
@Before
public void getResource() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.42.39.48");
factory.setPort(5672);
factory.setUsername("easymall");
factory.setPassword("123456");
factory.setVirtualHost("/easymall");
conn = factory.newConnection();
}
private static final String exchange = "topics1807";
private static final String queue01 = "topicQ01";
private static final String queue02 = "topicQ02";
//#表示任意,*表示一個字串
//生產者程式碼
@Test
public void productor() throws Exception{
Channel channel=conn.createChannel();
channel.exchangeDeclare(exchange, "topic");
for(int i=0;i<50;i++){
String msg="hello topic item.#:"+i;
channel.basicPublish(exchange, "12.item.add", null, msg.getBytes());
String msg2="hello topic *.item:"+i;
channel.basicPublish(exchange, "tedu.item", null, msg2.getBytes());
}
}
// 消費者程式碼
@Test
public void consumer01() throws Exception {
Channel channel = conn.createChannel();
channel.queueDeclare(queue01, false, false, false, null);
//channel.exchangeDeclare(exchange, "topic");
channel.queueBind(queue01, exchange, "#.item.#");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue01, true, consumer);
while (true) {
Delivery delivery = consumer.nextDelivery();
System.out.println("消費者01獲得:" + new String(delivery.getBody()));
}
}
// 消費者程式碼
@Test
public void consumer02() throws Exception {
Channel channel = conn.createChannel();
channel.queueDeclare(queue02, false, false, false, null);
channel.exchangeDeclare(exchange, "topic");
channel.queueBind(queue02, exchange, "*.item");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue02, true, consumer);
while (true) {
Delivery delivery = consumer.nextDelivery();
System.out.println("消費者02獲得:" + new String(delivery.getBody()));
}
}
}