1. 程式人生 > >九.springboot+rabbitmq(文章寫的比較好) 以及 java版詳細例子

九.springboot+rabbitmq(文章寫的比較好) 以及 java版詳細例子

package com.tiglle.producer.main;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by Administrator on 2018/7/15 0015.
 */
public class ProducerMain { public static void main(String[] args) throws Exception { fanoutExchange(); } //預設交換機 public static void defauleExchange()throws Exception { //通過連線工程獲取連線(長連線,如果不關閉,會一直連線) ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"
); factory.setPort(5672); factory.setUsername("tiglle"); factory.setPassword("hateyou75"); //指定虛擬機器,不指定預設為"/"這個虛擬機器(TiglleHost 不等於 /TiglleHost) factory.setVirtualHost("TiglleHost"); //從連線中獲取通道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** *宣告一個列隊queue * 1.queue:列隊名稱 * 2.durable:是否持久化 * 3.exclusive:是否排他性: * 只有 首次宣告它的連線 (假如為A Connection)可見,A Connection的不同通道是可見的。其他連線不可訪問和宣告此相同的列隊 * 如果試圖在一個不同的連線中重新宣告或訪問(如publish,consume)該排他性佇列,會得到資源被鎖定的錯誤:ESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'DefauleQueue1' * 會在其連線斷開的時候自動刪除(A Connection連線close或者客戶端程式退出了),不管這個佇列是否被宣告成永續性的(Durable =true)。 * exclusive列隊只能消費者宣告,消費者斷開就刪除。(因為消費者的Connection直接和Queue通訊,提供者的Connection是和Exchange通訊的。) * 4..autoDelete:當所有消費者斷開連線後,自動刪除,對於持久化的列隊 * autoDelete的列隊剛開始建立,沒有消費者連線的時候,一直存在。一旦有消費者連線,並且所有消費者斷開後,會自動刪除 * 5. */
channel.queueDeclare("DefauleQueue2",true,false,true,null); /** * 1.exchange:為""的時候使用預設的交換機 * 預設交換機為direct 型別:通過訊息的routingKey,與列隊和交換機的bindingKey(有些也叫做列隊和交換機的routingKey)對比,相同則傳送到該列隊 * 2.routingKey:訊息的屬性,在預設交換機中,訊息的routingKey和queue的name進行比較(bindingKey為訊息的名稱) * 3. * 4.body:訊息主體 */ byte[] body = "first message1".getBytes(); /** * 傳送訊息 * 1.exchange 列隊的名稱,預設direct的列隊名稱為"" * 2.routingKey 路右鍵的名稱,需要和目queue隊繫結exchange的bindingKey相同 * 3. * 4.訊息體 */ channel.basicPublish("","DefauleQueue2",null,body); channel.close(); connection.close(); } //direct型別的交換機 public static void directExchange() throws Exception { //通過連線工程獲取連線(長連線,如果不關閉,會一直連線) ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("tiglle"); factory.setPassword("hateyou75"); //指定虛擬機器,不指定預設為"/"這個虛擬機器(TiglleHost 不等於 /TiglleHost) factory.setVirtualHost("TiglleHost"); //從連線中獲取通道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 宣告direct型別的exchange (根據訊息的routingKey和列隊與交換機繫結的時候的bindingKey對比,來路由訊息) * 1.exchange 交換機的名稱 * 2.BuiltinExchangeType 交換機的型別(列舉) direct、fanout、topic、header */ channel.exchangeDeclare("direct-exchange1", BuiltinExchangeType.DIRECT); /** *宣告一個列隊queue * 1.queue:列隊名稱 * 2.durable:是否持久化 * 3.exclusive:是否排他性: * 只有 首次宣告它的連線 (假如為A Connection)可見,A Connection的不同通道是可見的。其他連線不可訪問和宣告此相同的列隊 * 如果試圖在一個不同的連線中重新宣告或訪問(如publish,consume)該排他性佇列,會得到資源被鎖定的錯誤:ESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'DefauleQueue1' * 會在其連線斷開的時候自動刪除(A Connection連線close或者客戶端程式退出了),不管這個佇列是否被宣告成永續性的(Durable =true)。 * exclusive列隊只能消費者宣告,消費者斷開就刪除。(因為消費者的Connection直接和Queue通訊,提供者的Connection是和Exchange通訊的。) * 4..autoDelete:當所有消費者斷開連線後,自動刪除,對於持久化的列隊 * autoDelete的列隊剛開始建立,沒有消費者連線的時候,一直存在。一旦有消費者連線,並且所有消費者斷開後,會自動刪除 * 5. */ channel.queueDeclare("direct-queue1",true,false,false,null); /** * 列隊和交換機的繫結 * 1. queue 列隊的名稱 * 2. exchange 交換機的名稱 * 3.routingKey 繫結標識,可以叫bandingKey */ channel.queueBind("direct-queue1","direct-exchange1","direct-routingkey1"); byte[] body = "第一條訊息".getBytes(); /** * 傳送訊息 * 1.exchange 列隊的名稱 * 2.routingKey 路右鍵的名稱,需要和目queue隊繫結exchange的bindingKey相同 * 3. * 4.訊息體 */ channel.basicPublish("direct-exchange1","direct-routingkey1",null,body); channel.close(); connection.close(); } //topic型別的交換機 public static void topicExchange() throws Exception { //通過連線工程獲取連線(長連線,如果不關閉,會一直連線) ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("tiglle"); factory.setPassword("hateyou75"); //指定虛擬機器,不指定預設為"/"這個虛擬機器(TiglleHost 不等於 /TiglleHost) factory.setVirtualHost("TiglleHost"); //從連線中獲取通道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 宣告topic型別的exchange (相當於有萬用字元 * 一個單詞 #0個或多個單詞,用 . 隔開的direct交換機) 注意是單詞不是字母 * 1.exchange 交換機的名稱 * 2.BuiltinExchangeType 交換機的型別(列舉) direct、fanout、topic、header */ channel.exchangeDeclare("topic-exchange1", BuiltinExchangeType.TOPIC); /** *宣告一個列隊queue * 1.queue:列隊名稱 * 2.durable:是否持久化 * 3.exclusive:是否排他性: * 只有 首次宣告它的連線 (假如為A Connection)可見,A Connection的不同通道是可見的。其他連線不可訪問和宣告此相同的列隊 * 如果試圖在一個不同的連線中重新宣告或訪問(如publish,consume)該排他性佇列,會得到資源被鎖定的錯誤:ESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'DefauleQueue1' * 會在其連線斷開的時候自動刪除(A Connection連線close或者客戶端程式退出了),不管這個佇列是否被宣告成永續性的(Durable =true)。 * exclusive列隊只能消費者宣告,消費者斷開就刪除。(因為消費者的Connection直接和Queue通訊,提供者的Connection是和Exchange通訊的。) * 4..autoDelete:當所有消費者斷開連線後,自動刪除,對於持久化的列隊 * autoDelete的列隊剛開始建立,沒有消費者連線的時候,一直存在。一旦有消費者連線,並且所有消費者斷開後,會自動刪除 * 5. */ channel.queueDeclare("topic.queue1",true,false,false,null); /** * 列隊和交換機的繫結 * 1. queue 列隊的名稱 * 2. exchange 交換機的名稱 * 3.routingKey 繫結標識,可以叫bandingKey,可以使用萬用字元 * 一個單詞 # 0個或多個單詞,用 . 隔開 */ channel.queueBind("topic.queue1","topic-exchange1","topic.routingkey.*"); byte[] body = "第一條訊息".getBytes(); /** * 傳送訊息 * 1.exchange 列隊的名稱 * 2.routingKey 路右鍵的名稱,需要和目queue隊繫結exchange的bindingKey相同 * 3. * 4.訊息體 */ channel.basicPublish("topic-exchange1","topic.routingkey.test",null,body);//(此處可以通配topic.routingkey.* ,*為一個單詞) channel.basicPublish("topic-exchange1","topic.routingkey.test.nihao",null,body);//(此處不能通配topic.routingkey.* ,因為有多個單詞) channel.close(); connection.close(); } //fanout型別的交換機 public static void fanoutExchange() throws Exception { //通過連線工程獲取連線(長連線,如果不關閉,會一直連線) ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("tiglle"); factory.setPassword("hateyou75"); //指定虛擬機器,不指定預設為"/"這個虛擬機器(TiglleHost 不等於 /TiglleHost) factory.setVirtualHost("TiglleHost"); //從連線中獲取通道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 宣告fanout型別的exchange 傳送給所有繫結的列隊,忽略bindingKey * 1.exchange 交換機的名稱 * 2.BuiltinExchangeType 交換機的型別(列舉) direct、fanout、topic、header */ channel.exchangeDeclare("fanout-exchange1", BuiltinExchangeType.FANOUT); /** *宣告一個列隊queue * 1.queue:列隊名稱 * 2.durable:是否持久化 * 3.exclusive:是否排他性: * 只有 首次宣告它的連線 (假如為A Connection)可見,A Connection的不同通道是可見的。其他連線不可訪問和宣告此相同的列隊 * 如果試圖在一個不同的連線中重新宣告或訪問(如publish,consume)該排他性佇列,會得到資源被鎖定的錯誤:ESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'DefauleQueue1' * 會在其連線斷開的時候自動刪除(A Connection連線close或者客戶端程式退出了),不管這個佇列是否被宣告成永續性的(Durable =true)。 * exclusive列隊只能消費者宣告,消費者斷開就刪除。(因為消費者的Connection直接和Queue通訊,提供者的Connection是和Exchange通訊的。) * 4..autoDelete:當所有消費者斷開連線後,自動刪除,對於持久化的列隊 * autoDelete的列隊剛開始建立,沒有消費者連線的時候,一直存在。一旦有消費者連線,並且所有消費者斷開後,會自動刪除 * 5. */ channel.queueDeclare("fanout-queue1",true,false,false,null); channel.queueDeclare("fanout-queue2",true,false,false,null); /** * 列隊和交換機的繫結 * 1. queue 列隊的名稱 * 2. exchange 交換機的名稱 * 3.routingKey 繫結標識,可以叫bandingKey,可以使用萬用字元 * 一個單詞 # 0個或多個單詞,用 . 隔開 */ channel.queueBind("fanout-queue1","fanout-exchange1","DoNotUse1"); channel.queueBind("fanout-queue2","fanout-exchange1","DoNotUse2"); byte[] body = "第一條訊息".getBytes(); /** * 傳送訊息 * 1.exchange 列隊的名稱 * 2.routingKey 路右鍵的名稱,需要和目queue隊繫結exchange的bindingKey相同 * 3. * 4.訊息體 */ channel.basicPublish("fanout-exchange1","DoNotUse1",null,body); channel.close(); connection.close(); } }