1. 程式人生 > >rabbimq佇列之死信佇列和延遲佇列

rabbimq佇列之死信佇列和延遲佇列

        在專案開發設計過程中,我們經常用到非同步推送的場景,比如下單和扣減庫存非同步執行。常用的非同步中介軟體有mq有很多,我們這裡以Rabbitmq為例進行講解。我們知道只要是涉及非同步場景的問題,就有成功失敗之分,如果生產者傳送訊息一直失敗要怎麼做呢,這裡我們就用到了死信佇列來實現失敗訊息的處理。

流程圖如下圖:

廢話不多說直接上程式碼:

rabbitmq配置類:

package com.mq.test.config;


import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import 
com.rabbitmq.client.ConnectionFactory; import java.io.IOException; /** * Created by Administrator on 2018/6/7. */ public class RabbitmqConfig { private final static String IP = "172.16.11.119"; private final static int PORT = 5672; private final static String USERNAME = "root"; private final static
String PASSWORD = "root"; private static Connection connection = null; private static ConnectionFactory factory = null; static { factory = new ConnectionFactory(); Address[] addresses = new Address[]{new Address(IP, PORT)}; factory.setUsername(USERNAME); factory
.setPassword(PASSWORD); try { connection = factory.newConnection(addresses); } catch (IOException e) { e.printStackTrace(); } } public static Channel getChannel() throws IOException { if (connection == null) { if (factory != null) { return factory.newConnection().createChannel(); }else{ return null; } } return connection.createChannel(); } }
生產者類:
package com.mq.test;

import com.mq.test.config.RabbitmqConfig;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;

/**
 * Created by Administrator on 2018/6/7.
 */
public class MqProducerTest {
    private final static String DLX_EXCHANGE_NAME = "dlx_exchange";
    private final static String NORMAL_EXCHANGE_NAME = "normal_exchange";
//    private final static String DLX_ROUTINGKEY= "dlx";
private final static String NORMAL_ROUTINGKEY= "normal";
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitmqConfig.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE_NAME,"direct",true);
        channel.exchangeDeclare(DLX_EXCHANGE_NAME,"direct",true);
        channel.basicPublish(NORMAL_EXCHANGE_NAME,NORMAL_ROUTINGKEY, MessageProperties.PERSISTENT_TEXT_PLAIN,"dlx test...".getBytes());
    }
}
消費者類:
package com.mq.test;

import com.mq.test.config.RabbitmqConfig;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Administrator on 2018/6/7.
 */
public class MqConsumerTest {
    private final static String DLX_EXCHANGE_NAME = "dlx_exchange";
    private final static String NORMAL_EXCHANGE_NAME = "normal_exchange";
    private final static String DLX_ROUTINGKEY = "dlx";
    private final static String NORMAL_ROUTINGKEY = "normal";
    private final static String DLX_QUEUE = "dlx";
    private final static String NORMAL_QUEUE = "normal";

    public static void main(String[] args) throws IOException {
        final Channel channel = RabbitmqConfig.getChannel();
        //建立正常佇列和繫結,帶引數的
Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
        arguments.put("x-dead-letter-routing-key", DLX_ROUTINGKEY);
        arguments.put("x-message-ttl",6000);//如果想實現延遲佇列的功能可以設定,只要訊息在佇列達到設定的這個時間沒有被消費,就自動傳送到死信佇列,注意這裡是訊息未被消費,而不是未被ack!!!
        channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE_NAME, NORMAL_ROUTINGKEY);
        //建立死信佇列和繫結
channel.queueDeclare(DLX_QUEUE, true, false, false, null);
        channel.queueBind(DLX_QUEUE, DLX_EXCHANGE_NAME, DLX_ROUTINGKEY);
        channel.basicQos(64);
        channel.basicConsume(NORMAL_QUEUE, false, "myConsumerTag", new Consumer() {
            @Override
public void handleConsumeOk(String s) {

            }

            @Override
public void handleCancelOk(String s) {

            }

            @Override
public void handleCancel(String s) throws IOException {

            }

            @Override
public void handleShutdownSignal(String s, ShutdownSignalException e) {

            }

            @Override
public void handleRecoverOk(String s) {

            }

            @Override
public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                String routingKey = envelope.getRoutingKey();
                String contentType = basicProperties.getContentType();
                long deliveryTag = envelope.getDeliveryTag();
                try {
                    Thread.sleep(8000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
//                channel.basicReject(deliveryTag,false);
}
        });
    }
}
延時或是死信佇列實現:
package com.mq.test;

import com.mq.test.config.RabbitmqConfig;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Created by Administrator on 2018/6/7.
 */
public class MqdeplayQueueTest {
    private final static String DLX_QUEUE = "dlx";
    public static void main(String[] args) throws IOException {
        final Channel channel = RabbitmqConfig.getChannel();
        channel.basicConsume(DLX_QUEUE, true, "myDlxConsumer", new Consumer() {
            @Override
public void handleConsumeOk(String s) {

            }

            @Override
public void handleCancelOk(String s) {

            }

            @Override
public void handleCancel(String s) throws IOException {

            }

            @Override
public void handleShutdownSignal(String s, ShutdownSignalException e) {

            }

            @Override
public void handleRecoverOk(String s) {

            }

            @Override
public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("這是延遲佇列收到的訊息:"+new String(bytes));
            }
        });
    }
}
這就是全部的程式碼,歡迎大家交流!


相關推薦

rabbimq佇列死信佇列延遲佇列

        在專案開發設計過程中,我們經常用到非同步推送的場景,比如下單和扣減庫存非同步執行。常用的非同步中介軟體有mq有很多,我們這裡以Rabbitmq為例進行講解。我們知道只要是涉及非同步場景的問題,就有成功失敗之分,如果生產者傳送訊息一直失敗要怎麼做呢,這裡我們就用

RabbitMQ死信機制實現延遲佇列

延遲佇列 延遲佇列儲存的物件肯定是對應的延時訊息,所謂”延時訊息”是指當訊息被髮送以後,並不想讓消費者立即拿到訊息,而是等待指定時間後,消費者才拿到這個訊息進行消費。 應用場景 三方支付,掃碼支付呼叫上游的掃碼介面,當掃碼有效期過後去呼叫查詢介面查詢結果。實現方式:每當一筆掃碼支付請求後

訊息佇列RabbitMQ - 簡介安裝

       訊息佇列:是簡單的生產者和消費者模式,它的出現是讓各個服務板塊之間解耦和訊息通知。比如,我們一般生成服務板塊中的資料存在有:資料庫,靜態檔案,搜尋系統,hdfs等,那麼如果資料庫中的資料發生了變化,怎麼把這個訊息推送給其他的資料儲存單元呢?如果單

Spring Boot + RabbitMQ死信機制實現延遲佇列

本文轉載自部落格:http://blog.csdn.net/nexttake/article/details/78607486 ------------------------------------------------------------------------------------

ActiveMQ虛擬主題映象佇列

轉自:http://blog.csdn.net/zhu_tianwei/article/details/46303419, 略做補充ActiveMQ支援的虛擬Destinations分為有兩種,分別是1.虛擬主題(Virtual Topics)2.組合 Destination

RabbitMQ訊息佇列二:消費者生產者 Demo

在使用RabbitMQ之前,需要了解RabbitMQ的工作原理。 RabbitMQ的工作原理 RabbitMQ是訊息代理。從本質上說,它接受來自生產者的資訊,並將它們傳遞給消費者。在兩者之間,它可以根據你給它的路由,緩衝規則進行傳遞訊息。 示例圖

ionic入門深連結延遲載入

主要模組 延遲載入聽起來像是一個複雜的過程,但實際上是非常直截了當。從概念上講,一段程式碼,隨著應用程式請求而載入。NgModules 是可以組織我們的應用程式頁,並將它們分開在不同的資料塊的方式。

計算機組成原理指令排程延遲分支

一.實驗目的 (1)加深對指令排程技術的理解。 (2)加深對延遲分支技術的理解。 (3)熟練掌握用指令排程技術解決流水線中的資料衝突的方法。 (4)進一步理解指令排程技術對CPU效能的改進。 (5)進一步理解延遲分支技術對CPU效能的改進。

hibernate檢索小結——查詢優化延遲載入

Hibernate抓取(檢索方式) 檢索分為:立即檢索和延遲檢索 區別和原理:立即檢索當(使用者)請求一發立刻向資料庫傳送sql語句,不管該物件有沒有被使用(訪問去屬性)。而延遲檢索則是類似於保留查詢,只有在該物件唄使用的時候才會想資料庫傳送sql語句,其實延遲檢索返回的是

Java併發(十八):阻塞佇列BlockingQueue BlockingQueue(阻塞佇列)詳解 二叉堆(一) 圖文解析 C語言的實現 多執行緒程式設計:阻塞、併發佇列的使用總結 Java併發程式設計:阻塞佇列 java阻塞佇列 BlockingQueue(阻塞佇列)詳解

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。 這兩個附加的操作是:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。當佇列滿時,儲存元素的執行緒會等待佇列可用。 阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者

訊息中介軟體--RabbitMQ學習(十七)---高階特性死信佇列

死信佇列:DLX,Dead- Letter- Exchange 利用DLX,當訊息在一個佇列中變成死信( dead message)之後它能被重新 publish到另一個 Exchange,這個 Exchange就是DLX 死信佇列訊息變成死信有一下幾種情況

java執行緒池ThreadPoolExecutor(二):任務入佇列任務丟棄

一、關於任務入佇列 在上一篇【java執行緒池之ThreadPoolExecutor(一)】中, ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,

【Linux】程序間通訊訊息佇列、訊號量共享儲存

訊息佇列、訊號量和共享儲存是IPC(程序間通訊)的三種形式,它們功能不同,但有相似之處,下面先介紹它們的相似點,然後再逐一說明。 1、相似點 每個核心中的IPC結構(訊息佇列、訊號量和共享儲存)都用一個非負整數的識別符號加以引用,與檔案描述符不同,當一個

資料結構java版《棧佇列

1、棧。(Android的Activity載入是基礎棧結構的)底層使用陣列實現package ch4; /** * 棧 * @author Howard * 特點: * 1、通常情況作為程式設計

RabbitMQ死信佇列

DLX, Dead-Letter-Exchange。利用DLX, 當訊息在一個佇列中變成死信(dead message)之後,它能被重新publish到另一個Exchange,這個Exchange就是DLX。訊息變成死信一向有一下幾種情況: 訊息被拒絕(ba

DelayQueue延遲佇列Redis快取實現訂單自動取消功能

首先這是一個操作頻繁的自動化定時功能,對比於定時器有著更大的使用空間和效能優化,無論是前端的setTimeout與setInterval 定時器還是後端的TimerTask定時器,在面對短期內的頻繁操作都會有著效能和多執行緒之間的問題,所以這時的佇列就起到很重要的作用了,尤其

堵塞佇列ArrayBlockingQueueLinkedBlockingQueue解析

線上程池建立的時候,需要傳一個堵塞佇列來維護需要執行的執行緒任務,其中最常用的是ArrayBlockingQueue和LinkedBlockingQueue。他們都繼承了BlockingQueue介面。 ArrayBlockingQueue 一個有邊

多程序程式設計程序間通訊-管道訊息佇列

1.程序間通訊 Linux作為一種新興的作業系統,幾乎支援所有的Unix下常用的程序間通訊方法:管道、訊息佇列、共享記憶體、訊號量、套介面等等。 2.管道 管道是程序間通訊中最古老的方式,它包括無名管道(或者匿名管道)和有名管道兩種,前者用於父程序和

Spring Boot 實現 RabbitMQ 延遲消費延遲重試佇列

並增加了自己的一些理解,記錄下來,以便日後查閱。 專案原始碼: 背景 何為延遲佇列? 顧名思義,延遲佇列就是進入該佇列的訊息會被延遲消費的佇列。而一般的佇列,訊息一旦入隊了之後就會被消費者馬上消費。 延遲佇列能做什麼?延遲佇列多用於需要延遲工作的場景。最常見的是以下兩種場景: 延遲消費。比如:使用者生成

C++佇列優先佇列

C++ queue(佇列)Priority queue(優先佇列) 【queue】 C++佇列是一種容器介面卡,它給予程式設計師一種先進先出(FIFO)的資料結構。1.back() 返回一個引用,指向最後一個元素2.empty() 如果佇列空則返回真3.front()