訊息中介軟體系列五:RabbitMQ的使用場景(非同步處理、應用解耦)
一、非同步處理
場景:
使用者註冊,寫入資料庫成功以後,傳送郵件和簡訊。
準備工作:
1)安裝RabbitMQ,參考前面的文章
2)新建一個名為RabbitMQAsyncProc的maven web工程,在pom.xml檔案裡面引入如下依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.study.demo</groupId> <artifactId>RabbitMQAsyncProc</artifactId> <packaging>war</packaging> <version>0.0.1-SNAPSHOT</version> <name>RabbitMQAsyncProc Maven Webapp</name> <url>http://maven.apache.org</url> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency><groupId>javax</groupId> <artifactId>javaee-web-api</artifactId> <version>7.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>4.3.11.RELEASE</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>jstl</artifactId> <version>1.2</version> </dependency> <!--日誌--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> </dependency> <!-- <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>1.7.5</version> </dependency>--> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-access</artifactId> <version>1.0.13</version> </dependency> <!--JSON--> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> <version>1.9.13</version> </dependency> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> <version>1.9.13</version> </dependency> <!-- RabbitMQ --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.0.0</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.0.RELEASE</version> </dependency> <!--使用AspectJ方式註解需要相應的包--> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjrt</artifactId> <version>1.6.12</version> </dependency> <!--使用AspectJ方式註解需要相應的包--> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>1.6.12</version> </dependency> </dependencies> <build> <finalName>RabbitMQAsyncProc</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-war-plugin</artifactId> <version>2.6</version> <configuration> <failOnMissingWebXml>false</failOnMissingWebXml> </configuration> </plugin> </plugins> <resources> <resource> <directory>${basedir}/src/main/java</directory> <includes> <include>**/*.xml</include> </includes> </resource> </resources> </build> </project>
1. 新建一個使用者資訊實體
package com.study.demo.vo; import java.util.UUID; /** * * @Description: 使用者資訊實體 * @author liguangsheng * @date 2018年9月18日 * */ public class User { private final String userId; private final String userName; private final String email; private final String phoneNumber; public User(String userId, String userName, String email, String phoneNumber) { this.userId = userId; this.userName = userName; this.email = email; this.phoneNumber = phoneNumber; } public String getUserId() { return userId; } public String getUserName() { return userName; } public String getEmail() { return email; } public String getPhoneNumber() { return phoneNumber; } public static User makeUser(String userName,String email,String phoneNumber){ String userId = UUID.randomUUID().toString(); return new User(userId,userName,email,phoneNumber); } @Override public String toString() { return "User{" + "userId='" + userId + '\'' + ", userName='" + userName + '\'' + ", email='" + email + '\'' + ", phoneNumber='" + phoneNumber + '\'' + '}'; } }
2. 新建一個使用者註冊介面
package com.study.demo.service; import com.study.demo.vo.User; /** * * @Description: 使用者註冊介面 * @author leeSmall * @date 2018年9月18日 * */ public interface IUserReg { public boolean userRegister(User user); }
3. 新建三個業務類
1)儲存使用者資料到資料庫
package com.study.demo.service.busi; import java.util.concurrent.ConcurrentHashMap; import org.springframework.stereotype.Service; import com.study.demo.vo.User; /** * * @Description: 儲存使用者資料到資料庫 * @author leeSmall * @date 2018年9月18日 * */ @Service public class SaveUser { private ConcurrentHashMap<String,User> userData = new ConcurrentHashMap<String, User>(); public void saveUser(User user){ try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } userData.putIfAbsent(user.getUserId(),user); } public User getUser(String userId){ return userData.get(userId); } }
2)傳送郵件的服務
package com.study.demo.service.busi; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; /** * * @Description: 傳送郵件的服務 * @author leeSmall * @date 2018年9月18日 * */ @Service public class SendEmail { private Logger logger = LoggerFactory.getLogger(SendEmail.class); public void sendEmail(String email){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("-------------Already Send email to "+email); } }
3)傳送簡訊的服務
package com.study.demo.service.busi; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; /** * * @Description: 傳送簡訊的服務 * @author leeSmall * @date 2018年9月18日 * */ @Service public class SendSms { private Logger logger = LoggerFactory.getLogger(SendSms.class); public void sendSms(String phoneNumber){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("-------------Already Send Sms to "+phoneNumber); } }
4. 新建/RabbitMQAsyncProc/src/main/java/applicationContext.xml配置檔案
<?xml version="1.0" encoding="UTF-8"?> <!-- 查詢最新的schemaLocation 訪問 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:task="http://www.springframework.org/schema/task" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.0.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd"> <aop:aspectj-autoproxy /> <!-- 配置掃描路徑 --> <context:component-scan base-package="com.study.demo"> <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/> </context:component-scan> <!-- rabbitMQ配置 --> <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="127.0.0.1"/> <property name="username" value="guest"/> <property name="password" value="guest"/> <property name="channelCacheSize" value="8"/> <property name="port" value="5672"></property> </bean> <rabbit:admin connection-factory="rabbitConnectionFactory"/> <!--郵件相關佇列--> <rabbit:queue name="email_queue" durable="false"/> <!--簡訊相關佇列--> <rabbit:queue name="sms_queue" durable="false"/> <!--將佇列和交換器通過路由鍵繫結--> <rabbit:direct-exchange name="user-reg-exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="true"> <rabbit:bindings> <rabbit:binding queue="email_queue" key="email" ></rabbit:binding> <rabbit:binding queue="sms_queue" key="sms" ></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!-- 建立rabbitTemplate 訊息模板類 --> <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <constructor-arg ref="rabbitConnectionFactory"></constructor-arg> </bean> </beans>
5. 新建一個序列的使用者註冊實現類
package com.study.demo.service.impl; import com.study.demo.service.IUserReg; import com.study.demo.service.busi.SaveUser; import com.study.demo.service.busi.SendEmail; import com.study.demo.service.busi.SendSms; import com.study.demo.vo.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; /** * * @Description: 序列的使用者註冊 * @author leeSmall * @date 2018年9月18日 * */ @Service @Qualifier("serial") public class SerialProcess implements IUserReg { @Autowired private SaveUser saveUser; @Autowired private SendEmail sendEmail; @Autowired private SendSms sendSms; public boolean userRegister(User user) { try { saveUser.saveUser(user); sendEmail.sendEmail(user.getEmail()); sendSms.sendSms(user.getPhoneNumber()); return true; } catch (Exception e) { return false; } } }
6. 新建一個並行的使用者註冊實現類
package com.study.demo.service.impl; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import com.study.demo.service.IUserReg; import com.study.demo.service.busi.SaveUser; import com.study.demo.service.busi.SendEmail; import com.study.demo.service.busi.SendSms; import com.study.demo.vo.User; /** * * @Description: 並行的使用者註冊 * @author leeSmall * @date 2018年9月18日 * */ @Service @Qualifier("para") public class ParalllerProcess implements IUserReg { private static Logger logger = LoggerFactory.getLogger(ParalllerProcess.class); @Autowired private SaveUser saveUser; @Autowired private SendEmail sendEmail; @Autowired private SendSms sendSms; //傳送郵件的執行緒 private static class SendEmailThread implements Callable<Boolean>{ private SendEmail sendEmail; private String email; public SendEmailThread(SendEmail sendEmail, String email) { this.sendEmail = sendEmail; this.email = email; } public Boolean call() throws Exception { sendEmail.sendEmail(email); logger.info("SendEmailThread send mail to"+email); return true; } } //傳送簡訊的執行緒 private static class SendSmsThread implements Callable<Boolean>{ private SendSms sendSms; private String phoneNumber; public SendSmsThread(SendSms sendSms, String phoneNumber) { this.sendSms = sendSms; this.phoneNumber = phoneNumber; } public Boolean call() throws Exception { sendSms.sendSms(phoneNumber); logger.info("SendSmsThread send mail to"+phoneNumber); return true; } } public boolean userRegister(User user) { FutureTask<Boolean> sendEmailFuture = new FutureTask<Boolean>(new SendEmailThread(sendEmail,user.getEmail())); FutureTask<Boolean> sendSmsFuture = new FutureTask<Boolean>(new SendSmsThread(sendSms,user.getPhoneNumber())); try { saveUser.saveUser(user); new Thread(sendEmailFuture).start(); new Thread(sendSmsFuture).start(); sendEmailFuture.get();//獲取郵件傳送的結果 sendSmsFuture.get();//獲取簡訊傳送的結果 return true; } catch (Exception e) { logger.error(e.toString()); return false; } } }
7. 新建一個RabbitMQ實現的非同步使用者註冊
package com.study.demo.service.impl; import com.study.demo.service.IUserReg; import com.study.demo.service.busi.SaveUser; import com.study.demo.vo.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; /** * * @Description: RabbitMQ實現的非同步使用者註冊 * @author leeSmall * @date 2018年9月18日 * */ @Service @Qualifier("async") public class AsyncProcess implements IUserReg{ private Logger logger = LoggerFactory.getLogger(AsyncProcess.class); @Autowired private RabbitTemplate rabbitTemplate; @Autowired private SaveUser saveUser; public boolean userRegister(User user) { try { saveUser.saveUser(user); rabbitTemplate.send("user-reg-exchange","email", new Message(user.getEmail().getBytes(),new MessageProperties())); rabbitTemplate.send("user-reg-exchange","sms", new Message(user.getEmail().getBytes(),new MessageProperties())); } catch (AmqpException e) { logger.error(e.toString()); return false; } return true; } }
8.新建一個RabbitMQ訊息消費端監聽郵件訊息類
package com.study.demo.service.mq; import com.study.demo.service.busi.SendEmail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * * @Description: RabbitMQ訊息消費端監聽郵件訊息類 * @author leeSmall * @date 2018年9月18日 * */ @Component public class ProcessUserEmail implements MessageListener { private Logger logger = LoggerFactory.getLogger(ProcessUserEmail.class); @Autowired private SendEmail sendEmail; public void onMessage(Message message) { logger.info("accept message,ready process......"); sendEmail.sendEmail(new String(message.getBody())); } }
在/RabbitMQAsyncProc/src/main/java/applicationContext.xml新增郵件訊息監聽類配置:
<rabbit:listener-container connection-factory="rabbitConnectionFactory"> <rabbit:listener queues="email_queue" ref="processUserEmail" method="onMessage"/> </rabbit:listener-container>
9. 新建一個RabbitMQ訊息消費端監聽sms訊息類
package com.study.demo.service.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.study.demo.service.busi.SendSms; /** * * @Description: RabbitMQ訊息消費端監聽sms訊息類 * @author leeSmall * @date 2018年9月18日 * */ @Component public class ProcessUserSms implements MessageListener { private Logger logger = LoggerFactory.getLogger(ProcessUserSms.class); @Autowired private SendSms sendSms; public void onMessage(Message message) { logger.info("accept message,ready process......"相關推薦
訊息中介軟體系列五:RabbitMQ的使用場景(非同步處理、應用解耦)
一、非同步處理 場景: 使用者註冊,寫入資料庫成功以後,傳送郵件和簡訊。 準備工作: 1)安裝RabbitMQ,參考前面的文章 2)新建一個名為RabbitMQAsyncProc的maven web工程,在pom.xml檔案裡面引入如下依賴 <project xmlns="http://maven.a
RabbitMQ系列之七 分散式訊息佇列應用場景之非同步處理、應用解耦、流量削鋒和訊息通訊理解分析
摘要:訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題。實現高效能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中介軟體。 目前在生產環境,使用較多的訊息佇列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,
訊息中介軟體系列五、rabbit訊息的確認機制
一、訊息的確認機制 1、消費者收到的每一條訊息都必須進行確認。(分為自動確認和消費者自行確認) 消費者在宣告佇列時,指定autoAck引數,true自動確認,false時rabbitmq會等到消費者顯示的發回一個ack訊號才會刪除訊息。autoAck=fals
訊息中介軟體學習2:RabbitMQ
2018年11月18日 12:22:30 wat1r 閱讀數:3 個人分類: 訊息中介軟體
訊息中介軟體系列第1講:為什麼要用訊息佇列?
訊息佇列中介軟體可以說是Java開發中最常使用的一塊技術了,基本上上了規模的系統都會使用訊息佇列來優化系統架構。那麼為什麼要使用訊息佇列?我們使用訊息佇列來解決什麼問題呢? 訊息佇列的優點 對於大多數系統來說,我們使用訊息佇列來做下面三件事情:解耦、削峰、非同步。[1] 第一個作用:解耦。 在多個系統
訊息中介軟體學習1:ActiveMQ
TODO 歡迎使用Markdown編輯器 你好! 這是你第一次使用 Markdown編輯器 所展示的歡迎頁。如果你想學習如何使用Markdown編輯器, 可以仔細閱讀這篇文章,瞭解一下Markdown的基本語法知識。 新的改變 我們對Markdown編輯器進行了一些功能拓展與
訊息中介軟體系列六,rabbit與spring整合實戰
本專案是rabbit和spring整合的實戰學習專案,模擬電商下單和庫存管理的過程,看過前面幾篇部落格的同學,相信這篇部落格對你不會再難了。一些和本章學習不太相關的內容不會做過多說明,需要的朋友可以下載原始碼自己檢視執行:rabbit與spring整合實戰原始
訊息中介軟體學習4:Kafka
2018年11月18日 12:23:45 wat1r 閱讀數:2 個人分類: 訊息中介軟體
訊息中介軟體學習3:RocketMQ
2018年11月18日 12:23:11 wat1r 閱讀數:4 個人分類: 訊息中介軟體
訊息中介軟體系列-JMS基本概念和模型
一、定義: JMS 全稱:Java Message Service,Java訊息服務,是Java EE中的一個技術,它定義了Java 中訪問訊息中介軟體的介面,並沒有給予實現,實現JMS介面的訊息中介軟體成為JMS Provider,例如:Active MQ 二、訊息模型
訊息中介軟體系列-JMS例項(ActiveMQ)
一、ActiveMQ安裝、配置、啟動、視覺化介面 1、安裝 下載地址:http://activemq.apache.org/download.html2、配置(conf目錄下) 1)使用者名稱密碼設定 2)開啟jmx監控 activemq.xml中進行如下修改 注:這裡的配
訊息中介軟體系列筆記
介紹 今天介紹的是中介軟體技術☞訊息中介軟體,將對市面比較流行的四款訊息中介軟體進行一個全網知識點彙總,所涉及的圖片版權如有侵權請聯絡我會對內容進行調整 什麼是訊息中介軟體 訊息中介軟體顧名思義,訊息傳遞的中間技術,用作端對端,點對點,進行通訊的技術,目前
MySQL技術內幕 InnoDB儲存引擎:鎖問題(髒讀、不可重複讀)
1、髒讀 在理解髒讀(Dirty Read)之前,需要理解髒資料的概念。但是髒資料和之前所介紹的髒頁完全是兩種不同的概念。髒頁指的是在緩衝池中已經被修改的頁,但是還沒有重新整理到磁碟中,即資料庫例項記憶體中的頁和磁碟中的頁的資料是不一致的,當然在重新整理到磁碟之前,日誌都已經被寫人到
【直播預告】:Java Spring Boot開發實戰系列課程【第11講】:訊息中介軟體 RabbitMQ 與api原始碼解析
內容概要:mq訊息中介軟體在高併發系統架構中扮演關鍵角色,阿里雙11高併發使用了mq技術。本次課程一起學習最新Java Spring Boot 2.0、RabbitMQ中介軟體的最新特性與實戰應用,同樣會分析核心api原始碼。主講人:徐雷(阿里雲棲特邀Java專家)直播時間:2019年1月8日 週二 今晚20
RabbitMQ(訊息中介軟體)在工作中的應用場景
RabbitMQ(訊息中介軟體)在工作中的應用場景 1、跨系統的非同步通訊,所有需要非同步互動的地方都可以使用訊息佇列。就像我們除了打電話(同步)以外,還需要發簡訊,發電子郵件(非同步)的通訊方式。 2、多個應用之間的耦合,由於訊息是平臺無關和語言
RabbitMQ(訊息中介軟體)的應用場景
1、跨系統的非同步通訊,所有需要非同步互動的地方都可以使用訊息佇列。就像我們除了打電話(同步)以外,還需要發簡訊,發電子郵件(非同步)的通訊方式。 2、多個應用之間的耦合,由於訊息是平臺無關和語言無關的,而且語義上也不再是函式呼叫,因此更適合作為多個應用之
Java進階面試系列之一:哥們,你們的系統架構中為什麼要引入訊息中介軟體?
歡迎關注個人公眾號:石杉的架構筆記(ID:shishan100) 週一至週五早8點半!精品技術文章準時送上! “ 這篇文章開始,我們把訊息中介軟體這塊高頻的面試問題來給大家說一下,也會涵蓋一些MQ中介軟體常見的技術問題。假如說面試官看你簡歷裡寫了MQ中介軟體的使用經驗,很可能會有如下的問題:你們公司生產
【Java進階面試系列之二】:哥們,那你說說系統架構引入訊息中介軟體有什麼缺點?
歡迎關注個人公眾號:石杉的架構筆記(ID:shishan100) 週一至週五早8點半!精品技術文章準時送上! 一、前情回顧 上篇文章「Java進階面試系列之一」你們系統架構中為何要引入訊息中介軟體?,給大家講了講訊息中介軟體引入系統架構的作用,主要是解決哪些問題的。 其比較常見的實踐場景是: 複雜系統
訊息中介軟體/佇列:ActiveMQ、RabbitMQ、Kafka、RocketMQ、ZeroMq
Kafka最高,RabbitMq 次之, ActiveMq 最差。 2)吞吐量對比: kafka具有高的吞吐量,內部採用訊息的批量處理,zero-copy機制,資料的儲存和獲取是本地磁碟順序批量操作,具有O(1)的複雜度,訊息處理的效率很高。 rabbitMQ在吞吐量方面稍遜於kafka,他們的出發點不一樣,
【Java進階面試系列之五】訊息中介軟體叢集崩潰,如何保證百萬生產資料不丟失?【石杉的架構筆記】
歡迎關注個人公眾號:石杉的架構筆記(ID:shishan100) 週一至週五早8點半!精品技術文章準時送上! “上一篇講訊息中介軟體的文章《扎心!線上服務宕機時,如何保證資料100%不丟失?》,初步給大家介紹了一個在生產環境中可能遇到的問題,就是你的消費者服務可能會宕機,一旦宕機,你就需要考慮是否會導致