在專案中運用rabbitmq
本章內容
- 解耦思維
- 例項解析
- 實現RPC功能
在專案開發中,我們談論最多的應該就是專案的可擴充套件性,解耦專案中的各模組就是解決擴充套件性的一種實現方式。為什麼要解耦?什麼樣的場景需要解耦呢?
解耦思維
專案開發使用的技術就好像是一個專案的發動機一樣,主要是為了業務提供充足的動力。隨時間的推移,公司業務會不斷的發生變化。就需要技術進行支援。所以就要求在專案的開發過程中,預留一定的擴充套件性,而要擴充套件就需要按某種規則拆分專案為小的模組,而模組與模組之間的耦合性越低,相互影響就越小,擴充套件時系統穩定性就越好。同時,也利於日常的維護。至於拆分的粒度也要充分討論,如果拆分的太細,在之後需要開發新需求時就需要更多的組共同進行協作,溝通成本會很大。如果太粗又達不到解耦的目的。應用程式和儲存之間直接耦合通常就是導致程式被淘汰的主要原因。這也正是MQ能夠幫我們解決的問題所在。
面向訊息的設計主要是考慮如何把一個比較耗時的程式移出,單獨處理而使主程式繼續它的工作。本質就是非同步思維考慮解耦請求和具體的操作(與生產者和訊息者思想相同),很像(AIO的做法)。這也比較符合現實世界生活,例如:我們去餐廳單餐時,前臺接待我們後把訂單交給後廚,之後又接收其它客戶,而我們在點餐完成後也沒有一直在那裡等待而是玩著手機等待叫號。
面向訊息的程式還有一個特點就是 你關心的是完成任務,但並不是實時完成的,無須應答請求。而當我們完成任務的動作無法跟上請求的速度時,我們還可以利用自動輪詢的模式,把MQ充當負載均衡器來使用,使得程式有更好的擴充套件性。
發後即忘模型
匹配這種模型的幾種一般型別的任務:
- 批處理,針對大型資料集合的工作。這種型別的工作一般可以構建為單一的任務請求,或者多個任務對資料集合的獨立部分進行操作。
- 通知,對發生事情的描述。其內容可以是某些日誌,或者是一個報告需要傳送給另一個或多個程式。
- 並行處理,對相互獨立的程式有統一的觸發點。例如:客戶下單後(觸發點),給客戶新增積分、通知倉庫準備發貨、通知財務開相應的開票等等 這些操作並沒有明顯的序列關係。
例項解析
例項一
星期天,你在家躺在沙發上嗑著瓜子、看著電視(也有可能是你老婆,哈哈)。中午到了,你命令你媳婦做飯,糖醋排骨,你媳婦就去忙活了。5分鐘後你又想吃雞了,於是又命令你媳婦再做一個土豆燒雞。
特點:你自己就可以看到是一個生產者(命令的發出者),而你媳婦就是一個消費者(命令的接收執行者)。你與你媳婦之間只有一個佇列通道,接收到命令後也是按順序做的飯,所以可以使用direct型別的交換器。
首先,就是你媳婦接收你的命令
public void entertainYou(String need){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(Consts.EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true); channel.queueDeclare(Consts.QUEUE_NAME,true,false,false,null); channel.queueBind(Consts.QUEUE_NAME,Consts.EXCHANGE_NAME,"command.make_lunch"); Map<String,String> content = new HashMap<String, String>(); content.put("id", UUID.randomUUID().toString()); content.put("need",need); content.put("time", Calculator.getCurrTime()); String str = new JSONWriter().write(content); channel.basicPublish(Consts.EXCHANGE_NAME,"command.make_lunch",getBasicProperties(), str.getBytes("UTF-8")); ConnectionPool.closeChannel(channel); ConnectionPool.closeConnection(connection); } catch (IOException e) { e.printStackTrace(); } } private AMQP.BasicProperties getBasicProperties(){ return basicProperties==null? basicProperties = new AMQP.BasicProperties.Builder() .contentType(Consts.ContentType.JSON) .contentEncoding("UTF-8") .build():basicProperties; }
然後去廚房做飯:
public void makeLunch(){ Connection connection = ConnectionPool.getConnection(); try { final Channel channel = connection.createChannel(); channel.exchangeDeclare(Consts.EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true); channel.queueDeclare(Consts.QUEUE_NAME,true,false,false,null); channel.queueBind(Consts.QUEUE_NAME,Consts.EXCHANGE_NAME,"command.make_lunch"); channel.basicConsume(Consts.QUEUE_NAME,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String contentType = properties.getContentType(); //System.out.println(contentType); Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body)); System.out.println("你媳婦接受了你的請求:"+content); getChannel().basicAck(envelope.getDeliveryTag(),false); } }); } catch (IOException e) { e.printStackTrace(); } }
測試你的命令是否可用呢?
public static void main(String args[]){ //我發出命令 System.out.println("我命令媳婦去做飯!!"); cashier.entertainYou("我要吃糖醋排骨"); System.out.println("我繼續嗑著瓜子、看著電視"); //媳婦接收命令後就去做飯了 kitchen.makeLunch(); //我發出命令 System.out.println("我又想吃其它的了!!"); cashier.entertainYou("我還要吃土豆燒雞"); }
測試的結果:
我命令媳婦去做飯!! 我繼續嗑著瓜子、看著電視 我又想吃其它的了!! 你媳婦正在做:{need=我要吃糖醋排骨, id=652e8542-bd9c-4e78-9703-459b3d885b0c, time=2018-12-06 15:28:13} 你媳婦正在做:{need=我還要吃土豆燒雞, id=0a1dfb75-b20f-4b93-8da7-67178b1cd6aa, time=2018-12-06 15:28:13}
例項二
小明在你的購物網站上花2000塊錢買一部手機,確認下單並且已經付款後,你的網站需要做如下操作:1.給小明的積分加20,2.把訂單發給倉庫準備出貨處理,3.通知財務開一張發票。
首先接收訂單:
private static final String ORDER_EXCHANGE = "exchange.order"; private static final String ORDER_QUEUE = "queue.order"; private static final String ORDER_ROUTING = "order"; public void acceptOrder(Map<String,String> orderMap){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(ORDER_EXCHANGE, BuiltinExchangeType.FANOUT,true); channel.queueDeclare(ORDER_QUEUE+".jf",true,false,false,null); channel.queueDeclare(ORDER_QUEUE+".ck",true,false,false,null); channel.queueDeclare(ORDER_QUEUE+".cw",true,false,false,null); channel.queueBind(ORDER_QUEUE+".jf",ORDER_EXCHANGE,ORDER_ROUTING); channel.queueBind(ORDER_QUEUE+".ck",ORDER_EXCHANGE,ORDER_ROUTING); channel.queueBind(ORDER_QUEUE+".cw",ORDER_EXCHANGE,ORDER_ROUTING); Map<String,Object> content = new HashMap<String, Object>(); Map<String,String> body = new HashMap<String, String>(); content.put("id", UUID.randomUUID().toString()); content.put("time", Calculator.getCurrTime()); content.put("orderContent",body); body.putAll(orderMap); String str = new JSONWriter().write(content); channel.basicPublish(ORDER_EXCHANGE,ORDER_ROUTING,getBasicProperties(), str.getBytes("UTF-8")); ConnectionPool.closeChannel(channel); ConnectionPool.closeConnection(connection); } catch (IOException e) { e.printStackTrace(); } }
然後,三個消費者:
public void jf(){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(ORDER_EXCHANGE, BuiltinExchangeType.FANOUT,true); channel.queueDeclare(ORDER_QUEUE+".jf",true,false,false,null); channel.queueBind(ORDER_QUEUE+".jf",ORDER_EXCHANGE,ORDER_ROUTING); channel.basicConsume(ORDER_QUEUE+".jf",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String contentType = properties.getContentType(); //System.out.println(contentType); Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body)); System.out.println("積分系統收到了訂單:"+content); getChannel().basicAck(envelope.getDeliveryTag(),false); } }); } catch (IOException e) { e.printStackTrace(); } } public void ck(){ Connection connection = ConnectionPool.getConnection(); try { final Channel channel = connection.createChannel(); channel.exchangeDeclare(ORDER_EXCHANGE, BuiltinExchangeType.FANOUT,true); channel.queueDeclare(ORDER_QUEUE+".ck",true,false,false,null); channel.queueBind(ORDER_QUEUE+".ck",ORDER_EXCHANGE,ORDER_ROUTING); channel.basicConsume(ORDER_QUEUE+".ck",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String contentType = properties.getContentType(); //System.out.println(contentType); Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body)); System.out.println("倉庫系統收到了訂單:"+content); getChannel().basicAck(envelope.getDeliveryTag(),false); } }); } catch (IOException e) { e.printStackTrace(); } } public void cw(){ Connection connection = ConnectionPool.getConnection(); try { final Channel channel = connection.createChannel(); channel.exchangeDeclare(ORDER_EXCHANGE, BuiltinExchangeType.FANOUT,true); channel.queueDeclare(ORDER_QUEUE+".cw",true,false,false,null); channel.queueBind(ORDER_QUEUE+".cw",ORDER_EXCHANGE,ORDER_ROUTING); channel.basicConsume(ORDER_QUEUE+".cw",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String contentType = properties.getContentType(); //System.out.println(contentType); Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body)); System.out.println("財務系統收到了訂單:"+content); getChannel().basicAck(envelope.getDeliveryTag(),false); } }); } catch (IOException e) { e.printStackTrace(); } }
測試:
public static void main(String args[]){ //小明已經下單,產生訂單資訊 Map<String,String> order = new HashMap<String, String>(); order.put("customerId","12008384"); order.put("goodsId","8030999"); order.put("goods","手機"); order.put("money","2000"); //接收訂單進行分發 cashier.acceptOrder(order); //三個系統級消費者並行處理各自的業務 cashier.jf(); cashier.ck(); cashier.cw(); }
結果:
積分系統收到了訂單:{id=f897e922-4605-4602-a50e-c0fe863018b7, time=2018-12-06 16:02:05, orderContent={money=2000, goodsId=8030999, customerId=12008384, goods=手機}} 倉庫系統收到了訂單:{id=f897e922-4605-4602-a50e-c0fe863018b7, time=2018-12-06 16:02:05, orderContent={money=2000, goodsId=8030999, customerId=12008384, goods=手機}} 財務系統收到了訂單:{id=f897e922-4605-4602-a50e-c0fe863018b7, time=2018-12-06 16:02:05, orderContent={money=2000, goodsId=8030999, customerId=12008384, goods=手機}}
例項三
公司有一個已經上線的業務系統,主要可以分成兩個模組M1,M2,但這個系統可能會出現問題,按CTO要求在出現問題的時候需要及時的處理,否則扣工資。你的上級負責整個系統,需要知道整個系統是否出現問題,你主要負責M1模組,只要此模組不出問題你就不會扣工資。另一位同事小明負責M2。
特點:這就是一個告警通知的例項,不同的就是你的上級需要接收M1和M2兩個模組的告警,兩位負責人只負責各自模組即可。由此分析可以使用topic型別的交換器。
產生告警後傳送出去:
private static final String ALARM_EXCHANGE = "exchange.alarm1"; private static final String ALARM_QUEUE = "queue.alarm1"; public void acceptAlarm(Map<String,String> orderMap,String routing){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(ALARM_EXCHANGE, BuiltinExchangeType.TOPIC,true); channel.queueDeclare(ALARM_QUEUE+".M1",true,false,false,null); channel.queueDeclare(ALARM_QUEUE+".M2",true,false,false,null); channel.queueBind(ALARM_QUEUE+".M1",ALARM_EXCHANGE,"M1.*"); channel.queueBind(ALARM_QUEUE+".M2",ALARM_EXCHANGE,"*.M2"); Map<String,Object> content = new HashMap<String, Object>(); Map<String,String> body = new HashMap<String, String>(); content.put("id", UUID.randomUUID().toString()); content.put("time", Calculator.getCurrTime()); content.put("content",body); body.putAll(orderMap); String str = new JSONWriter().write(content); channel.basicPublish(ALARM_EXCHANGE,routing,getBasicProperties(), str.getBytes("UTF-8")); ConnectionPool.closeChannel(channel); ConnectionPool.closeConnection(connection); } catch (IOException e) { e.printStackTrace(); } }
接收告警資訊
public void alarm_M1(){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(ALARM_EXCHANGE, BuiltinExchangeType.TOPIC,true); channel.queueDeclare(ALARM_QUEUE+".M1",true,false,false,null); channel.queueBind(ALARM_QUEUE+".M1",ALARM_EXCHANGE,"M1.*"); channel.basicConsume(ALARM_QUEUE+".M1",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String contentType = properties.getContentType(); //System.out.println(contentType); Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body)); System.out.println("收到了M1系統的告警併發送給我:"+content); System.out.println("收到了M1系統的告警併發送給領導:"+content); getChannel().basicAck(envelope.getDeliveryTag(),false); } }); } catch (IOException e) { e.printStackTrace(); } } public void alarm_M2(){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(ALARM_EXCHANGE, BuiltinExchangeType.TOPIC,true); channel.queueDeclare(ALARM_QUEUE+".M2",true,false,false,null); channel.queueBind(ALARM_QUEUE+".M2",ALARM_EXCHANGE,"*.M2"); channel.basicConsume(ALARM_QUEUE+".M2",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String contentType = properties.getContentType(); //System.out.println(contentType); Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body)); System.out.println("收到了M2系統的告警併發送給小明:"+content); System.out.println("收到了M2系統的告警併發送給領導:"+content); getChannel().basicAck(envelope.getDeliveryTag(),false); } }); } catch (IOException e) { e.printStackTrace(); } }
測試,產生告警資訊後分發
public static void main(String args[]){ //M1模組出錯了 Map<String,String> alarm = new HashMap<String, String>(); alarm.put("errorMsg","M1系統發現錯誤"); cashier.acceptAlarm(alarm,"M1.error"); //M1和M2都出錯了 Map<String,String> alarm2 = new HashMap<String, String>(); alarm2.put("errorMsg","M1和M2系統都發現錯誤"); cashier.acceptAlarm(alarm2,"M1.M2");//這樣,資訊會發到M1和M2佇列中 cashier.alarm_M1(); cashier.alarm_M2(); }
測試結果,總共傳送6條資訊,M1中有兩條資訊傳送給自己後也會發送給領導,M2中只有一條資訊:
收到了M1系統的告警併發送給我:{id=ed87c923-0302-4d1d-b729-4b061b3bb9e1, time=2018-12-06 16:52:41, content={errorMsg=M1系統發現錯誤}} 收到了M1系統的告警併發送給領導:{id=ed87c923-0302-4d1d-b729-4b061b3bb9e1, time=2018-12-06 16:52:41, content={errorMsg=M1系統發現錯誤}} 收到了M1系統的告警併發送給我:{id=e9d0ec24-3f76-492c-8e5d-b1d0e5a7cfc4, time=2018-12-06 16:52:41, content={errorMsg=M1和M2系統都發現錯誤}} 收到了M1系統的告警併發送給領導:{id=e9d0ec24-3f76-492c-8e5d-b1d0e5a7cfc4, time=2018-12-06 16:52:41, content={errorMsg=M1和M2系統都發現錯誤}} 收到了M2系統的告警併發送給小明:{id=e9d0ec24-3f76-492c-8e5d-b1d0e5a7cfc4, time=2018-12-06 16:52:41, content={errorMsg=M1和M2系統都發現錯誤}} 收到了M2系統的告警併發送給領導:{id=e9d0ec24-3f76-492c-8e5d-b1d0e5a7cfc4, time=2018-12-06 16:52:41, content={errorMsg=M1和M2系統都發現錯誤}}
實現RPC功能
RPC功能常見的有REST API和SOAP等,而這些都有自己的協議。而使用rabbitmq實現則不需要任何協議,rabbit並不關心生產者或消費者是誰,他只負責傳遞給定的內容即完成它的任務。rabbit會負責繫結來路的訊息到達合適的佇列。RPC伺服器會從這些佇列上消費訊息。但問題在於AMQP訊息是單向的,如何將應答返回給客戶端呢??
由rabbit處理RPC伺服器和RPC客戶端中間,它並不知道是誰傳送和消費的訊息。rabbit有一個解決辦法是使用訊息來發迴應答。在每個AMQP訊息頭裡有個欄位叫replyTo,訊息的生產者可以通過此欄位確定佇列名稱,並監聽佇列等待應答。而replyTo可以使用queue的預設名稱(rabbit會產生唯一的ID),因為此佇列只是臨時應答使用,所以用完就要銷燬且是RPC客戶端執行緒的私有佇列。
具體實現,RPC伺服器
private static final String RPC_EXCHANGE = "exchange.rpc2"; private static final String RPC_QUEUE = "queue.rpc2"; private static final String RPC_ROUTING = "queue.rpc2"; public void rpcServer(){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(RPC_EXCHANGE, BuiltinExchangeType.DIRECT,true); channel.queueDeclare(RPC_QUEUE,true,false,false,null); channel.queueBind(RPC_QUEUE,RPC_EXCHANGE,RPC_ROUTING); //訂閱RPC佇列,如果有訊息就消費 channel.basicConsume(RPC_QUEUE,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //確認接收 getChannel().basicAck(envelope.getDeliveryTag(),false); //取內容並且包裝返回結果 String content = new String(body); Map<String,String> alarm = new HashMap<String, String>(); alarm.put("content",content); alarm.put("id", UUID.randomUUID().toString()); alarm.put("time", Calculator.getCurrTime()); String str = new JSONWriter().write(alarm); System.out.println(properties.getReplyTo()); //交換器為空,表示以佇列名稱為路由,且把返回的內容放到 客戶端傳過來的臨時佇列 getChannel().basicPublish("",properties.getReplyTo(),getBasicProperties(), str.getBytes("UTF-8")); } }); } catch (IOException e) { e.printStackTrace(); } }
客戶端
public void rpcClient(String str1){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(RPC_EXCHANGE, BuiltinExchangeType.DIRECT,true); channel.queueDeclare(RPC_QUEUE,true,false,false,null); //重新建立一個臨時的佇列,佇列名稱由rabbit分配,並且是私有 自動刪除 AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(); channel.queueBind(RPC_QUEUE,RPC_EXCHANGE,RPC_ROUTING); AMQP.BasicProperties bp = new AMQP.BasicProperties.Builder() .contentType(Consts.ContentType.JSON) .contentEncoding("UTF-8") //replyTo記錄臨時佇列的名稱,傳給伺服器 .replyTo(declareOk.getQueue()) .build(); channel.basicPublish(RPC_EXCHANGE,RPC_ROUTING,bp, str1.getBytes("UTF-8")); //訂閱臨時佇列,如果有內容就取出來 channel.basicConsume(declareOk.getQueue(),false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { getChannel().basicAck(envelope.getDeliveryTag(),false); String str = new String(body); System.out.println("返回的內容是:"+consumerTag+str); //如何取消訂閱??? getChannel().basicCancel(consumerTag); //完成訪問,關閉通道,同時也關閉連線(按需要,可以考慮不關閉AMQP連線Connection) ConnectionPool.closeChannel(getChannel()); ConnectionPool.closeConnection(getChannel().getConnection()); } }); //System.out.println("declareOk:"+declareOk.getQueue()); //GetResponse response = channel.basicGet(declareOk.getQueue(),false); // String str = new String(response.getBody()); //System.out.println("返回的內容是:"+str); } catch (IOException e) { e.printStackTrace(); } }
測試,先啟動伺服器,然後啟動客戶端
public static void main(String args[]){ //cashier.rpcServer(); cashier.rpcClient("入參"); }
測試結果:
返回的內容是:amq.ctag-OaoaqOQUAHGyXcqAaPT6lw{"id":"fc243322-30fe-4f1e-be5d-e7cd8406349d","time":"2018-12-06 18:06:13","content":"入參"}
總結
本章主要從實踐的角度結合前兩章學習的內容進行編碼。給出一般接到專案時一般的思路,在編碼時如何選擇交換器的型別,如何考慮專案的擴充套件性。時開發前必須要思考的問題。