1. 程式人生 > >RabbitMQ的五種工作模式的程式碼實現

RabbitMQ的五種工作模式的程式碼實現

首先是一個SpringBoot專案,在專案中新增如下依賴:

      

1、簡單模式

package com.jt.test.rabbitmq;

import org.junit.Before;
import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
 * 本類生成2個test方法,其中一個是生產者
 * 另一個是消費者
 * @author DGHxj
 *
 */
public class SimpleTest {
	private Connection conn;
	@Before//Test執行之前都要執行這個方法
	public void getResource() throws Exception{
		/*
		 * 建立連線工程
		 * 獲取場長連結
		 */
		ConnectionFactory factory=new ConnectionFactory();
		//com.rabbitmq.client
		//建立連線,獲取登入資訊,ip,埠
		factory.setHost("10.42.39.48");
		factory.setPort(5672);
		factory.setUsername("easymall");
		factory.setPassword("123456");
		factory.setVirtualHost("/easymall");
		//從工廠中獲取連線物件
		conn=factory.newConnection();
		
	}
	
	@Test
	public void productor() throws Exception{
		/*
		 *長連結建立短連線
		 *宣告繫結佇列queue
		 *傳送訊息 
		 */
		Channel channel=conn.createChannel();//到此為止可以連線了
		//rabbitmq
		//準備一個佇列名稱
		String queue="simple";
		//宣告佇列,消費端和生產端呼叫宣告佇列方法,無則建立有則直接連線使用
		/*
		 * queue String 佇列名稱
		 * durable boolean 是否持久化
		 * exclusive boolean
		   是否專屬,一個連線建立的所有channel宣告的queue
		   是否只有當前連結可以使用 false
		 *autoDelete boolean 是否自動刪除 false 沒有channel連線queue時
		  queue自動消失
		 *arguments map型別,其他引數,例如佇列多長(資料量)
		 */
		channel.queueDeclare(queue, false, false, false, null);
		//傳送訊息
		String msg="hello simple mode rabbitmq";
		/*
		 * exchange string 交換機名稱 "" 預設建立的AMQP default(direct)
		 * routingKey string 繫結交換機的路由key 簡單模式使用queue名稱
		 * props BasicProperties 屬性類,訊息也有各種屬性
		 * 例如 deliveryMode 0持久化 1不持久化
		 * body byte[] 訊息的二進位制
		 */
		channel.basicPublish("", queue, null, msg.getBytes());
		
	}
	
	//消費端程式碼
	@Test
	public void consumer() throws Exception{
		//獲取連線
		Channel channel=conn.createChannel();
		//生產端已經聲明瞭佇列,就無需再次宣告
		String queue="simple";
		channel.queueDeclare(queue,false,false,false,null);
		//消費者物件,建立
		QueueingConsumer consumer=new QueueingConsumer(channel);
		//利用繫結channel的消費者物件,繫結消費佇列
		/*
		 * queue string 繫結的佇列名
		 * autoAck boolean 是否自動確認
		 *        確認邏輯在佇列髒哦能舉足輕重 true表示自動確認,false表示手動確認
		 * callback繫結的消費物件 
		 */
		channel.basicConsume(queue, true,consumer);
		//編寫監聽邏輯,NIO非阻塞執行緒的程式碼
		while(true){
			//一旦有訊息生成,建立接受物件Delivery
			Delivery delivery=consumer.nextDelivery();
			//從對向獲取訊息
			String msg=new String(delivery.getBody());
			System.out.println("消費者獲取訊息:"+msg);
		}
	}

	
}

二、工作模式(資源爭搶)

package com.jt.test.rabbitmq;

import org.junit.Before;
import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
 * 測試利用消費邏輯,完成爭搶的效果
 * 一個模擬忙,搶的少
 * 一個模擬空閒,搶得多
 * @author DGHxj
 *
 */
public class WorkTest {
	private Connection conn;
	@Before
	public void getResource() throws Exception{
		ConnectionFactory factory=new ConnectionFactory();
		factory.setHost("10.42.39.48");
		factory.setPort(5672);
		factory.setUsername("easymall");
		factory.setPassword("123456");
		factory.setVirtualHost("/easymall");
		conn=factory.newConnection();
	}
	
	//生產者
	@Test
	public void productor() throws Exception{
		Channel channel=conn.createChannel();
		String queue="work";
		channel.queueDeclare(queue, false, false, false, null);
		for(int i=0;i<100;i++){
			String msg="hello work "+i;
			channel.basicPublish("",queue, null,msg.getBytes());
			System.out.println("生產者傳送成功第"+i+"條");
		}
	}
	
	//消費者1
	@Test
	public void consumer01() throws Exception{
		Channel channel=conn.createChannel();
		String queue="work";
		channel.queueDeclare(queue, false, false, false, null);
		QueueingConsumer consumer=new QueueingConsumer(channel);
		channel.basicQos(1);//消費者不執行回執確認,queue只最多傳送一條訊息
		channel.basicConsume(queue, false,consumer);
		while(true){
			Delivery delivery=consumer.nextDelivery();
			System.out.println("消費者1接收到訊息:"+new String(delivery.getBody()));
			Thread.sleep(10);
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}
	}
	
	//消費者2
		@Test
		public void consumer02() throws Exception{
			Channel channel=conn.createChannel();
			String queue="work";
			channel.queueDeclare(queue, false, false, false, null);
			QueueingConsumer consumer=new QueueingConsumer(channel);
			channel.basicQos(1);//消費者不執行回執確認,queue只最多傳送一條訊息
			channel.basicConsume(queue, false,consumer);
			while(true){
				Delivery delivery=consumer.nextDelivery();
				System.out.println("消費者2接收到訊息:"+new String(delivery.getBody()));
				Thread.sleep(100);
				channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
			}
		}
	
}

三、釋出訂閱(publish/fanout)

package com.jt.test.rabbitmq;

import org.junit.Before;
import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
 * 一個生產者,傳送到釋出訂閱型別的交換機訊息
 * 同步複製到所有與該交換機連線繫結的佇列+ 
 * @author DGHxj
 *
 */
public class PublishTest {
	private Connection conn;
	@Before//Test執行之前,@Before的方法自動換行一次
	public void getResource() throws Exception{
		/*
		 * 建立連線工程
		 * 獲取長連結
		 */
		ConnectionFactory factory=new ConnectionFactory();
		//com.rabbitmq.client
		//建立連線,獲取登入資訊
		factory.setHost("10.42.39.48");
		factory.setPort(5672);
		factory.setUsername("easymall");
		factory.setPassword("123456");
		factory.setVirtualHost("/easymall");
		//從工廠中獲取連線物件
		conn=factory.newConnection();
	}
	
	private static final String exchange="fanout1807";
	private static final String queue01="fanoutQ01";
	private static final String queue02="fanoutQ02";
	
	//生產端程式碼
	@Test
	public void productor() throws Exception{
		//自定義完成一個釋出訂閱型別的交換機
		Channel channel=conn.createChannel();
		//宣告佇列可以在生產端也可以在消費端,交換機也是如此
		channel.exchangeDeclare(exchange, "fanout");//名稱,type
		//topic direct fanout headers
		//傳送訊息
		for(int i=0;i<100;i++){
			String msg="hello fanout:"+i;
			channel.basicPublish(exchange, "", null, msg.getBytes());
		}
	}
	
	//消費端01
	@Test
	public void consumer01() throws Exception{
		
		Channel channel=conn.createChannel();
		//宣告佇列
		channel.queueDeclare(queue01, false, false, false, null);
		//宣告交換機
		channel.exchangeDeclare(exchange, "fanout");
		//繫結佇列到交換機
		channel.queueBind(queue01, exchange, "");
		//消費者物件
		QueueingConsumer consumer=new QueueingConsumer(channel);
		//繫結消費者佇列
		channel.basicConsume(queue01, true, consumer);
		while(true){
			Delivery delivery=consumer.nextDelivery();
			System.out.println("消費者01接收到:"+new String(delivery.getBody()));
		}
	}
	
	//消費端02
		@Test
		public void consumer02() throws Exception{
			
			Channel channel=conn.createChannel();
			//宣告佇列
			channel.queueDeclare(queue02, false, false, false, null);
			//宣告交換機
			channel.exchangeDeclare(exchange, "fanout");
			//繫結佇列到交換機
			channel.queueBind(queue02, exchange, "");
			//消費者物件
			QueueingConsumer consumer=new QueueingConsumer(channel);
			//繫結消費者佇列
			channel.basicConsume(queue02, true, consumer);
			while(true){
				Delivery delivery=consumer.nextDelivery();
				System.out.println("消費者02接收到:"+new String(delivery.getBody()));
			}
		}	
	
}

四、路由模式(routing/direct)

package com.jt.test.rabbitmq;

import org.junit.Before;
import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

import ch.qos.logback.core.net.SyslogOutputStream;

/**
 * 測試路由模式
 * 訊息攜帶具體的路由key
 * 繫結到交換機的不同佇列使用不同的路由key
 * 根據匹配的訊息會發送到目的queue
 * @author DGHxj
 *
 */
public class RoutingTest {
	private Connection conn;
	@Before
	public void getResource() throws Exception{
		ConnectionFactory factory=new ConnectionFactory();
		factory.setHost("10.42.39.48");
		factory.setPort(5672);
		factory.setUsername("easymall");
		factory.setPassword("123456");
		factory.setVirtualHost("/easymall");
		conn=factory.newConnection();
	}
	
	private static final String exchange="direct1807";
	private static final String queue01="directQ01";
	private static final String queue02="directQ02";
	
	//生產者程式碼
	@Test
	public void productor() throws Exception{
		Channel channel=conn.createChannel();
		channel.exchangeDeclare(exchange, "direct");
		
		for(int i=0;i<50;i++){
			String msg="hello direct update:"+i;
			channel.basicPublish(exchange, "item.update", null, msg.getBytes());
			String msg2="hello direct add:"+i;
			channel.basicPublish(exchange, "item.add", null, msg2.getBytes());
		}
	}
	
	//消費者程式碼
	@Test
	public void consumer01() throws Exception{
		//通過長連結建立短連線
		Channel channel=conn.createChannel();
		//宣告佇列
		channel.queueDeclare(queue01, false, false, false, null);
		//宣告交換機
		channel.exchangeDeclare(exchange, "direct");
		//繫結佇列和交換機
		channel.queueBind(queue01, exchange, "item.update");
		//消費者物件
		QueueingConsumer consumer=new QueueingConsumer(channel);
		//繫結消費者與佇列
		channel.basicConsume(queue01, true, consumer);
		//實時監控生產者
		while(true){
			//建立接受物件delivery
			Delivery delivery=consumer.nextDelivery();
			System.out.println("消費者01接收到:"+new String(delivery.getBody()));
		}
	}
	//消費者程式碼
	@Test
	public void consumer02() throws Exception{
		//通過長連結建立短連線
		Channel channel=conn.createChannel();
		//宣告佇列
		channel.queueDeclare(queue02, false, false, false, null);
		//宣告交換機
		channel.exchangeDeclare(exchange, "direct");
		//繫結交換機和佇列
		channel.queueBind(queue02, exchange, "item.add");
		//消費者物件
		QueueingConsumer consumer=new QueueingConsumer(channel);
		//繫結佇列和消費者物件
		channel.basicConsume(queue02, true, consumer);
		//實時監聽
		while(true){
			//建立接收物件
			Delivery delivery=consumer.nextDelivery();
			System.out.println("消費者02接收到:"+new String(delivery.getBody()));
		}
		
	}
			
}

五、topic主題模式

package com.jt.test.rabbitmq;

import org.junit.Before;
import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
 * 測試topic主題模式 範圍的轉發
 * 
 * @author DGHxj
 *
 */
public class TopicTest {
	private Connection conn;

	@Before
	public void getResource() throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("10.42.39.48");
		factory.setPort(5672);
		factory.setUsername("easymall");
		factory.setPassword("123456");
		factory.setVirtualHost("/easymall");
		conn = factory.newConnection();
	}

	private static final String exchange = "topics1807";
	private static final String queue01 = "topicQ01";
	private static final String queue02 = "topicQ02";
	
	//#表示任意,*表示一個字串
	//生產者程式碼
	@Test
	public void productor() throws Exception{
		Channel channel=conn.createChannel();
		channel.exchangeDeclare(exchange, "topic");
		for(int i=0;i<50;i++){
			String msg="hello topic item.#:"+i;
			channel.basicPublish(exchange, "12.item.add", null, msg.getBytes());
			
			String msg2="hello topic *.item:"+i;
			channel.basicPublish(exchange, "tedu.item", null, msg2.getBytes());
			
		}
	}

	// 消費者程式碼
	@Test
	public void consumer01() throws Exception {
		Channel channel = conn.createChannel();
		channel.queueDeclare(queue01, false, false, false, null);
		//channel.exchangeDeclare(exchange, "topic");
		channel.queueBind(queue01, exchange, "#.item.#");
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queue01, true, consumer);
		while (true) {
			Delivery delivery = consumer.nextDelivery();
			System.out.println("消費者01獲得:" + new String(delivery.getBody()));
		}
	}

	// 消費者程式碼
	@Test
	public void consumer02() throws Exception {
		Channel channel = conn.createChannel();
		channel.queueDeclare(queue02, false, false, false, null);
		channel.exchangeDeclare(exchange, "topic");
		channel.queueBind(queue02, exchange, "*.item");
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queue02, true, consumer);
		while (true) {
			Delivery delivery = consumer.nextDelivery();
			System.out.println("消費者02獲得:" + new String(delivery.getBody()));
		}
	}

}