1. 程式人生 > >Rabbitmq的五種模式和案例

Rabbitmq的五種模式和案例

1.simple模式

 

訊息生產者p將訊息放入佇列
消費者監聽佇列
,如果佇列中有訊息,就消費掉,訊息被拿走後,自動從佇列刪除
(隱患,訊息可能沒有被消費者正確處理,已經消失了,無法恢復)

應用場景:聊天室

案例:

1>.首先準備依賴

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>
2>.寫一個test類
public class SimpleTest {
	//模擬生產者將訊息放入佇列
	@Test
	public void send() throws Exception{
		/*1 建立連線工廠
		 * 2 配置共創config
		 * 3 獲取連線
		 * 4獲取通道
		 * 5 從通道宣告queue
		 * 6 傳送訊息
		 * 7 釋放資源
		 */
		ConnectionFactory factory=new ConnectionFactory();
		factory.setHost("106.23.34.56");
		factory.setPort(5672);
		factory.setVirtualHost("/tb");
		factory.setUsername("admin");
		factory.setPassword("123456");
		//從工廠獲取連線
		Connection conn=factory.newConnection();
		//從連接獲取通道
		Channel chan=conn.createChannel();
		//利用channel宣告第一個佇列
		chan.queueDeclare("simple", false, false, false, null);
	    //queue String型別,表示宣告的queue對列的名字
		//durable Boolean型別,表示是否持久化
		//exclusive Boolean型別:當前宣告的queue是否專注;true當前連線建立的
		//任何channle都可以連線這個queue,false,新的channel不可使用
		//autoDelete Boolean型別:在最後連線使用完成後,是否刪除佇列,false
		//arguments Map型別,其他宣告引數
		//傳送訊息
		String msg="helloworld,nihaoa";
		chan.basicPublish("", "simple", null, msg.getBytes());
		//exchange String型別,交換機名稱,簡單模式使用預設交換""
		//routingkey String型別,當前的訊息繫結的routingkey,簡單模式下,與佇列同名即可
		//props BasicProperties型別,訊息的屬性欄位物件,例如BasicProperties
		//可以設定一個deliveryMode的值0 持久化,1 表示不持久化,durable配合使用
		//body byte[] :訊息字串的byte陣列
	}
	//模擬消費端
	@Test
	public void receive() throws Exception{
		
                ConnectionFactory factory=new ConnectionFactory();
		factory.setHost("106.23.34.56");
		factory.setPort(5672);
		factory.setVirtualHost("/tb");
		factory.setUsername("admin");
		factory.setPassword("123456");
		//從工廠獲取連線
Connection conn=factory.newConnection();//從連接獲取通道Channel chan=conn.createChannel();chan.queueDeclare("simple", false, false, false, null);//建立一個消費者QueueingConsumer consumer= new QueueingConsumer(chan);chan.basicConsume("simple", consumer);//監聽佇列while(true){//獲取下一個delivery,delivery從佇列獲取訊息Delivery delivery = consumer.nextDelivery();String msg=new String(delivery.getBody());System.out.println(msg);}}}

2.work模式


生產者將訊息放入佇列
多個消費者同時監聽同一個佇列
,訊息如何被消費?
C1,C2
共同爭搶當前訊息佇列的內容,誰先拿到訊息,誰來負責消費
應用場景
:紅包;大型專案中的資源排程過程(直接由最空閒的系統爭搶到資源處理任務)

案例:

1>首先寫一個工具類

public class ConnectionUtil {
	
	public static Connection getConn(){
		try{
			ConnectionFactory factory=new ConnectionFactory();
			factory.setHost("106.33.44.179");
			factory.setPort(5672);
			factory.setVirtualHost("/tb");
			factory.setUsername("admin");
			factory.setPassword("123456");
		
			//從工廠獲取連線
			Connection conn=factory.newConnection();
			return conn;
		}catch(Exception e){
			System.out.println(e.getMessage());
			return null;
		}
		
	}
}

2>寫test類

public class WorkTest {
	@Test
	public void send() throws Exception{
		//獲取連線
		Connection conn = ConnectionUtil.getConn();
		Channel chan = conn.createChannel();
		//宣告佇列
		chan.queueDeclare("work", false, false, false, null);
		for(int i=0;i<100;i++){
			String msg="1712,hello:"+i+"message";
			chan.basicPublish("", "work", null, msg.getBytes());
			System.out.println("第"+i+"條資訊已經發送");
		}
		chan.close();
		conn.close();
	}
	@Test
	public void receive1() throws Exception{
		//獲取連線,獲取通道
		Connection conn = ConnectionUtil.getConn();
		Channel chan = conn.createChannel();
		chan.queueDeclare("work", false, false, false, null);
		//同一時刻伺服器只發送一條訊息給同一消費者,消費者空閒,才傳送一條
		chan.basicQos(1);
		//定義消費者
		QueueingConsumer consumer=new QueueingConsumer(chan);
		//繫結佇列和消費者的關係
		//queue
		//autoAck:訊息被消費後,是否自動確認回執,如果false,不自動需要手動在
		//完成訊息消費後進行回執確認,channel.ack,channel.nack
		//callback
		//chan.basicConsume(queue, autoAck, callback)
		chan.basicConsume("work", false, consumer);
		//監聽
		while(true){
			Delivery delivery=consumer.nextDelivery();
			byte[] result = delivery.getBody();
			String msg=new String(result);
			System.out.println("接受到:"+msg);
			Thread.sleep(50);
			//返回伺服器,回執
			chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}	
	}
	@Test
	public void receive2() throws Exception{
		//獲取連線,獲取通道
		Connection conn = ConnectionUtil.getConn();
		Channel chan = conn.createChannel();
		chan.queueDeclare("work", false, false, false, null);
		//同一時刻伺服器只發送一條訊息給同一消費者,消費者空閒,才傳送一條
		chan.basicQos(1);
		//定義消費者
		QueueingConsumer consumer=new QueueingConsumer(chan);
		//繫結佇列和消費者的關係
		//queue
		//autoAck:訊息被消費後,是否自動確認回執,如果false,不自動需要手動在
		//完成訊息消費後進行回執確認,channel.ack,channel.nack
		//callback
		//chan.basicConsume(queue, autoAck, callback)
		chan.basicConsume("work", false, consumer);
		//監聽
		while(true){
			Delivery delivery=consumer.nextDelivery();
			byte[] result = delivery.getBody();
			String msg=new String(result);
			System.out.println("接受到:"+msg);
			Thread.sleep(150);
			//返回伺服器,回執
			chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}
	}
	
}

3 publish/fanout釋出訂閱

 

生產者將訊息交給交換機
有交換機根據釋出訂閱的模式設定將訊息同步到所有的繫結佇列中
;
後端的消費者都能拿到訊息

應用場景:郵件群發,群聊天,廣告

案例:

public class FanoutTest {
	//交換機,有型別,釋出訂閱:fanout
	//路由模式:direct
	//主題模式:topic
	@Test
	public void send() throws Exception {
		//獲取連線
		Connection conn = ConnectionUtil.getConn();
		Channel chan = conn.createChannel();
		//宣告交換機
		//引數意義,1 交換機名稱,2 型別:fanout,direct,topic
		chan.exchangeDeclare("fanoutEx", "fanout");
		//傳送訊息
		for(int i=0;i<100;i++){
			String msg="1712 hello:"+i+"msg";
			chan.basicPublish("fanoutEx", "", null, msg.getBytes());
			System.out.println("第"+i+"條資訊已經發送");
		}
	}
	
	@Test
	public void receiv01() throws Exception{
		//獲取連線
		Connection conn = ConnectionUtil.getConn();
		Channel chan = conn.createChannel();
		//生命佇列
		chan.queueDeclare("fanout01", false, false, false, null);
		//宣告交換機
		chan.exchangeDeclare("fanoutEx", "fanout");
		//繫結佇列到交換機
		//引數 1 佇列名稱,2 交換機名稱 3 路由key
		chan.queueBind("fanout01", "fanoutEx", "");
		chan.basicQos(1);
		//定義消費者
		QueueingConsumer consumer=new QueueingConsumer(chan);
		//消費者與佇列繫結
		chan.basicConsume("fanout01",false, consumer);
		while(true){
			Delivery delivery= consumer.nextDelivery();
			System.out.println("一號消費者接收到"+
			new String(delivery.getBody()));
			chan.basicAck(delivery.getEnvelope().
					getDeliveryTag(), false);
		}
	}
	@Test
	public void receiv02() throws Exception{
		//獲取連線
		Connection conn = ConnectionUtil.getConn();
		Channel chan = conn.createChannel();
		//生命佇列
		chan.queueDeclare("fanout02", false, false, false, null);
		//宣告交換機
		chan.exchangeDeclare("fanoutEx", "fanout");
		//繫結佇列到交換機
		//引數 1 佇列名稱,2 交換機名稱 3 路由key
		chan.queueBind("fanout02", "fanoutEx", "");
		chan.basicQos(1);
		//定義消費者
		QueueingConsumer consumer=new QueueingConsumer(chan);
		//消費者與佇列繫結
		chan.basicConsume("fanout02",false, consumer);
		while(true){
			Delivery delivery= consumer.nextDelivery();
			System.out.println("二號消費者接收到"+new String(delivery.getBody()));
			chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}
	}
}

4 routing路由模式


生產者傳送訊息到交換機,同時繫結一個路由Key,交換機根據路由key對下游繫結的佇列進行路
key的判斷,滿足路由key的佇列才會接收到訊息,消費者消費訊息

應用場景: 專案中的error報錯

案例:

public class RoutingTopicTest {
	
	@Test
	public void routingSend() throws Exception{
		//獲取連線
		Connection conn = ConnectionUtil.getConn();
		Channel chan = conn.createChannel();
		//宣告交換機
		//引數意義,1 交換機名稱,2 型別:fanout,direct,topic
		chan.exchangeDeclare("directEx", "direct");
		//傳送訊息
		String msg="路由模式的訊息";
		chan.basicPublish("directEx", "jt1713", 
				null, msg.getBytes());
	}
	@Test
	public void routingRec01() throws Exception{
		System.out.println("一號消費者等待接收訊息");
		//獲取連線
		Connection conn = ConnectionUtil.getConn();
		Channel chan = conn.createChannel();
		//宣告佇列
		chan.queueDeclare("direct01", false, false, false, null);
		//宣告交換機
		chan.exchangeDeclare("directEx", "direct");
		//繫結佇列到交換機
		//引數 1 佇列名稱,2 交換機名稱 3 路由key
		chan.queueBind("direct01", "directEx", "jt1712");
		chan.basicQos(1);
		//定義消費者
		QueueingConsumer consumer=new QueueingConsumer(chan);
		//消費者與佇列繫結
		chan.basicConsume("direct01",false, consumer);
		while(true){
			Delivery delivery= consumer.nextDelivery();
			System.out.println("一號消費者接收到"+
			new String(delivery.getBody()));
			chan.basicAck(delivery.getEnvelope().
					getDeliveryTag(), false);
		}
	}
	@Test
	public void routingRec02() throws Exception{
		System.out.println("二號消費者等待接收訊息");
		//獲取連線
		Connection conn = ConnectionUtil.getConn();
		Channel chan = conn.createChannel();
		//宣告佇列
		chan.queueDeclare("direct02", false, false, false, null);
		//宣告交換機
		chan.exchangeDeclare("directEx", "direct");
		//繫結佇列到交換機
		//引數 1 佇列名稱,2 交換機名稱 3 路由key
		chan.queueBind("direct02", "directEx", "jt1711");
		chan.basicQos(1);
		//定義消費者
		QueueingConsumer consumer=new QueueingConsumer(chan);
		//消費者與佇列繫結
		chan.basicConsume("direct02",false, consumer);
		while(true){
			Delivery delivery= consumer.nextDelivery();
			System.out.println("二號消費者接收到"+
			new String(delivery.getBody()));
			chan.basicAck(delivery.getEnvelope().
					getDeliveryTag(), false);
		}
	}
}

5 topic主題模式


*號代表單個詞語
#代表多個詞語

其他的內容與routing路由模式一致

案例:

public class RoutingTopicTest {
	
	
	@Test
	public void routingRec02() throws Exception{
		System.out.println("二號消費者等待接收訊息");
		//獲取連線
		Connection conn = ConnectionUtil.getConn();
		Channel chan = conn.createChannel();
		//宣告佇列
		chan.queueDeclare("direct02", false, false, false, null);
		//宣告交換機
		chan.exchangeDeclare("directEx", "direct");
		//繫結佇列到交換機
		//引數 1 佇列名稱,2 交換機名稱 3 路由key
		chan.queueBind("direct02", "directEx", "jt1711");
		chan.basicQos(1);
		//定義消費者
		QueueingConsumer consumer=new QueueingConsumer(chan);
		//消費者與佇列繫結
		chan.basicConsume("direct02",false, consumer);
		while(true){
			Delivery delivery= consumer.nextDelivery();
			System.out.println("二號消費者接收到"+
			new String(delivery.getBody()));
			chan.basicAck(delivery.getEnvelope().
					getDeliveryTag(), false);
		}
	}
	
	@Test
	public void topicSend() throws Exception{
		//獲取連線
		Connection conn = ConnectionUtil.getConn();
		Channel chan = conn.createChannel();
		//宣告交換機
		//引數意義,1 交換機名稱,2 型別:fanout,direct,topic
		chan.exchangeDeclare("topicEx", "topic");
		//傳送訊息
		String msg="主題模式的訊息";
		chan.basicPublish("topicEx", "jt1712.add.update", 
				null, msg.getBytes());
	}
	@Test
	public void topicRec01() throws Exception{
		System.out.println("一號消費者等待接收訊息");
		//獲取連線
		Connection conn = ConnectionUtil.getConn();
		Channel chan = conn.createChannel();
		//宣告佇列
		chan.queueDeclare("topic01", false, false, false, null);
		//宣告交換機
		chan.exchangeDeclare("topicEx", "topic");
		//繫結佇列到交換機
		//引數 1 佇列名稱,2 交換機名稱 3 路由key
		chan.queueBind("topic01", "topicEx", "jt1712");
		chan.basicQos(1);
		//定義消費者
		QueueingConsumer consumer=new QueueingConsumer(chan);
		//消費者與佇列繫結
		chan.basicConsume("topic01",false, consumer);
		while(true){
			Delivery delivery= consumer.nextDelivery();
			System.out.println("一號消費者接收到"+
			new String(delivery.getBody()));
			chan.basicAck(delivery.getEnvelope().
					getDeliveryTag(), false);
		}
	}
	@Test
	public void topicRec02() throws Exception{
		System.out.println("二號消費者等待接收訊息");
		//獲取連線
		Connection conn = ConnectionUtil.getConn();
		Channel chan = conn.createChannel();
		//宣告佇列
		chan.queueDeclare("topic02", false, false, false, null);
		//宣告交換機
		chan.exchangeDeclare("topicEx", "topic");
		//繫結佇列到交換機
		//引數 1 佇列名稱,2 交換機名稱 3 路由key
		chan.queueBind("topic02", "topicEx", "jt1712.#");
		chan.basicQos(1);
		//定義消費者
		QueueingConsumer consumer=new QueueingConsumer(chan);
		//消費者與佇列繫結
		chan.basicConsume("topic02",false, consumer);
		while(true){
			Delivery delivery= consumer.nextDelivery();
			System.out.println("二號消費者接收到"+
			new String(delivery.getBody()));
			chan.basicAck(delivery.getEnvelope().
					getDeliveryTag(), false);
		}
	}
}