1. 程式人生 > >輕鬆搞定RabbitMQ(二)——工作佇列之訊息分發機制

輕鬆搞定RabbitMQ(二)——工作佇列之訊息分發機制

       上一篇博文中簡單介紹了一下RabbitMQ的基礎知識,並寫了一個經典語言入門程式——HelloWorld。本篇博文中我們將會建立一個工作佇列用來在工作者(consumer)間分發耗時任務。同樣是翻譯的官網例項

工作佇列


       在前一篇博文中,我們完成了一個簡單的對宣告的佇列進行傳送和接受訊息程式。下面我們將建立一個工作佇列,來向多個工作者(consumer)分發耗時任務。

       工作佇列(又名:任務佇列)的主要任務是為了避免立即做一個資源密集型的卻又必須等待完成的任務。相反的,我們進行任務排程:將任務封裝為訊息併發給佇列。在後臺執行的工作者(consumer)將其取出,然後最終執行。當你執行多個工作者(consumer),佇列中的任務被工作進行共享執行。

       這樣的概念對於在一個HTTP短連結的請求視窗中處理複雜任務的web應用程式,是非常有用的。

準備

       使用Thread.Sleep()方法來模擬耗時。採用小數點的數量來表示任務的複雜性。每一個點將住哪用1s的“工作”。例如,Hello... 處理完需要3s的時間。

       傳送端(生產者):NewTask.java

public class NewTask {
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] args) throws IOException {
		/**
		 * 建立連線連線到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 設定MabbitMQ所在主機ip或者主機名
		factory.setHost("127.0.0.1");
		// 建立一個連線
		Connection connection = factory.newConnection();
		// 建立一個頻道
		Channel channel = connection.createChannel();
		// 指定一個佇列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		// 傳送的訊息
		String message = "Hello World...";
		// 往佇列中發出一條訊息
		channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		System.out.println(" [x] Sent '" + message + "'");
		// 關閉頻道和連線
		channel.close();
		connection.close();
	}
}

       工作者(消費者)Worker.java

public class Worker {
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 開啟連線和建立頻道,與傳送端一樣
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		// 宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 建立佇列消費者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");

			    System.out.println(" [x] Received '" + message + "'");
			    System.out.println(" [x] Proccessing... at " +new Date().toLocaleString());
			    try {
			    	for (char ch: message.toCharArray()) {
				        if (ch == '.') {
				        	Thread.sleep(1000);
				        }
				    }
				} catch (InterruptedException e) {
				} finally {
			      System.out.println(" [x] Done! at " +new Date().toLocaleString());
			    }
			  }
			};
			channel.basicConsume(QUEUE_NAME, true, consumer);
	}
}
       執行結果如下:


任務分發機制

       正主來了。。。下面開始介紹各種任務分發機制。

Round-robin(輪詢分發)

       使用任務佇列的優點之一就是可以輕易的並行工作。如果我們積壓了好多工作,我們可以通過增加工作者(消費者)來解決這一問題,使得系統的伸縮性更加容易。

修改一下NewTask,使用for迴圈模擬多次傳送訊息的過程:

		for (int i = 0; i < 5; i++) {
			// 傳送的訊息
			String message = "Hello World"+Strings.repeat(".", i);
			// 往佇列中發出一條訊息
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}

       我們先啟動1個生產者例項,2個工作者例項,看一下如何執行:


       從上述的結果中,我們可以得知,在預設情況下,RabbitMQ將逐個傳送訊息到在序列中的下一個消費者(而不考慮每個任務的時長等等,且是提前一次性分配,並非一個一個分配)。平均每個消費者獲得相同數量的訊息。這種方式分發訊息機制稱為Round-Robin(輪詢)。

Fair dispatch(公平分發)

       您可能已經注意到,任務分發仍然沒有完全按照我們想要的那樣。比如:現在有2個消費者,所有的奇數的訊息都是繁忙的,而偶數則是輕鬆的。按照輪詢的方式,奇數的任務交給了第一個消費者,所以一直在忙個不停。偶數的任務交給另一個消費者,則立即完成任務,然後閒得不行。而RabbitMQ則是不瞭解這些的。

       這是因為當訊息進入佇列,RabbitMQ就會分派訊息。它不看消費者為應答的數目,只是盲目的將第n條訊息發給第n個消費者。


       為了解決這個問題,我們使用basicQos( prefetchCount = 1)方法,來限制RabbitMQ只發不超過1條的訊息給同一個消費者。當訊息處理完畢後,有了反饋,才會進行第二次傳送。

int prefetchCount = 1;
channel.basicQos(prefetchCount);
       注:如果所有的工作者都處於繁忙狀態,你的佇列有可能被填充滿。你可能會觀察佇列的使用情況,然後增加工作者,或者使用別的什麼策略。
       還有一點需要注意,使用公平分發,必須關閉自動應答,改為手動應答。這些內容會在下篇博文中講述。

       整體程式碼如下:生產者NewTask.java

public class NewTask {
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] args) throws IOException {
		/**
		 * 建立連線連線到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 設定MabbitMQ所在主機ip或者主機名
		factory.setHost("127.0.0.1");
		// 建立一個連線
		Connection connection = factory.newConnection();
		// 建立一個頻道
		Channel channel = connection.createChannel();
		// 指定一個佇列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		int prefetchCount = 1;
		//限制發給同一個消費者不得超過1條訊息
		channel.basicQos(prefetchCount);
		for (int i = 0; i < 5; i++) {
			// 傳送的訊息
			String message = "Hello World"+Strings.repeat(".",5-i)+(5-i);
			// 往佇列中發出一條訊息
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}
		// 關閉頻道和連線
		channel.close();
		connection.close();
	}
}

       消費者Worker.java

public class Worker {
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 開啟連線和建立頻道,與傳送端一樣
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		// 宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		channel.basicQos(1);//保證一次只分發一個
		// 建立佇列消費者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");

			    System.out.println(" [x] Received '" + message + "'");
			    try {
			    	for (char ch: message.toCharArray()) {
				        if (ch == '.') {
				        	Thread.sleep(1000);
				        }
				    }
				} catch (InterruptedException e) {
				} finally {
			      System.out.println(" [x] Done! at " +new Date().toLocaleString());
			      channel.basicAck(envelope.getDeliveryTag(), false);  
			    }
			  }
			};
			channel.basicConsume(QUEUE_NAME, false, consumer);
	}
}
       執行結果如下:

相關推薦

輕鬆RabbitMQ——工作佇列訊息分發機制

       上一篇博文中簡單介紹了一下RabbitMQ的基礎知識,並寫了一個經典語言入門程式——HelloWorld。本篇博文中我們將會建立一個工作佇列用來在工作者(consumer)間分發耗時任務。同樣是翻譯的官網例項。 工作佇列        在前一篇博文中,我們完

輕鬆RabbitMQ——遠端過程呼叫RPC

翻譯:http://www.rabbitmq.com/tutorials/tutorial-six-java.html在第二篇博文中,我們已經瞭解到瞭如何使用工作佇列來向多個消費者分散耗時任務。但是付過我們需要在遠端電腦上執行一個方法然後等待結果,該怎麼辦?這是不同的需求。

RabbitMQ 工作佇列

本系列教程主要來自於官網入門教程的翻譯,然後自己進行了部分的修改與實驗,內容僅供參考。上一篇部落格中我們寫了通過一個命名的佇列傳送和接收訊息,如果你還不瞭解請點選:RabbitMQ 入門 Helloworld。這篇中我們將會建立一個工作佇列用來在工作者(consumer)間分

輕松RabbitMQ——路由選擇

byte[] view 轉發器 ews 磁盤空間 表示 info 直接 net 轉自 http://blog.csdn.net/xiaoxian8023/article/details/48733249 翻譯地址:http://www.rabbitmq.com/tutori

RabbitMQ入門工作佇列

  在文章RabbitMQ入門(一)之Hello World,我們編寫程式通過指定的佇列來發送和接受訊息。在本文中,我們將會建立工作佇列(Work Queue),通過多個workers來分配耗時任務。   工作佇列(Work Queue,也被成為Task Queue,任務佇列)的中心思想是,避免立即執行一個資

訊息中介軟體——RabbitMQ各大主流訊息中介軟體綜合對比介紹!

前言 訊息佇列已經逐漸成為企業IT系統內部通訊的核心手段。它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成為非同步RPC的主要手段之一。當今市面上有很多主流的訊息中介軟體,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開發RocketMQ等。今天主要來

很easy的js雙向綁框架:控制器繼承

rst data 發出 sim 跟著 cti exec mar spa 初衷 上一篇已經實現了數據的雙向綁定,但model的控制範圍是整個文檔。在實際project中必需要有作用範圍,以便做ui模塊的拆分。 這一篇,我們希望實現像angularjs一

RabbitMQ ——工作隊列

bin 生產 就是 發送 定時 順序 -h 預取 list RabbitMQ(二) ——工作隊列 (轉載請附上本文鏈接——linhxx) 一、概述 工作隊列模式(work queue),是有多個消費者的情況下,可以共同消費隊列內的內容,加快消息處理速度

機器學習工作流程與模型調優

發生 較高的 mode lan 包含 因此 增加 絕對值 輸入 上一講中主要描述了機器學習特征工程的基本流程,其內容在這裏:機器學習(一)特征工程的基本流程 本次主要說明如下:   1)數據處理:此部分已經在上一節中詳細討論   2)特征工程:此部分已經在上一節中詳細討論

python采用pika庫使用rabbitmq --工作隊列

col min pro red exc 理解 需要 color 工作者 消息也可以理解為任務,消息發送者可以理解為任務分配者,消息接收者可以理解為工作者,當工作者接收到一個任務,還沒完成的時候,任務分配者又發一個任務過來,那就忙不過來了,於是就需要多個工作者來共同處理這些

基於CentOS6.5使用RabbitMQ

目錄 0、前言 1、啟動 2、使用者管理 3、角色管理 4、使用者許可權 0、前言 接上文《基於CentOS6.5安裝RabbitMQ》,完成了rabbitmq的安裝,接下里就是如何使用了。   1、啟動 註冊為服務,開機啟動 # chk

學習路-RabbitMQ:Mac安裝RabbitMQ

1.安裝: RabbitMQ:brew install rabbitmq 2.RabbitMQ的安裝位置: /usr/local/Cellar/rabbitmq/3.7.9 3.配置環境變數: cd~ vi .bash_profile export RABBIT_HOME=/usr/loca

Libcap庫學習::工作原理

Libpcap工作原理介紹 Libpcap是Unix/Linux平臺下的網路資料包捕獲資料庫。它是一個獨立於系統的使用者級資料包捕獲API介面,為底層網路監測提供了一個可以移植的框架。 一個包捕獲機制包含三個主要部分,分別是面向底層的包捕獲引擎,面向中間層的資料包過濾器,面向應用

RabbitMQAMQP協議mandatory和immediate標誌位區別

mandatory和immediate是AMQP協議中basic.pulish方法中的兩個標誌位,它們都有當訊息傳遞過程中不可達目的地時將訊息返回給生產者的功能。具體區別在於: 1. mandatory標誌位 當mandatory標誌位設定為true時,如果exchange

從原理上編碼-- Base64編碼

  開發者對Base64編碼肯定很熟悉,是否對它有很清晰的認識就不一定了。實際上Base64已經簡單到不能再簡單了,如果對它的理解還是模稜兩可實在不應該。大概介紹一下Base64的相關內容,花幾分鐘時間就可以徹底理解它。文章下邊貼了一個Base64的編解碼器,方便閱讀文章的同時來實驗。   一. Base6

2018年中總結工作中遇到的問題

年初在一家使用golang的公司待了一段時間,當時抱著一顆學習的心態試圖挑戰一下自己,結果因為自己基礎薄弱,能力不足,沒能堅持下去,但是卻學到了非常多的東西,是一段非常寶貴的工作經歷,這裡記錄一下在工作中遇到的奇奇怪怪的問題,儘量將具體的工作內容剝離出去,記錄問

訊息中介軟體Rabbitmq-使用詳解

https://blog.csdn.net/Dante_003/article/details/79377908Rabbitmq 是基於amqp(高階訊息佇列協議)實現的佇列技術,在他之上可以完成多種型別的訊息轉發模型。 下面列舉一些常用的訊息轉發場景,在rabbitmq中是

AI 學習路——輕鬆初探 Python 篇

這是「AI 學習之路」的第 2 篇,「Python 學習」的第 2 篇 我將分兩篇講解下 Python 的基礎語法,這是第一篇。大家也可以在很多地方看到入門的學習資料,我就簡單的根據自己理解和學習,用盡量簡單和好理解的方式,再來小入門一下,文中可能

RabbitMQ:例項demo

1、Windows下RabbitMQ的安裝 下載Erlang,地址:http://www.erlang.org/download/otp_win32_R15B.exe ,雙擊安裝即可(首先裝) 下載RabbitMQ,地址:http://www.rabbitmq.com/re

.NET Core微服務 許可權系統+工作工作流系統

一、前言   接上一篇 .NET Core微服務 許可權系統+工作流(一)許可權系統 ,再來一發   工作流,我在接觸這塊開發的時候一直好奇它的實現方式,翻看各種工作流引擎程式碼,探究其實現方式,個人總結出來一個核心要點:     實際上工作流引擎處理流轉的核心要義是如何解析流轉XML或者JS