1. 程式人生 > >【RabbitMQ】非同步任務

【RabbitMQ】非同步任務

一、前言

     上一篇部落格介紹了用執行緒池實現非同步任務。這一篇部落格談一談用MQ實現非同步任務。MQ的產品有灰常多,像什麼MSMQ、activeMQ、RocketMQ、RabbitMQ、kafak等。在此之前先談一談對訊息佇列的理解。

二、MQ

       MQ是一種應用程式對應用程式的通訊方法,應用程式通過讀寫出入佇列的訊息來進行通訊,兩者無需建立連線,釋出者和消費者無需知道對方的存在。

      MQ是生產者--消費者模型的一個代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以不斷讀取或訂閱佇列中的訊息。

      使用場景:

      1、非同步處理:在專案中將一些無需及時返回且耗時的操作提取出來,進行非同步處理,採用非同步處理,將大大節省了伺服器的請求響應時間,從而提高了系統的吞吐量。

       2、訊息推送

三、RabbitMQ特點

       RabbitMQ是一個由Erlang語言開發的AMQP的開源實現。

       RabbitMQ的特點為:

       1、可靠性

       RabbitMQ提供了多種技術可以讓你在效能和可靠性之間進行權衡,如永續性、傳輸確認、投遞確認。

       2、靈活的路由

       在訊息進行佇列之前,通過Exchange來路由訊息。對於典型的路由功能,RabbitMQ已經提供了一些內建的Exchange來實現,針對更復雜的路由功能,可以將多個Exchange繫結在一起,也可以通過外掛機制來實現自己的Exchange。

       3、訊息叢集

        多個RabbitMQ伺服器可以組成一個叢集,形成一個邏輯Broker。

       4、高可用

       在同一叢集中佇列可以被映象到多臺機器,使得在部分節點出現問題的情況下,佇列仍然可用。

       5、多協議

       RabbitMQ支援多種訊息佇列協議。

       6、多語言客戶端

       支援java、.NET、Ruby等

       7、管理介面

        RabbitMQ提供了一個易用的使用者介面,使用者可以監控和管理Broker的許多方面。

       8、跟蹤機制

        如果訊息異常,RabbitMQ提供了訊息跟蹤機制,使用者可以找出發生了什麼。

       9、外掛機制

        RabbitMQ提供了很多外掛,來從多方面進行擴充套件,也可以編寫各種外掛。

四、RabbitMQ中的概念模型

  •  訊息抽象模型

       消費者訂閱某個佇列,生產者建立訊息,然後釋出到佇列中,最後將訊息傳送到監聽的消費者。

        

  • RabbitMQ的基本概念

   上圖是一個最簡單的抽象模型,接下來將展示一個更加詳細的模型。RabbitMQ內部結構如下:

   

   1、Message

    Message由訊息頭和訊息體組成,訊息體是不透明的,訊息頭是由一些屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他訊息的優先權)、delivery-mode(指出該訊息可能需要永續性儲存)等。

   2、Publisher

    訊息的生產者,也是一個向交換機發布訊息的客戶端應用程式。

   3、Exchange

    用來接收生產者傳送的訊息,並按照一定的規則將這些訊息路由給伺服器中的佇列。

   4、Binding

    繫結,用於訊息佇列和交換器之間的路由關聯,一個繫結就是將交換器和佇列連線起來的路由規則。

   5、Queue

   用來儲存訊息直到傳送給消費者,它是訊息的容器,1個訊息可投入一個到多個佇列,訊息一直在佇列裡,等待消費者取走。

   6、Connection

    網路連線,比如一個TCP連線。

   7、Channel

    通道,或者稱之為管道,是一條雙向資料流通道,AMQP都是通過通道發出去的,不管是釋出訊息,訂閱佇列,還是接受訊息,這些動作都是通過通道來完成的。

    8、Consumer

    訊息的消費者,表示一個從訊息佇列中取的訊息的客戶端應用程式。

    9、Virtual Host

    表示一批交換器、訊息佇列和相關物件。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、繫結和許可權機制。vhost 是 AMQP 概念的基礎,必須在連線時指定,RabbitMQ 預設的 vhost 是 / 。

    10、Broker

     表示RabbitMQ伺服器實體

五、入門Demo

      pom檔案新增依賴

    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>4.3.0</version>
    </dependency>

    <dependency>
      <groupId>com.github.sstone</groupId>
      <artifactId>amqp-client_2.10</artifactId>
      <version>1.3</version>
    </dependency>

1、Publisher

public class Publisher {
    //佇列名稱
    private final static String QUEUE_NAME = "Queue";
    public static void main(String[] args) {
        // 建立連線工廠
        ConnectionFactory factory = null;
        // 建立到代理伺服器到連線
        Connection connection = null;
        // 獲得通道
        Channel channel = null;
        try {
            factory = new ConnectionFactory();
            //設定使用者名稱和密碼
            factory.setUsername("guest");
            factory.setPassword("guest");
            // 設定 RabbitMQ 地址
            factory.setHost("localhost");
            // 建立到代理伺服器到連線
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "hello world,hello world";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("傳送  message[" + message + "] to "+ QUEUE_NAME +" success!");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            try {
// 關閉資源
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}

2、consumer

public class Consumer {
    //佇列名稱
    private final static String QUEUE_NAME = "Queue";
    public static void main(String[] args) {
        // 建立連線工廠
        ConnectionFactory factory = null;
        // 建立到代理伺服器到連線
        Connection connection = null;
        // 獲得通道
        Channel channel = null;
        try {
            factory = new ConnectionFactory();
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setHost("localhost");
            // 建立到代理伺服器到連線
            connection = factory.newConnection();
            channel = connection.createChannel();
            // 1.佇列名2.是否持久化,3是否侷限與連結,4不再使用是否刪除,5其他的屬性
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 宣告一個消費者,配置好獲取訊息的方式
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, true, consumer);
            // 迴圈獲取訊息
            while (true) {
            // 迴圈獲取資訊
            // 指向下一個訊息,如果沒有會一直阻塞
                Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("接收 message[" + msg + "] from " + QUEUE_NAME);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (ShutdownSignalException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
            // 關閉資源
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}

效果圖如下:

相關推薦

RabbitMQ非同步任務

一、前言     上一篇部落格介紹了用執行緒池實現非同步任務。這一篇部落格談一談用MQ實現非同步任務。MQ的產品有灰常多,像什麼MSMQ、activeMQ、RocketMQ、RabbitMQ、kafak等。在此之前先談一談對訊息佇列的理解。二、MQ       MQ是一種應用

RabbitMQ5、RabbitMQ任務分發機制

它的 rtu 忘記 順序 sin spa 機制 一段時間 cto 當有Consumer需要大量的運算時,RabbitMQ Server需要一定的分發機制來balance每個Consumer的load。接下來我們分布講解。 應用場景就是RabbitMQ Server會

玩轉SpringBoot非同步任務執行與其執行緒池配置

同步程式碼寫起來簡單,但就是怕遇到耗時操作,會影響效率和吞吐量。此時非同步程式碼才是王者,但涉及多執行緒和執行緒池,以及非同步結果的獲取,寫起來頗為麻煩。不過在遇到SpringBoot非同步任務時,這個問題就不存在了。因為Spring家族是最替使用者考慮的。結果就是,像同步一樣簡單,像非同步一樣強大。眾所熟悉

RabbitMQ3、win7下安裝RabbitMQ

默認 窗體 releases style gen gem 執行 file spl RabbitMQ依賴erlang,所以先安裝erlang,然後再安裝RabbitMQ; erlang,下載地址:http://www.erlang.org/download Rabb

RabbitMQ4、幾種Exchange 模式

copy 消息發送 但是 net with .html ole img lis AMQP協議中的核心思想就是生產者和消費者隔離,生產者從不直接將消息發送給隊列。生產者通常不知道是否一個消息會被發送到隊列中,只是將消息發送到一個交換機。先由Exchange來接收,然後Exch

RabbitMQ7、RabbitMQ主備復制是異步還是同步?

處理 問題 主從 https 可靠 sql 關鍵點 不返回 當前 轉自:https://yq.aliyun.com/articles/73040?spm=5176.100240.searchblog.116.RcXYdl 我們知道RabbitMQ可以配置成Queue做主從復

BZOJ2726[SDOI2012]任務安排 斜率優化+cdq分治

時間 斜率 bool print pan i+1 main 最小 可能 【BZOJ2726】[SDOI2012]任務安排 Description 機器上有N個需要處理的任務,它們構成了一個序列。這些任務被標號為1到N,因此序列的排列為1,2,3...N。這N個任務被

題解CQOI2015任務查詢系統

nod node bool ger -- oid 上進 while div   主席樹,操作上面基本上是一樣的。每一個時間節點一棵樹,一個樹上的每個節點代表一個優先級的節點。把開始和結束時間點離散,在每一棵樹上進行修改。註意因為一個時間節點可能會有多個修改,但我們要保證都在

RabbitMQ4、三種Exchange模式——訂閱、路由、通配符模式

message final 支持 sim 使用 完全 自己的 print ued 前兩篇博客介紹了兩種隊列模式,這篇博客介紹訂閱、路由和通配符模式,之所以放在一起介紹,是因為這三種模式都是用了Exchange交換機,消息沒有直接發送到隊列,而是發送到了交換機,經過隊列綁定交

Quartz定時任務中Job、JobDetail、JobDataMap、Trigger概述

1. Job中實現主要的業務邏輯;JobDetail中儲存引數等job的配置資訊,如JobDataMap;Scheduler主體實現類;Trigger觸發Job的執行。 JobDetail jobDetail = JobBuilder.newJob(TimedTaskGroupbuy.class

Django+celery+ RabbitMQ實現非同步任務

一,首先安裝celery pip install django-celery 二,安裝rabbitmq ubuntu環境下執行以下 sudo apt-get install rabbitmq-server 新增使用者,myuser為使用者名稱,mypassword為使用者

RabbitMQ連線RabbitMQ異常: com.rabbitmq.client.ShutdownSignalException: connection error; protocol meth

測試該工具類: package com.wj.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; imp

RabbitMqrabbitMq訊息確認機制

  一、提出問題 生產者將訊息傳送出去後,訊息是否到達RabbitMq伺服器呢?預設的情況下,是不知道的 二、引入訊息確認機制 兩種方式:           1.AMQP實現事務機制     &

Linux定時任務crontab

在Linux或類Unix系統中,通常使用 crontab 命令在指定的時間執行一個shell指令碼或者一系列Linux命令,也就是通常所說的定時任務。 一、cron 程序 在詳細介紹crontab之前,必須要說一下 cron 程序。 1、cron程序是linux中的守護程序

RabbitMQ RabbitMQ安裝

  MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用於諸如遠端過程呼叫的

RabbitMQ RabbitMQ配置開機啟動 Erlang原始碼安裝 RabbitMQ RabbitMQ安裝

環境   系統:Linux(CentOS 7.2)   Erlang環境:21.1(安裝參考【Erlang】原始碼安裝)   RabbitMQ:3.7.9(安裝參考【RabbitMQ】 RabbitMQ安裝) 配置開機啟動   1、增加自啟動指令碼:     在/etc/init.d目錄下新建一個

封裝非同步HttpURLConnection網路訪問

知識擴充套件: 【面相物件】靜態程式碼塊、構造程式碼塊和建構函式的執行順序 http://blog.csdn.net/u013806583/article/details/69934058 【封裝】使用okHttp進行網路請求及上傳下載進度監聽

RabbitMQ三種類型交換器 Fanout,Direct,Topic(轉)

出處:https://blog.csdn.net/fxq8866/article/details/62049393 RabbitMQ伺服器會根據路由鍵將訊息從交換器路由到佇列中,如何處理投遞到多個佇列的情況?這裡不同型別的交換器起到了重要的作用。分別是fanout,direct,topic,每一種型別實現了

1.RabbitMQ生產者,消費者,通道,佇列,交換器和繫結

瞭解訊息通訊中的一些重點概念對於深化對RabbitMQ的理解有重要的意義;下面從生產者,消費者,通道,佇列,交換器和繫結,來介紹他們在訊息通訊過程中的角色和作用; 生產者:   建立訊息,然後釋出到代理伺服器(RabbitMQ) 消費者: 連線到代理伺服器

RabbitMQ——centos7安裝rabbitmq教程

引言   訊息佇列現在在網際網路專案中應用的還是非常多的,在接下來的部落格中小編會深入的瞭解MQ的實現過程,在此部落格中將介紹如何在centos7下面安裝MQ以及遇到的問題。   第一步:安裝Erlang   因為rabbitMQ是Erlang語言編寫的,所以我們首先需