【RabbitMQ】非同步任務
一、前言
上一篇部落格介紹了用執行緒池實現非同步任務。這一篇部落格談一談用MQ實現非同步任務。MQ的產品有灰常多,像什麼MSMQ、activeMQ、RocketMQ、RabbitMQ、kafak等。在此之前先談一談對訊息佇列的理解。
二、MQ
MQ是一種應用程式對應用程式的通訊方法,應用程式通過讀寫出入佇列的訊息來進行通訊,兩者無需建立連線,釋出者和消費者無需知道對方的存在。
MQ是生產者--消費者模型的一個代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以不斷讀取或訂閱佇列中的訊息。
使用場景:
1、非同步處理:在專案中將一些無需及時返回且耗時的操作提取出來,進行非同步處理,採用非同步處理,將大大節省了伺服器的請求響應時間,從而提高了系統的吞吐量。
2、訊息推送
三、RabbitMQ特點
RabbitMQ是一個由Erlang語言開發的AMQP的開源實現。
RabbitMQ的特點為:
1、可靠性
RabbitMQ提供了多種技術可以讓你在效能和可靠性之間進行權衡,如永續性、傳輸確認、投遞確認。
2、靈活的路由
在訊息進行佇列之前,通過Exchange來路由訊息。對於典型的路由功能,RabbitMQ已經提供了一些內建的Exchange來實現,針對更復雜的路由功能,可以將多個Exchange繫結在一起,也可以通過外掛機制來實現自己的Exchange。
3、訊息叢集
多個RabbitMQ伺服器可以組成一個叢集,形成一個邏輯Broker。
4、高可用
在同一叢集中佇列可以被映象到多臺機器,使得在部分節點出現問題的情況下,佇列仍然可用。
5、多協議
RabbitMQ支援多種訊息佇列協議。
6、多語言客戶端
支援java、.NET、Ruby等
7、管理介面
RabbitMQ提供了一個易用的使用者介面,使用者可以監控和管理Broker的許多方面。
8、跟蹤機制
如果訊息異常,RabbitMQ提供了訊息跟蹤機制,使用者可以找出發生了什麼。
9、外掛機制
RabbitMQ提供了很多外掛,來從多方面進行擴充套件,也可以編寫各種外掛。
四、RabbitMQ中的概念模型
- 訊息抽象模型
消費者訂閱某個佇列,生產者建立訊息,然後釋出到佇列中,最後將訊息傳送到監聽的消費者。
- RabbitMQ的基本概念
上圖是一個最簡單的抽象模型,接下來將展示一個更加詳細的模型。RabbitMQ內部結構如下:
1、Message
Message由訊息頭和訊息體組成,訊息體是不透明的,訊息頭是由一些屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他訊息的優先權)、delivery-mode(指出該訊息可能需要永續性儲存)等。
2、Publisher
訊息的生產者,也是一個向交換機發布訊息的客戶端應用程式。
3、Exchange
用來接收生產者傳送的訊息,並按照一定的規則將這些訊息路由給伺服器中的佇列。
4、Binding
繫結,用於訊息佇列和交換器之間的路由關聯,一個繫結就是將交換器和佇列連線起來的路由規則。
5、Queue
用來儲存訊息直到傳送給消費者,它是訊息的容器,1個訊息可投入一個到多個佇列,訊息一直在佇列裡,等待消費者取走。
6、Connection
網路連線,比如一個TCP連線。
7、Channel
通道,或者稱之為管道,是一條雙向資料流通道,AMQP都是通過通道發出去的,不管是釋出訊息,訂閱佇列,還是接受訊息,這些動作都是通過通道來完成的。
8、Consumer
訊息的消費者,表示一個從訊息佇列中取的訊息的客戶端應用程式。
9、Virtual Host
表示一批交換器、訊息佇列和相關物件。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、繫結和許可權機制。vhost 是 AMQP 概念的基礎,必須在連線時指定,RabbitMQ 預設的 vhost 是 / 。
10、Broker
表示RabbitMQ伺服器實體
五、入門Demo
pom檔案新增依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>com.github.sstone</groupId>
<artifactId>amqp-client_2.10</artifactId>
<version>1.3</version>
</dependency>
1、Publisher
public class Publisher {
//佇列名稱
private final static String QUEUE_NAME = "Queue";
public static void main(String[] args) {
// 建立連線工廠
ConnectionFactory factory = null;
// 建立到代理伺服器到連線
Connection connection = null;
// 獲得通道
Channel channel = null;
try {
factory = new ConnectionFactory();
//設定使用者名稱和密碼
factory.setUsername("guest");
factory.setPassword("guest");
// 設定 RabbitMQ 地址
factory.setHost("localhost");
// 建立到代理伺服器到連線
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello world,hello world";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("傳送 message[" + message + "] to "+ QUEUE_NAME +" success!");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
// 關閉資源
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
2、consumer
public class Consumer {
//佇列名稱
private final static String QUEUE_NAME = "Queue";
public static void main(String[] args) {
// 建立連線工廠
ConnectionFactory factory = null;
// 建立到代理伺服器到連線
Connection connection = null;
// 獲得通道
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
// 建立到代理伺服器到連線
connection = factory.newConnection();
channel = connection.createChannel();
// 1.佇列名2.是否持久化,3是否侷限與連結,4不再使用是否刪除,5其他的屬性
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 宣告一個消費者,配置好獲取訊息的方式
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
// 迴圈獲取訊息
while (true) {
// 迴圈獲取資訊
// 指向下一個訊息,如果沒有會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("接收 message[" + msg + "] from " + QUEUE_NAME);
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (ShutdownSignalException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
// 關閉資源
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
效果圖如下:
相關推薦
【RabbitMQ】非同步任務
一、前言 上一篇部落格介紹了用執行緒池實現非同步任務。這一篇部落格談一談用MQ實現非同步任務。MQ的產品有灰常多,像什麼MSMQ、activeMQ、RocketMQ、RabbitMQ、kafak等。在此之前先談一談對訊息佇列的理解。二、MQ MQ是一種應用
【RabbitMQ】5、RabbitMQ任務分發機制
它的 rtu 忘記 順序 sin spa 機制 一段時間 cto 當有Consumer需要大量的運算時,RabbitMQ Server需要一定的分發機制來balance每個Consumer的load。接下來我們分布講解。 應用場景就是RabbitMQ Server會
【玩轉SpringBoot】非同步任務執行與其執行緒池配置
同步程式碼寫起來簡單,但就是怕遇到耗時操作,會影響效率和吞吐量。此時非同步程式碼才是王者,但涉及多執行緒和執行緒池,以及非同步結果的獲取,寫起來頗為麻煩。不過在遇到SpringBoot非同步任務時,這個問題就不存在了。因為Spring家族是最替使用者考慮的。結果就是,像同步一樣簡單,像非同步一樣強大。眾所熟悉
【RabbitMQ】3、win7下安裝RabbitMQ
默認 窗體 releases style gen gem 執行 file spl RabbitMQ依賴erlang,所以先安裝erlang,然後再安裝RabbitMQ; erlang,下載地址:http://www.erlang.org/download Rabb
【RabbitMQ】4、幾種Exchange 模式
copy 消息發送 但是 net with .html ole img lis AMQP協議中的核心思想就是生產者和消費者隔離,生產者從不直接將消息發送給隊列。生產者通常不知道是否一個消息會被發送到隊列中,只是將消息發送到一個交換機。先由Exchange來接收,然後Exch
【RabbitMQ】7、RabbitMQ主備復制是異步還是同步?
處理 問題 主從 https 可靠 sql 關鍵點 不返回 當前 轉自:https://yq.aliyun.com/articles/73040?spm=5176.100240.searchblog.116.RcXYdl 我們知道RabbitMQ可以配置成Queue做主從復
【BZOJ2726】[SDOI2012]任務安排 斜率優化+cdq分治
時間 斜率 bool print pan i+1 main 最小 可能 【BZOJ2726】[SDOI2012]任務安排 Description 機器上有N個需要處理的任務,它們構成了一個序列。這些任務被標號為1到N,因此序列的排列為1,2,3...N。這N個任務被
【題解】CQOI2015任務查詢系統
nod node bool ger -- oid 上進 while div 主席樹,操作上面基本上是一樣的。每一個時間節點一棵樹,一個樹上的每個節點代表一個優先級的節點。把開始和結束時間點離散,在每一棵樹上進行修改。註意因為一個時間節點可能會有多個修改,但我們要保證都在
【RabbitMQ】4、三種Exchange模式——訂閱、路由、通配符模式
message final 支持 sim 使用 完全 自己的 print ued 前兩篇博客介紹了兩種隊列模式,這篇博客介紹訂閱、路由和通配符模式,之所以放在一起介紹,是因為這三種模式都是用了Exchange交換機,消息沒有直接發送到隊列,而是發送到了交換機,經過隊列綁定交
【Quartz】定時任務中Job、JobDetail、JobDataMap、Trigger概述
1. Job中實現主要的業務邏輯;JobDetail中儲存引數等job的配置資訊,如JobDataMap;Scheduler主體實現類;Trigger觸發Job的執行。 JobDetail jobDetail = JobBuilder.newJob(TimedTaskGroupbuy.class
Django+celery+ RabbitMQ實現非同步任務
一,首先安裝celery pip install django-celery 二,安裝rabbitmq ubuntu環境下執行以下 sudo apt-get install rabbitmq-server 新增使用者,myuser為使用者名稱,mypassword為使用者
【RabbitMQ】連線RabbitMQ異常: com.rabbitmq.client.ShutdownSignalException: connection error; protocol meth
測試該工具類: package com.wj.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; imp
【RabbitMq】rabbitMq訊息確認機制
一、提出問題 生產者將訊息傳送出去後,訊息是否到達RabbitMq伺服器呢?預設的情況下,是不知道的 二、引入訊息確認機制 兩種方式: 1.AMQP實現事務機制 &
【Linux】定時任務crontab
在Linux或類Unix系統中,通常使用 crontab 命令在指定的時間執行一個shell指令碼或者一系列Linux命令,也就是通常所說的定時任務。 一、cron 程序 在詳細介紹crontab之前,必須要說一下 cron 程序。 1、cron程序是linux中的守護程序
【RabbitMQ】 RabbitMQ安裝
MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用於諸如遠端過程呼叫的
【RabbitMQ】 RabbitMQ配置開機啟動 【Erlang】原始碼安裝 【RabbitMQ】 RabbitMQ安裝
環境 系統:Linux(CentOS 7.2) Erlang環境:21.1(安裝參考【Erlang】原始碼安裝) RabbitMQ:3.7.9(安裝參考【RabbitMQ】 RabbitMQ安裝) 配置開機啟動 1、增加自啟動指令碼: 在/etc/init.d目錄下新建一個
【封裝】非同步HttpURLConnection網路訪問
知識擴充套件: 【面相物件】靜態程式碼塊、構造程式碼塊和建構函式的執行順序 http://blog.csdn.net/u013806583/article/details/69934058 【封裝】使用okHttp進行網路請求及上傳下載進度監聽
【RabbitMQ】三種類型交換器 Fanout,Direct,Topic(轉)
出處:https://blog.csdn.net/fxq8866/article/details/62049393 RabbitMQ伺服器會根據路由鍵將訊息從交換器路由到佇列中,如何處理投遞到多個佇列的情況?這裡不同型別的交換器起到了重要的作用。分別是fanout,direct,topic,每一種型別實現了
1.【RabbitMQ】生產者,消費者,通道,佇列,交換器和繫結
瞭解訊息通訊中的一些重點概念對於深化對RabbitMQ的理解有重要的意義;下面從生產者,消費者,通道,佇列,交換器和繫結,來介紹他們在訊息通訊過程中的角色和作用; 生產者: 建立訊息,然後釋出到代理伺服器(RabbitMQ) 消費者: 連線到代理伺服器
【RabbitMQ】——centos7安裝rabbitmq教程
引言 訊息佇列現在在網際網路專案中應用的還是非常多的,在接下來的部落格中小編會深入的瞭解MQ的實現過程,在此部落格中將介紹如何在centos7下面安裝MQ以及遇到的問題。 第一步:安裝Erlang 因為rabbitMQ是Erlang語言編寫的,所以我們首先需