1. 程式人生 > >rabbitmq訊息佇列的簡單入門

rabbitmq訊息佇列的簡單入門

一rabbitmq

rabbitmq降低主程式執行緒壓力,是一種非阻塞模式的分散式訊息佇列伺服器,有生產者生產到rabbitmq,消費者消費

二。rabbitmq安裝

1.安裝rabbitmq

安裝過程 參考 (http://www.rabbitmq.com/install-rpm.html)

 rabbitmq-server 目前安裝包被包含在 Fedora rpm倉庫中 Fedora是epel庫

yum -y install epel-release.noarch  
檢視是否存在rabbitmq 然後安裝
yum search rabbitmq-server  
yum -y install rabbitmq-server  

檢視安裝包
tail -1000 /var/log/yum.log  
檢視rabbitmq-server被安裝的所有檔案的位置
/etc/logrotate.d/rabbitmq-server  
/etc/rabbitmq  
/etc/rabbitmq/rabbitmq.config  
/usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server  
/usr/lib/rabbitmq/bin  
/usr/lib/rabbitmq/bin/rabbitmq-defaults  
/usr/lib/rabbitmq/bin/rabbitmq-env  
/usr/lib/rabbitmq/bin/rabbitmq-plugins  
/usr/lib/rabbitmq/bin/rabbitmq-server  
/usr/lib/rabbitmq/bin/rabbitmqctl  
檢視rabbitmq-server所有的參考文件
[[email protected] log]# rpm -qd rabbitmq-server-3.3.5-34.el7.noarch  
/usr/share/doc/rabbitmq-server-3.3.5/LICENSE  
/usr/share/doc/rabbitmq-server-3.3.5/LICENSE-APACHE2-ExplorerCanvas  
/usr/share/doc/rabbitmq-server-3.3.5/LICENSE-APL2-Stomp-Websocket  
/usr/share/doc/rabbitmq-server-3.3.5/LICENSE-Apache-Basho  
/usr/share/doc/rabbitmq-server-3.3.5/LICENSE-BSD-base64js  
/usr/share/doc/rabbitmq-server-3.3.5/LICENSE-BSD-glMatrix  
/usr/share/doc/rabbitmq-server-3.3.5/LICENSE-MIT-EJS10  
/usr/share/doc/rabbitmq-server-3.3.5/LICENSE-MIT-Flot  
/usr/share/doc/rabbitmq-server-3.3.5/LICENSE-MIT-Mochi  
/usr/share/doc/rabbitmq-server-3.3.5/LICENSE-MIT-Sammy060  
/usr/share/doc/rabbitmq-server-3.3.5/LICENSE-MIT-eldap  
/usr/share/doc/rabbitmq-server-3.3.5/LICENSE-MIT-jQuery164  
/usr/share/doc/rabbitmq-server-3.3.5/LICENSE-MPL-RabbitMQ  
/usr/share/doc/rabbitmq-server-3.3.5/rabbitmq.config.example  
/usr/share/man/man1/rabbitmq-plugins.1.gz  
/usr/share/man/man1/rabbitmq-server.1.gz  
/usr/share/man/man1/rabbitmqctl.1.gz  
/usr/share/man/man5/rabbitmq-env.conf.5.gz  
其中比較重要的兩個參考檔案(這兩個只是參考)
執行配置檔案:rabbitmq.config.example
環境配置檔案:/usr/share/man/man5/rabbitmq-env.conf.5.gz 

預設該兩檔案需要配置在 /etc/rabbitmq目錄下 預設只有一個

[[email protected] log]# cd /etc/rabbitmq && ll  
total 20  
-rw-r--r-- 1 root root 18555 Aug 11  2014 rabbitmq.config  
單機預設配置足夠 具體配置參考官網 (http://www.rabbitmq.com/configure.html)

rabbitmq預設安裝後 會新增賬號rabbitmq 預設以該賬號執行

[[email protected] rabbitmq]# more /etc/passwd | grep rabbitmq  
rabbitmq:x:992:990:RabbitMQ messaging server:/var/lib/rabbitmq:/sbin/nologin  

2》啟動rabbitmq

  啟動後 檢視預設的埠 5672

[[email protected] rabbitmq]# netstat -aon | grep 5672  
tcp        0      0 0.0.0.0:25672           0.0.0.0:*               LISTEN      off (0.00/0/0)  
tcp6       0      0 :::5672                 :::*                    LISTEN      off (0.00/0/0) 
關閉和檢視rabiitmq的狀態可以使用命令
[[email protected] system]# rabbitmqctl stop  
Stopping and halting node [email protected] ...  
...done.  

rabbitmq預設提供了一個web管理工具(rabbitmq_management)參考官方http://www.rabbitmq.com/management.html 預設已經安裝 是一個外掛

檢視所有外掛

[[email protected] bin]# ./rabbitmq-plugins list
[ ] amqp_client                       3.1.5
[ ] cowboy                            0.5.0-rmq3.1.5-git4b93c2d
[ ] eldap                             3.1.5-gite309de4
[ ] mochiweb                          2.7.0-rmq3.1.5-git680dba8
[ ] rabbitmq_amqp1_0                  3.1.5
[ ] rabbitmq_auth_backend_ldap        3.1.5
[ ] rabbitmq_auth_mechanism_ssl       3.1.5
[ ] rabbitmq_consistent_hash_exchange 3.1.5
[ ] rabbitmq_federation               3.1.5
[ ] rabbitmq_federation_management    3.1.5
[ ] rabbitmq_jsonrpc                  3.1.5
[ ] rabbitmq_jsonrpc_channel          3.1.5
[ ] rabbitmq_jsonrpc_channel_examples 3.1.5
[ ] rabbitmq_management               3.1.5
[ ] rabbitmq_management_agent         3.1.5
[ ] rabbitmq_management_visualiser    3.1.5
[ ] rabbitmq_mqtt                     3.1.5
[ ] rabbitmq_shovel                   3.1.5
[ ] rabbitmq_shovel_management        3.1.5
[ ] rabbitmq_stomp                    3.1.5
[ ] rabbitmq_tracing                  3.1.5
[ ] rabbitmq_web_dispatch             3.1.5
[ ] rabbitmq_web_stomp                3.1.5
[ ] rabbitmq_web_stomp_examples       3.1.5
[ ] rfc4627_jsonrpc                   3.1.5-git5e67120
[ ] sockjs                            0.3.4-rmq3.1.5-git3132eb9
[ ] webmachine                        1.10.3-rmq3.1.5-gite9359c7

啟用該外掛即可

[[email protected] ~]# cd /usr/lib/rabbitmq/bin
[[email protected] bin]# ./rabbitmq-plugins enable rabbitmq_management
重啟rabbitmq-server
[[email protected] bin]# service rabbitmq-server restart
Restarting rabbitmq-server: RabbitMQ is not running
SUCCESS
rabbitmq-server.
瀏覽器訪問(開啟了埠 15672 當前機器ip是126.128) (http://192.168.126.128:15672/)

輸入使用者名稱 guest和guest


以後我們用Java程式碼去掉就是呼叫5672 這個埠。我們重點關注的是Queues和Exchanges

三.rabbitmq的api的呼叫

 參考官方文件 http://www.rabbitmq.com/getstarted.html

rabbitmq 官方操作api提供了n多種語言  前面幾種都會 java是本職 所以使用java

rabbitmq支援6種訊息接受和轉發機制 

1.簡單模式(http://www.rabbitmq.com/tutorials/tutorial-one-java.html)

1 個生產者對應一個消費者每個佇列都有一個唯一名字。一個生產者推送訊息到佇列,單個消費者消費訊息

簡單的呼叫,建立本地生產者maven檔案 在pom.xml中新增rabbitmq的架包

<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.et</groupId>
  <artifactId>Rabbitmq_Pub</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  	<dependencies>
  		<!-- 新增rabbitmq的架包 -->
  		<dependency>
  			<groupId>com.rabbitmq</groupId>  
          	<artifactId>amqp-client</artifactId>  
          	<version>4.2.0</version>  
  		</dependency>
  		
  	</dependencies>
  	
</project>

建立main方法

package cn.et;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
//生產者推送訊息到佇列
public class PubEmail {
	//將物件序列化為位元組陣列
	public static byte[] ser(Object obj) throws IOException{
		//接收被寫入的位元組陣列
		ByteArrayOutputStream bos= new ByteArrayOutputStream();
		//把物件序列化成位元組陣列
		ObjectOutputStream oos= new ObjectOutputStream(bos);
		//寫入
		oos.writeObject(obj);
		return bos.toByteArray();
	}
	//反序列化
	public static Object dser(byte[] src) throws Exception{
		//從位元組陣列讀取資料
		ByteArrayInputStream bis = new ByteArrayInputStream(src);
		//把位元組陣列反序列化成物件
		ObjectInputStream ois= new ObjectInputStream(bis);
		return ois.readObject();	
	}
	
	//任務被髮送的佇列名稱
	static String QUEUE_NAME="MAILQueue";
	public static void main(String[] args) throws Exception {
		//模擬一個任務訊息,以發郵件為例
		Map map = new HashMap();
		//接收郵件的郵箱
		map.put("sendto", "");
		//郵件標題
		map.put("subject", "測試郵件");
		//郵件內容
		map.put("content", "註冊成功你的驗證碼是135469");
		//連線遠端rabbit-server伺服器  
        ConnectionFactory factory = new ConnectionFactory();  
        //本機LinuxIP
        factory.setHost("192.168.126.128");  
        factory.setPort(5672);  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        //定義建立一個佇列  
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        //傳送訊息  
        channel.basicPublish("", QUEUE_NAME, null, ser(map)); //注意傳送和接受段相同字符集否則出現亂碼  
        channel.close();  
        connection.close();  
	}
}

建立消費者maven專案,在pom.xml配置rabbitmq
<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.et</groupId>
  <artifactId>Rabbitmq_Cons</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
  	 <!--  依賴springboot -->
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
</parent>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Dalston.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

  <dependencies>
  
  <!-- springboot每一個框架的整合都是一個starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
  	<!-- 配置傳送郵箱 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-mail</artifactId>
    </dependency>
  
  <!-- 新增rabbitmq的架包 -->
  		<dependency>
  			<groupId>com.rabbitmq</groupId>  
          	<artifactId>amqp-client</artifactId>  
          	<version>4.2.0</version>  
  		</dependency>
  
  
</dependencies>
  
</project>

建立消費者消費的類
package cn.et;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

//消費者從佇列消費
@RestController
public class Mail_Cons {
	//裝配發送郵件的類
	@Autowired
	JavaMailSender jms;
	//將物件序列化為位元組陣列
		public static byte[] ser(Object obj) throws IOException{
			//接收被寫入的位元組陣列
			ByteArrayOutputStream bos= new ByteArrayOutputStream();
			//把物件序列化成位元組陣列
			ObjectOutputStream oos= new ObjectOutputStream(bos);
			//寫入
			oos.writeObject(obj);
			return bos.toByteArray();
		}
		//反序列化
		public static Object dser(byte[] src) throws Exception{
			//從位元組陣列讀取資料
			ByteArrayInputStream bis = new ByteArrayInputStream(src);
			//把位元組陣列反序列化成物件
			ObjectInputStream ois= new ObjectInputStream(bis);
			return ois.readObject();	
		}
		//消費者消費的佇列名稱
	private static String QUEUE_NAME="MAILQueue";
	@GetMapping("/sends")
	public void asyncRec() throws Exception {
		
		 ConnectionFactory factory = new ConnectionFactory();  
	        factory.setHost("192.168.126.128");  
	        Connection connection = factory.newConnection();  
	        Channel channel = connection.createChannel();  
	        //消費者也需要定義佇列 有可能消費者先於生產者啟動   
	        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
	        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
	        //定義回撥抓取訊息  
	        Consumer consumer = new DefaultConsumer(channel) {  
	            @Override  
	            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,  
	                    byte[] body) throws IOException {  
	            	try {
						Map map=(Map) dser(body);
						SimpleMailMessage smm= new SimpleMailMessage();
						//設定傳送方
						smm.setFrom("[email protected]");
						//設定收件方
						smm.setTo(map.get("sendto").toString());
						//設定郵件標題
						smm.setSubject(map.get("subject").toString());
						//設定郵件內容
						smm.setText(map.get("content").toString());
						jms.send(smm);
					} catch (Exception e) {
						e.printStackTrace();
					}
	            }  
	        };  
	        channel.basicConsume(QUEUE_NAME, true, consumer);  
	}
}
配置傳送郵件的配置檔案在src/main/resources中建立application.properties
#配置smtp的主機名
spring.mail.host=smtp.126.com
#配置傳送方的郵件名
[email protected]
#配置傳送方的授權密碼
spring.mail.password=zmw960221
#配置stm埠
spring.mail.port=25

spring.mail.protocol=smtp

建立main方法執行
package cn.et;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
	public static void main(String[] args) {
		SpringApplication.run(Main.class, args);
	}
}

在瀏覽器中輸入消費者的路徑localhost:8080/sends


2.工作佇列模式(http://www.rabbitmq.com/tutorials/tutorial-two-java.html)

工作佇列一般用於任務分配  釋出者釋出任務到佇列 多個訊息接收者 接收訊息 誰接收到某個訊息 其他接收者就只能消費其他訊息 佇列中的一個訊息只能被一個接收者消費(類似12306搶票一樣 比如某個車次 就相當於佇列  該車次 出來一些座位票  一張票只能被一個人搶到 最終 所有的座位票都被不同的人搶到 注意: 一個人可以搶多張票)


簡單的呼叫,建立生產者類

package cn.et.two;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class PubEmailTwo {
	
	//任務被髮送的佇列名稱
	static String QUEUE_NAME="WorkQueue";
	public static void main(String[] args) throws Exception {
	
		//連線遠端rabbit-server伺服器  
        ConnectionFactory factory = new ConnectionFactory();  
        //本機LinuxIP
        factory.setHost("192.168.126.128");  
        factory.setPort(5672);  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        //定義建立一個佇列  
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        //傳送訊息  
        for(int i=0;i<10;i++){
        	channel.basicPublish("", QUEUE_NAME, null, ("這是:"+i).getBytes("UTF-8")); //注意傳送和接受段相同字符集否則出現亂碼  
        }
        System.out.println("生產者生產");
        channel.close();  
        connection.close();  
	}
}

執行生產者
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
生產者生產

建立消費者

package cn.et.two;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

//消費者從佇列消費
public class Mail_ConsTwo {
	
	private static String QUEUE_NAME="WorkQueue";
	
	public static void main(String[] args) throws Exception {
		
		 ConnectionFactory factory = new ConnectionFactory();  
	        factory.setHost("192.168.126.128");  
	        Connection connection = factory.newConnection();  
	        Channel channel = connection.createChannel();  
	        //消費者也需要定義佇列 有可能消費者先於生產者啟動   
	        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
	        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
	        //定義回撥抓取訊息  
	        Consumer consumer = new DefaultConsumer(channel) {  
	            @Override  
	            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,  
	                    byte[] body) throws IOException {  
	            	System.out.println(new String(body,"UTF-8"));
	            }  
	        };  
	        channel.basicConsume(QUEUE_NAME, true, consumer);  
	}
}

執行2次消費者

[*] Waiting for messages. To exit press CTRL+C
這是:1
這是:3
這是:5
這是:7
這是:9
 [*] Waiting for messages. To exit press CTRL+C
這是:0
這是:2
這是:4
這是:6
這是:8
3》釋出訂閱模式(http://www.rabbitmq.com/tutorials/tutorial-three-java.html)

生產者釋出主題訊息  訂閱者訂閱該主題 訂閱該主題的所有訂閱者都可以接受訊息 (類似於廣播  廣播有個頻道(主題) 聽廣播的使用者就是訂閱者廣播中推送的新聞和內容就是訊息)


釋出訂閱模式 引入了交換器的概念 訊息釋出者釋出訊息到交換器  訂閱者定義一個佇列(每個訂閱者定義一個)用於接受訊息   交換器 起到了中轉訊息的作用 這裡面還有個routingkey 如果定義了routingkey 交換器只會將訊息發給自己的routingkey和訂閱者佇列繫結routringkey和相同或者相似的佇列比如交換器定義routingkey是 abc  訂閱者將自己佇列繫結到交換器時指定routingkey是abc 交換器發現routingkey相同所以訊息被髮送到這個佇列

這裡 因為所有的訂閱者需要獲取訊息 所以routingkey為空 訂閱者產生的佇列名稱應該為隨機字串就可

交換器 有以下幾種型別  direct, topic, headers and fanout

 釋出訂閱模式 應該使用fanout(廣播)

建立生產者
package cn.et.three;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class PubEmailThree {
	
	/**  
     * 交換器名稱  
     */  
    private static final String EXCHANGE_NAME = "db.logs";  
    public static void main(String[] args) throws Exception {  
        //連線遠端rabbit-server伺服器  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("192.168.126.128");  
        factory.setPort(5672);  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        //定義建立一個交換器 引數1 名稱  引數2 交換器型別 引數3表示將交換器資訊永久儲存在伺服器磁碟上 關閉rabbitmqserver也不會丟失  
       //fanout釋出訂閱模式
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);  
        String message = null;  
        //同時傳送10條訊息  
        for(int i=0;i<10;i++){  
            message="傳送第"+i+"訊息";  
            //第二個引數就是routingkey  不填 預設會轉發給所有的訂閱者佇列  
            channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));  
        }  
          
        System.out.println(" [x] Sent 6 message");  
        channel.close();  
        connection.close();  
    }
}

建立消費者

package cn.et.three;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

//消費者從佇列消費
public class Mail_ConsThree {
	
	/**  
     * 交換器名稱  
     */  
    private static final String EXCHANGE_NAME = "db.logs";  
	public static void main(String[] args) throws Exception {
		
		 ConnectionFactory factory = new ConnectionFactory();  
	        factory.setHost("192.168.126.128");  
	        Connection connection = factory.newConnection();  
	        Channel channel = connection.createChannel();  
	        //消費者也需要定義佇列 有可能消費者先於生產者啟動   
	        channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);  
	      //  channel.basicQos(1);  
	        //產生一個隨機的佇列 該佇列用於從交換器獲取訊息  
	        String queueName = channel.queueDeclare().getQueue();  
	        //將佇列和某個交換器繫結 就可以正式獲取訊息了 routingkey和交換器的一樣都設定成空  
	        channel.queueBind(queueName, EXCHANGE_NAME, "");  
	        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
	        //定義回撥抓取訊息  
	        Consumer consumer = new DefaultConsumer(channel) {   
	            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,  
	                    byte[] body) throws IOException {  
	                String message = new String(body, "UTF-8");  
	                System.out.println(" [x] Received '" + message + "'");  
	                //引數2 true表示確認該佇列所有訊息  false只確認當前訊息 每個訊息都有一個訊息標記  
	                 
	            }  
	        };  
	        //引數2 表示手動確認  
	        channel.basicConsume(queueName, true, consumer);   
	}
}
先執行兩次生產者,建立佇列
[*] Waiting for messages. To exit press CTRL+C
[*] Waiting for messages. To exit press CTRL+C

執行生產者

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
 [x] Sent 6 message

消費者消費

[*] Waiting for messages. To exit press CTRL+C
 [x] Received '傳送第0訊息'
 [x] Received '傳送第1訊息'
 [x] Received '傳送第2訊息'
 [x] Received '傳送第3訊息'
 [x] Received '傳送第4訊息'
 [x] Received '傳送第5訊息'
 [x] Received '傳送第6訊息'
 [x] Received '傳送第7訊息'
 [x] Received '傳送第8訊息'
 [x] Received '傳送第9訊息'
[*] Waiting for messages. To exit press CTRL+C
 [x] Received '傳送第0訊息'
 [x] Received '傳送第1訊息'
 [x] Received '傳送第2訊息'
 [x] Received '傳送第3訊息'
 [x] Received '傳送第4訊息'
 [x] Received '傳送第5訊息'
 [x] Received '傳送第6訊息'
 [x] Received '傳送第7訊息'
 [x] Received '傳送第8訊息'
 [x] Received '傳送第9訊息'

4》路由模式(http://www.rabbitmq.com/tutorials/tutorial-three-java.html)

前面第三種釋出訂閱模式 每個訂閱者都會收到交換器發出的訊息 因為釋出者釋出訊息的routingkey是空  訂閱者接受訊息佇列的routingkey也是空 釋出者釋出的訊息 所有的訂閱者都能接收到   路由模式表示 釋出者釋出的訊息的routingkey和訂閱者接受訊息佇列的routingkey都不為空 相同的則可以接受訊息 比如傳送日誌的例子


該種模式 交換器的型別 必須是direct  而不是之前的faout  比如 釋出者P 釋出的多個訊息 使用的多個routingkey 比如error info warning等比如一條錯誤訊息 routingkey定義為 error 警告訊息是warning 如果某個訂閱者的臨時佇列使用的routingkey是error 將接受到error訊息是warning將接受到警告訊息 其他訊息如果沒有佇列繫結routingkey 將被丟棄模擬上圖的例子 比如有四類訊息 C1訂閱者程式 繫結到交換機X  routingkey是error 出現error寫入到磁碟  C2訂閱者程式繫結到交換機X routingkey繫結三個 info error warning 接受到訊息只是列印 

建立生產者
package cn.et.four;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class PubEmailFour {
	
	/**  
     * 交換器名稱  
     */  
    private static final String EXCHANGE_NAME = "db.ye";  
    public static void main(String[] args) throws Exception {  
    	  //連線遠端rabbit-server伺服器  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("192.168.126.128");  
        factory.setPort(5672);  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        //定義建立一個交換器 引數1 名稱  引數2 交換器型別 引數3表示將交換器資訊永久儲存在伺服器磁碟上 關閉rabbitmqserver也不會丟失  
        //路由模式 direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);  
        //第二個引數就是routingkey     
        channel.basicPublish(EXCHANGE_NAME, "error", MessageProperties.PERSISTENT_TEXT_PLAIN, "這是錯誤資訊".getBytes("UTF-8"));  
        channel.basicPublish(EXCHANGE_NAME, "info", MessageProperties.PERSISTENT_TEXT_PLAIN, "這是程式執行資訊".getBytes("UTF-8"));  
        channel.basicPublish(EXCHANGE_NAME, "warning", MessageProperties.PERSISTENT_TEXT_PLAIN, "這是警告".getBytes("UTF-8"));  
          System.out.println("[X] sent 6 message");
        channel.close();  
        connection.close();  
    }  
    
}
建立消費者
package cn.et.four;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

//消費者從佇列消費
public class Mail_ConsFour {
	
	/**  
     * 交換器名稱  
     */  
    private static final String EXCHANGE_NAME = "db.ye";  
	public static void main(String[] args) throws Exception {
		 //連線遠端rabbit-server伺服器 
		 ConnectionFactory factory = new ConnectionFactory();  
	        factory.setHost("192.168.126.128");  
	        Connection connection = factory.newConnection();  
	        Channel channel = connection.createChannel();  
	        //消費者也需要定義佇列 有可能消費者先於生產者啟動   
	        channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);  
	      //  channel.basicQos(1);  
	        //產生一個隨機的佇列 該佇列用於從交換器獲取訊息  
	        String queueName = channel.queueDeclare().getQueue();  
	        //將佇列和某個交換器繫結 設定key就可以正式獲取訊息了 
	        channel.queueBind(queueName, EXCHANGE_NAME, "info");  
	        System.out.println(" [*] Waiting for messages. To exit press CTRL+V");  
	        //定義回撥抓取訊息  
	        Consumer consumer = new DefaultConsumer(channel) {   
	            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,  
	                    byte[] body) throws IOException {  
	                String message = new String(body, "UTF-8");  
	                System.out.println(" [x] Received '" + message + "'");  
	                //引數2 true表示確認該佇列所有訊息  false只確認當前訊息 每個訊息都有一個訊息標記  
	                 
	            }  
	        };  
	        //引數2 表示手動確認  
	        channel.basicConsume(queueName, true, consumer);   
	}
}

先執行三次消費者,每次執行確保key的不同
channel.queueBind(queueName, EXCHANGE_NAME, "info"||"warning"||"error");  

在執行生產者

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[X] sent 6 message
消費者
 [*] Waiting for messages. To exit press CTRL+V
 [x] Received '這是警告'
 [*] Waiting for messages. To exit press CTRL+V
 [x] Received '這是錯誤資訊'
[*] Waiting for messages. To exit press CTRL+V
 [x] Received '這是程式執行資訊'

5》Topics路由模式(http://www.rabbitmq.com/tutorials/tutorial-three-java.html)

該種模式和路由模式類似 只是訊息的routingkey 是通過.隔開的多個字元組成  訂閱者的訊息佇列繫結的routingkey可以使用萬用字元通配所有滿足條件的交換機訊息  匹配上則接收訊息  這種型別的訊息 使用的交換器型別是 topic

接收者接收訊息可以設定routingkey為表示式模糊匹配  *匹配一個單詞 #匹配多個 

以釋出日記為例

建立釋出者

package cn.et.five;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class PubEmailFive {
	
	/**  
     * 交換器名稱  
     */  
    private static final String EXCHANGE_NAME = "db.ge";  
    public static void main(String[] args) throws Exception {  
    	  //連線遠端rabbit-server伺服器  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("192.168.126.128");  
        factory.setPort(5672);  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        //定義建立一個交換器 引數1 名稱  引數2 交換器型別 引數3表示將交換器資訊永久儲存在伺服器磁碟上 關閉rabbitmqserver也不會丟失  
        //路由模式 direct
        channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);  
        //第二個引數就是routingkey     
        channel.basicPublish(EXCHANGE_NAME, "d.error", MessageProperties.PERSISTENT_TEXT_PLAIN, "這是錯誤資訊".getBytes("UTF-8"));  
        channel.basicPublish(EXCHANGE_NAME, "d.info", MessageProperties.PERSISTENT_TEXT_PLAIN, "這是程式執行資訊".getBytes("UTF-8"));  
        channel.basicPublish(EXCHANGE_NAME, "d.warning", MessageProperties.PERSISTENT_TEXT_PLAIN, "這是警告".getBytes("UTF-8"));
        channel.basicPublish(EXCHANGE_NAME, "a.c.error", MessageProperties.PERSISTENT_TEXT_PLAIN, "zs這是錯誤資訊".getBytes("UTF-8"));  
        channel.basicPublish(EXCHANGE_NAME, "a.c.info", MessageProperties.PERSISTENT_TEXT_PLAIN, "ls這是程式執行資訊".getBytes("UTF-8"));  
        channel.basicPublish(EXCHANGE_NAME, "a.c.warning", MessageProperties.PERSISTENT_TEXT_PLAIN, "ww這是警告".getBytes("UTF-8"));  
          System.out.println("[X] sent 6 message");
        channel.close();  
        connection.close();  
    }  
    
}
建立生產者
package cn.et.five;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

//消費者從佇列消費
public class Mail_ConsFive {
	
	/**  
     * 交換器名稱  
     */  
    private static final String EXCHANGE_NAME = "db.ge";  
	public static void main(String[] args) throws Exception {
		 //連線遠端rabbit-server伺服器 
		 ConnectionFactory factory = new ConnectionFactory();  
	        factory.setHost("192.168.126.128");  
	        Connection connection = factory.newConnection();  
	        Channel channel = connection.createChannel();  
	        //消費者也需要定義佇列 有可能消費者先於生產者啟動   
	        channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);  
	      //  channel.basicQos(1);  
	        //產生一個隨機的佇列 該佇列用於從交換器獲取訊息  
	        String queueName = channel.queueDeclare().getQueue();  
	        //將佇列和某個交換器繫結 就可以正式獲取訊息了 
	         channel.queueBind(queueName, EXCHANGE_NAME, "d.*");  
	        System.out.println(" [*] Waiting for messages. To exit press CTRL+V");  
	        //定義回撥抓取訊息  
	        Consumer consumer = new DefaultConsumer(channel) {   
	            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,  
	                    byte[] body) throws IOException {  
	                String message = new String(body, "UTF-8");  
	                System.out.println(" [x] Received '" + message + "'");  
	                //引數2 true表示確認該佇列所有訊息  false只確認當前訊息 每個訊息都有一個訊息標記  
	                 
	            }  
	        };  
	        //引數2 表示手動確認  
	        channel.basicConsume(queueName, true, consumer);   
	}
}

先執行消費者,在把  channel.queueBind(queueName, EXCHANGE_NAME, "d.*");   d.*改為a.#在執行
 [*] Waiting for messages. To exit press CTRL+V
在執行生產者
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[X] sent 6 message
生產者
[*] Waiting for messages. To exit press CTRL+V
 [x] Received '這是錯誤資訊'
 [x] Received '這是程式執行資訊'
 [x] Received '這是警告'
 [*] Waiting for messages. To exit press CTRL+V
 [x] Received 'zs這是錯誤資訊'
 [x] Received 'ls這是程式執行資訊'
 [x] Received 'ww這是警告'




相關推薦

rabbitmq訊息佇列簡單入門

一rabbitmq rabbitmq降低主程式執行緒壓力,是一種非阻塞模式的分散式訊息佇列伺服器,有生產者生產到rabbitmq,消費者消費 二。rabbitmq安裝 1.安裝rabbitmq 安裝過程 參考 (http://www.rabbitmq.com/instal

RabbitMQ .NET訊息佇列使用入門(三)【MVC實現RPC例子】

每一個孤獨的靈魂都需要陪伴 RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通訊程式之

RabbitMQ訊息佇列入門篇(環境配置+Java例項+基礎概念)

一、訊息佇列使用場景或者其好處 訊息佇列一般是在專案中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量。 在專案啟動之初來預測將來專案會碰到什麼需求,是極其困難的。訊息

入門RabbitMQ訊息佇列結合SSH框架(配置篇)

使用RabbitMQ訊息佇列,因為訊息佇列的非同步思想,解耦,以及允許短暫的不一致性,就像我現在把東西放在桌子上,你可以去拿,別人也可以去拿,而我不用等人拿完我便放東西上去,這樣就保證了我(生產者)和接收者沒有什麼聯絡,而且接受者可以隨時去拿。我們要使用RabbitMQ,安裝

RabbitMQ .NET訊息佇列使用入門(二)【多個佇列訊息傳輸】

孤獨將會是人生中遇見的最大困難。 實體類: DocumentType.cs public enum DocumentType { //日誌 Journal = 1, //論文

可能是目前最簡單,最詳細的RabbitMQ訊息佇列搭建了

                                                ========= 本文,只涉及到RabbitMQ搭建應用 ========== 1,windows環境下安裝 1.1 安裝Erlang語言環境 首先,在Windows安裝E

C# Queue與RabbitMQ的愛恨情仇(文末附原始碼):Q與MQ訊息佇列簡單應用(一)

首先我們簡單瞭解一下什麼堆、棧、佇列。 堆是在程式執行時,而不是在程式編譯時,申請某個大小的記憶體空間。即動態分配記憶體,對其訪問和對一般記憶體的訪問沒有區別。 棧就是一個容器,後放進去的先拿出來,它下面本來有的東西要等它出來之後才能出來。(先進後出or後進先出) 佇列只能在隊頭做刪除操作,在隊尾做插入操作.

C# Queue與RabbitMQ的愛恨情仇(文末附原始碼):Q與MQ訊息佇列簡單應用(二)

上一章我們講了佇列( Queue),這一章我們講Message Queue訊息佇列,簡稱MQ。 定義:   MQ是MessageQueue,訊息佇列的簡稱(是流行的開源訊息佇列系統,利用erlang語言開發)。MQ是一種應用程式對應用程式的通訊方法。 應用程式通過讀寫入隊和出隊的訊息來通訊,無

RabbitMQ 訊息佇列入門

文件 入門 主要的內容:one two three four five six seven 前言 中介軟體 訊息佇列 非同步處理,註冊完發簡訊 應用解耦,訂單介面呼叫扣庫存介面,失敗了怎麼辦? 流量削峰,大量請求到達業務介面,這不行! 日誌處理,每個業務程式碼都呼叫一下寫日誌的方法嗎?結合AOP思想,業

RabbitMQ 訊息佇列之 Exchange Types

寫在前面 RabbitMQ遵循AMQP 0-9-1協議 複製程式碼 AMQP 0-9-1協議簡介 訊息釋出到交換站,這通常被比作郵局或郵箱。然後交換器使用稱為繫結的規則將訊息副本分發到佇列。然後,AMQP代理將訊息傳遞給訂閱佇列的消費者,或者根據需要從佇列中獲取訊息。 釋出訊息時,釋出者可以指定各種

python 64式: 第3式、rabbitmq訊息佇列使用

topicProductor.py內容如下 #!/usr/bin/env python # -*- coding: utf-8 -*- import pika import sys ''' 問題: 實現基於rabbitmq的生產者和消費者,消費者可以支援繫結路由鍵為notification.

python之RabbitMQ訊息佇列

RabbitMQ:訊息佇列 PY裡的佇列有:執行緒QUEUE、程序QUEUE 程序queue可以用於父程序與子程序進行互動,或者同屬於一父程序下多個子程序進行互動,但如果是兩個獨立的程式,是不能用這個QUEUE進行通訊的。 兩個獨立的程式之間,要找一箇中間代理,比如可以用socket通訊

RabbitMQ訊息佇列的基本原理

1.背景 RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue)的開源實現。 2.應用場景 2.1非同步處理 場景說明:使用者註冊後,需要發註冊郵件和註冊簡訊,傳統的做法有兩種1.序列的方式;2.並行的方

Rabbitmq---訊息佇列

一 . MQ:message queue   訊息佇列的作用:   1 通訊解耦   2 高峰限流 原理分析: 一開始,認證系統是強耦合的,A系統傳遞認證系統訊息接收計算結果的過程中   1 傳給認證系統   2 認證系統計算   3 返回計算結果   4 讀取A系統邏輯 只要當前計算

RabbitMQ 訊息佇列 - topic 模式分發訊息

推薦閱讀 https://blog.csdn.net/column/details/15500.html topic 模式 根據 Binding 指定的 RoutingKey, Exchange 對 key 進行模式匹配後投遞到相應的 Queue, 模式匹配時符號

RabbitMQ 訊息佇列 - fanout 模式分發訊息

推薦閱讀 https://blog.csdn.net/column/details/15500.html fanout 模式 將同一個 message 傳送到所有同該 Exchange 繫結的 queue, 只要 RoutingKey 是一樣, 這條訊息都會被投遞

RabbitMQ 訊息佇列 - direct 模式分發訊息

推薦閱讀 https://blog.csdn.net/column/details/15500.html direct 模式 根據 Binding 指定的 Routing Key, 將符合Key的訊息傳送到 Binding 的 Queue p_direc

rabbitmq訊息佇列配置】

      #erlang語言支援包     #rabbitmq-server安裝支援   #新增使用者     #刪除使用者   #使用者角色   #啟動 &nbs

rabbitmq訊息佇列設定過期時間和過期訊息處理

rabbitmq訊息佇列設定過期時間和過期訊息處理 適用場景 電商秒殺搶購活動中處理使用者下單和付款時間不一致,設定過期時間,過期則不允許付款 參考 https://blog.csdn.net/zhu_tianwei/article/details/53563

初探 RabbitMQ 訊息佇列

初探 RabbitMQ 訊息佇列   rabbitmq基礎概念常見應用場景匯入依賴屬性配置具體編碼定義佇列實體類控制器訊息消費者主函式測試總結說點什麼 SpringBoot 是為了簡化 Spring 應用的建立、執行、除錯、部署等一系列