九.springboot+rabbitmq(文章寫的比較好) 以及 java版詳細例子
阿新 • • 發佈:2019-01-23
package com.tiglle.producer.main;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created by Administrator on 2018/7/15 0015.
*/
public class ProducerMain {
public static void main(String[] args) throws Exception {
fanoutExchange();
}
//預設交換機
public static void defauleExchange()throws Exception {
//通過連線工程獲取連線(長連線,如果不關閉,會一直連線)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost" );
factory.setPort(5672);
factory.setUsername("tiglle");
factory.setPassword("hateyou75");
//指定虛擬機器,不指定預設為"/"這個虛擬機器(TiglleHost 不等於 /TiglleHost)
factory.setVirtualHost("TiglleHost");
//從連線中獲取通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/**
*宣告一個列隊queue
* 1.queue:列隊名稱
* 2.durable:是否持久化
* 3.exclusive:是否排他性:
* 只有 首次宣告它的連線 (假如為A Connection)可見,A Connection的不同通道是可見的。其他連線不可訪問和宣告此相同的列隊
* 如果試圖在一個不同的連線中重新宣告或訪問(如publish,consume)該排他性佇列,會得到資源被鎖定的錯誤:ESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'DefauleQueue1'
* 會在其連線斷開的時候自動刪除(A Connection連線close或者客戶端程式退出了),不管這個佇列是否被宣告成永續性的(Durable =true)。
* exclusive列隊只能消費者宣告,消費者斷開就刪除。(因為消費者的Connection直接和Queue通訊,提供者的Connection是和Exchange通訊的。)
* 4..autoDelete:當所有消費者斷開連線後,自動刪除,對於持久化的列隊
* autoDelete的列隊剛開始建立,沒有消費者連線的時候,一直存在。一旦有消費者連線,並且所有消費者斷開後,會自動刪除
* 5.
*/
channel.queueDeclare("DefauleQueue2",true,false,true,null);
/**
* 1.exchange:為""的時候使用預設的交換機
* 預設交換機為direct 型別:通過訊息的routingKey,與列隊和交換機的bindingKey(有些也叫做列隊和交換機的routingKey)對比,相同則傳送到該列隊
* 2.routingKey:訊息的屬性,在預設交換機中,訊息的routingKey和queue的name進行比較(bindingKey為訊息的名稱)
* 3.
* 4.body:訊息主體
*/
byte[] body = "first message1".getBytes();
/**
* 傳送訊息
* 1.exchange 列隊的名稱,預設direct的列隊名稱為""
* 2.routingKey 路右鍵的名稱,需要和目queue隊繫結exchange的bindingKey相同
* 3.
* 4.訊息體
*/
channel.basicPublish("","DefauleQueue2",null,body);
channel.close();
connection.close();
}
//direct型別的交換機
public static void directExchange() throws Exception {
//通過連線工程獲取連線(長連線,如果不關閉,會一直連線)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("tiglle");
factory.setPassword("hateyou75");
//指定虛擬機器,不指定預設為"/"這個虛擬機器(TiglleHost 不等於 /TiglleHost)
factory.setVirtualHost("TiglleHost");
//從連線中獲取通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/**
* 宣告direct型別的exchange (根據訊息的routingKey和列隊與交換機繫結的時候的bindingKey對比,來路由訊息)
* 1.exchange 交換機的名稱
* 2.BuiltinExchangeType 交換機的型別(列舉) direct、fanout、topic、header
*/
channel.exchangeDeclare("direct-exchange1", BuiltinExchangeType.DIRECT);
/**
*宣告一個列隊queue
* 1.queue:列隊名稱
* 2.durable:是否持久化
* 3.exclusive:是否排他性:
* 只有 首次宣告它的連線 (假如為A Connection)可見,A Connection的不同通道是可見的。其他連線不可訪問和宣告此相同的列隊
* 如果試圖在一個不同的連線中重新宣告或訪問(如publish,consume)該排他性佇列,會得到資源被鎖定的錯誤:ESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'DefauleQueue1'
* 會在其連線斷開的時候自動刪除(A Connection連線close或者客戶端程式退出了),不管這個佇列是否被宣告成永續性的(Durable =true)。
* exclusive列隊只能消費者宣告,消費者斷開就刪除。(因為消費者的Connection直接和Queue通訊,提供者的Connection是和Exchange通訊的。)
* 4..autoDelete:當所有消費者斷開連線後,自動刪除,對於持久化的列隊
* autoDelete的列隊剛開始建立,沒有消費者連線的時候,一直存在。一旦有消費者連線,並且所有消費者斷開後,會自動刪除
* 5.
*/
channel.queueDeclare("direct-queue1",true,false,false,null);
/**
* 列隊和交換機的繫結
* 1. queue 列隊的名稱
* 2. exchange 交換機的名稱
* 3.routingKey 繫結標識,可以叫bandingKey
*/
channel.queueBind("direct-queue1","direct-exchange1","direct-routingkey1");
byte[] body = "第一條訊息".getBytes();
/**
* 傳送訊息
* 1.exchange 列隊的名稱
* 2.routingKey 路右鍵的名稱,需要和目queue隊繫結exchange的bindingKey相同
* 3.
* 4.訊息體
*/
channel.basicPublish("direct-exchange1","direct-routingkey1",null,body);
channel.close();
connection.close();
}
//topic型別的交換機
public static void topicExchange() throws Exception {
//通過連線工程獲取連線(長連線,如果不關閉,會一直連線)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("tiglle");
factory.setPassword("hateyou75");
//指定虛擬機器,不指定預設為"/"這個虛擬機器(TiglleHost 不等於 /TiglleHost)
factory.setVirtualHost("TiglleHost");
//從連線中獲取通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/**
* 宣告topic型別的exchange (相當於有萬用字元 * 一個單詞 #0個或多個單詞,用 . 隔開的direct交換機) 注意是單詞不是字母
* 1.exchange 交換機的名稱
* 2.BuiltinExchangeType 交換機的型別(列舉) direct、fanout、topic、header
*/
channel.exchangeDeclare("topic-exchange1", BuiltinExchangeType.TOPIC);
/**
*宣告一個列隊queue
* 1.queue:列隊名稱
* 2.durable:是否持久化
* 3.exclusive:是否排他性:
* 只有 首次宣告它的連線 (假如為A Connection)可見,A Connection的不同通道是可見的。其他連線不可訪問和宣告此相同的列隊
* 如果試圖在一個不同的連線中重新宣告或訪問(如publish,consume)該排他性佇列,會得到資源被鎖定的錯誤:ESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'DefauleQueue1'
* 會在其連線斷開的時候自動刪除(A Connection連線close或者客戶端程式退出了),不管這個佇列是否被宣告成永續性的(Durable =true)。
* exclusive列隊只能消費者宣告,消費者斷開就刪除。(因為消費者的Connection直接和Queue通訊,提供者的Connection是和Exchange通訊的。)
* 4..autoDelete:當所有消費者斷開連線後,自動刪除,對於持久化的列隊
* autoDelete的列隊剛開始建立,沒有消費者連線的時候,一直存在。一旦有消費者連線,並且所有消費者斷開後,會自動刪除
* 5.
*/
channel.queueDeclare("topic.queue1",true,false,false,null);
/**
* 列隊和交換機的繫結
* 1. queue 列隊的名稱
* 2. exchange 交換機的名稱
* 3.routingKey 繫結標識,可以叫bandingKey,可以使用萬用字元 * 一個單詞 # 0個或多個單詞,用 . 隔開
*/
channel.queueBind("topic.queue1","topic-exchange1","topic.routingkey.*");
byte[] body = "第一條訊息".getBytes();
/**
* 傳送訊息
* 1.exchange 列隊的名稱
* 2.routingKey 路右鍵的名稱,需要和目queue隊繫結exchange的bindingKey相同
* 3.
* 4.訊息體
*/
channel.basicPublish("topic-exchange1","topic.routingkey.test",null,body);//(此處可以通配topic.routingkey.* ,*為一個單詞)
channel.basicPublish("topic-exchange1","topic.routingkey.test.nihao",null,body);//(此處不能通配topic.routingkey.* ,因為有多個單詞)
channel.close();
connection.close();
}
//fanout型別的交換機
public static void fanoutExchange() throws Exception {
//通過連線工程獲取連線(長連線,如果不關閉,會一直連線)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("tiglle");
factory.setPassword("hateyou75");
//指定虛擬機器,不指定預設為"/"這個虛擬機器(TiglleHost 不等於 /TiglleHost)
factory.setVirtualHost("TiglleHost");
//從連線中獲取通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/**
* 宣告fanout型別的exchange 傳送給所有繫結的列隊,忽略bindingKey
* 1.exchange 交換機的名稱
* 2.BuiltinExchangeType 交換機的型別(列舉) direct、fanout、topic、header
*/
channel.exchangeDeclare("fanout-exchange1", BuiltinExchangeType.FANOUT);
/**
*宣告一個列隊queue
* 1.queue:列隊名稱
* 2.durable:是否持久化
* 3.exclusive:是否排他性:
* 只有 首次宣告它的連線 (假如為A Connection)可見,A Connection的不同通道是可見的。其他連線不可訪問和宣告此相同的列隊
* 如果試圖在一個不同的連線中重新宣告或訪問(如publish,consume)該排他性佇列,會得到資源被鎖定的錯誤:ESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'DefauleQueue1'
* 會在其連線斷開的時候自動刪除(A Connection連線close或者客戶端程式退出了),不管這個佇列是否被宣告成永續性的(Durable =true)。
* exclusive列隊只能消費者宣告,消費者斷開就刪除。(因為消費者的Connection直接和Queue通訊,提供者的Connection是和Exchange通訊的。)
* 4..autoDelete:當所有消費者斷開連線後,自動刪除,對於持久化的列隊
* autoDelete的列隊剛開始建立,沒有消費者連線的時候,一直存在。一旦有消費者連線,並且所有消費者斷開後,會自動刪除
* 5.
*/
channel.queueDeclare("fanout-queue1",true,false,false,null);
channel.queueDeclare("fanout-queue2",true,false,false,null);
/**
* 列隊和交換機的繫結
* 1. queue 列隊的名稱
* 2. exchange 交換機的名稱
* 3.routingKey 繫結標識,可以叫bandingKey,可以使用萬用字元 * 一個單詞 # 0個或多個單詞,用 . 隔開
*/
channel.queueBind("fanout-queue1","fanout-exchange1","DoNotUse1");
channel.queueBind("fanout-queue2","fanout-exchange1","DoNotUse2");
byte[] body = "第一條訊息".getBytes();
/**
* 傳送訊息
* 1.exchange 列隊的名稱
* 2.routingKey 路右鍵的名稱,需要和目queue隊繫結exchange的bindingKey相同
* 3.
* 4.訊息體
*/
channel.basicPublish("fanout-exchange1","DoNotUse1",null,body);
channel.close();
connection.close();
}
}