1. 程式人生 > >在專案中運用rabbitmq

在專案中運用rabbitmq

本章內容

  • 解耦思維
  • 例項解析
  • 實現RPC功能

       在專案開發中,我們談論最多的應該就是專案的可擴充套件性,解耦專案中的各模組就是解決擴充套件性的一種實現方式。為什麼要解耦?什麼樣的場景需要解耦呢?

解耦思維

       專案開發使用的技術就好像是一個專案的發動機一樣,主要是為了業務提供充足的動力。隨時間的推移,公司業務會不斷的發生變化。就需要技術進行支援。所以就要求在專案的開發過程中,預留一定的擴充套件性,而要擴充套件就需要按某種規則拆分專案為小的模組,而模組與模組之間的耦合性越低,相互影響就越小,擴充套件時系統穩定性就越好。同時,也利於日常的維護。至於拆分的粒度也要充分討論,如果拆分的太細,在之後需要開發新需求時就需要更多的組共同進行協作,溝通成本會很大。如果太粗又達不到解耦的目的。應用程式和儲存之間直接耦合通常就是導致程式被淘汰的主要原因。這也正是MQ能夠幫我們解決的問題所在。

        面向訊息的設計主要是考慮如何把一個比較耗時的程式移出,單獨處理而使主程式繼續它的工作。本質就是非同步思維考慮解耦請求和具體的操作(與生產者和訊息者思想相同),很像(AIO的做法)。這也比較符合現實世界生活,例如:我們去餐廳單餐時,前臺接待我們後把訂單交給後廚,之後又接收其它客戶,而我們在點餐完成後也沒有一直在那裡等待而是玩著手機等待叫號。

        面向訊息的程式還有一個特點就是 你關心的是完成任務,但並不是實時完成的,無須應答請求。而當我們完成任務的動作無法跟上請求的速度時,我們還可以利用自動輪詢的模式,把MQ充當負載均衡器來使用,使得程式有更好的擴充套件性。

發後即忘模型

匹配這種模型的幾種一般型別的任務:

  • 批處理,針對大型資料集合的工作。這種型別的工作一般可以構建為單一的任務請求,或者多個任務對資料集合的獨立部分進行操作。
  • 通知,對發生事情的描述。其內容可以是某些日誌,或者是一個報告需要傳送給另一個或多個程式。
  • 並行處理,對相互獨立的程式有統一的觸發點。例如:客戶下單後(觸發點),給客戶新增積分、通知倉庫準備發貨、通知財務開相應的開票等等 這些操作並沒有明顯的序列關係。

 

例項解析

例項一

       星期天,你在家躺在沙發上嗑著瓜子、看著電視(也有可能是你老婆,哈哈)。中午到了,你命令你媳婦做飯,糖醋排骨,你媳婦就去忙活了。5分鐘後你又想吃雞了,於是又命令你媳婦再做一個土豆燒雞。

       特點:你自己就可以看到是一個生產者(命令的發出者),而你媳婦就是一個消費者(命令的接收執行者)。你與你媳婦之間只有一個佇列通道,接收到命令後也是按順序做的飯,所以可以使用direct型別的交換器。

首先,就是你媳婦接收你的命令

public void entertainYou(String need){
    Connection connection = ConnectionPool.getConnection();
    try {
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(Consts.EXCHANGE_NAME,
                BuiltinExchangeType.DIRECT,true);
        channel.queueDeclare(Consts.QUEUE_NAME,true,false,false,null);
        channel.queueBind(Consts.QUEUE_NAME,Consts.EXCHANGE_NAME,"command.make_lunch");
        Map<String,String> content = new HashMap<String, String>();
        content.put("id", UUID.randomUUID().toString());
        content.put("need",need);
        content.put("time", Calculator.getCurrTime());
        String str = new JSONWriter().write(content);
        channel.basicPublish(Consts.EXCHANGE_NAME,"command.make_lunch",getBasicProperties(),
                str.getBytes("UTF-8"));

        ConnectionPool.closeChannel(channel);
        ConnectionPool.closeConnection(connection);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

private AMQP.BasicProperties getBasicProperties(){
   return basicProperties==null?
    basicProperties = new AMQP.BasicProperties.Builder()
           .contentType(Consts.ContentType.JSON)
           .contentEncoding("UTF-8")
           .build():basicProperties;
}

然後去廚房做飯:

public void makeLunch(){
    Connection connection = ConnectionPool.getConnection();
    try {
        final Channel channel = connection.createChannel();

        channel.exchangeDeclare(Consts.EXCHANGE_NAME,
                BuiltinExchangeType.DIRECT,true);
        channel.queueDeclare(Consts.QUEUE_NAME,true,false,false,null);
        channel.queueBind(Consts.QUEUE_NAME,Consts.EXCHANGE_NAME,"command.make_lunch");

        channel.basicConsume(Consts.QUEUE_NAME,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String contentType = properties.getContentType();
                //System.out.println(contentType);


                Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body));
                System.out.println("你媳婦接受了你的請求:"+content);
                getChannel().basicAck(envelope.getDeliveryTag(),false);
            }
        });

    } catch (IOException e) {
        e.printStackTrace();
    }
}

測試你的命令是否可用呢?

public static void main(String args[]){
    //我發出命令
    System.out.println("我命令媳婦去做飯!!");
    cashier.entertainYou("我要吃糖醋排骨");
    System.out.println("我繼續嗑著瓜子、看著電視");
    //媳婦接收命令後就去做飯了
    kitchen.makeLunch();
    //我發出命令
    System.out.println("我又想吃其它的了!!");
    cashier.entertainYou("我還要吃土豆燒雞");

}

測試的結果:     

我命令媳婦去做飯!!
我繼續嗑著瓜子、看著電視
我又想吃其它的了!!
你媳婦正在做:{need=我要吃糖醋排骨, id=652e8542-bd9c-4e78-9703-459b3d885b0c, time=2018-12-06 15:28:13}
你媳婦正在做:{need=我還要吃土豆燒雞, id=0a1dfb75-b20f-4b93-8da7-67178b1cd6aa, time=2018-12-06 15:28:13}

例項二

       小明在你的購物網站上花2000塊錢買一部手機,確認下單並且已經付款後,你的網站需要做如下操作:1.給小明的積分加20,2.把訂單發給倉庫準備出貨處理,3.通知財務開一張發票。

 

首先接收訂單:

private static final String ORDER_EXCHANGE = "exchange.order";
private static final String ORDER_QUEUE = "queue.order";
private static final String ORDER_ROUTING = "order";
public void acceptOrder(Map<String,String> orderMap){
    Connection connection = ConnectionPool.getConnection();
    try {
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(ORDER_EXCHANGE,
                BuiltinExchangeType.FANOUT,true);
        channel.queueDeclare(ORDER_QUEUE+".jf",true,false,false,null);
        channel.queueDeclare(ORDER_QUEUE+".ck",true,false,false,null);
        channel.queueDeclare(ORDER_QUEUE+".cw",true,false,false,null);
        channel.queueBind(ORDER_QUEUE+".jf",ORDER_EXCHANGE,ORDER_ROUTING);
        channel.queueBind(ORDER_QUEUE+".ck",ORDER_EXCHANGE,ORDER_ROUTING);
        channel.queueBind(ORDER_QUEUE+".cw",ORDER_EXCHANGE,ORDER_ROUTING);
        Map<String,Object> content = new HashMap<String, Object>();
        Map<String,String> body = new HashMap<String, String>();
        content.put("id", UUID.randomUUID().toString());
        content.put("time", Calculator.getCurrTime());
        content.put("orderContent",body);
        body.putAll(orderMap);
        String str = new JSONWriter().write(content);
        channel.basicPublish(ORDER_EXCHANGE,ORDER_ROUTING,getBasicProperties(),
                str.getBytes("UTF-8"));

        ConnectionPool.closeChannel(channel);
        ConnectionPool.closeConnection(connection);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

然後,三個消費者:

public void jf(){
    Connection connection = ConnectionPool.getConnection();
    try {
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(ORDER_EXCHANGE,
                BuiltinExchangeType.FANOUT,true);
        channel.queueDeclare(ORDER_QUEUE+".jf",true,false,false,null);
        channel.queueBind(ORDER_QUEUE+".jf",ORDER_EXCHANGE,ORDER_ROUTING);

        channel.basicConsume(ORDER_QUEUE+".jf",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String contentType = properties.getContentType();
                //System.out.println(contentType);
                Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body));
                System.out.println("積分系統收到了訂單:"+content);
                getChannel().basicAck(envelope.getDeliveryTag(),false);
            }
        });

    } catch (IOException e) {
        e.printStackTrace();
    }
}
public void ck(){
    Connection connection = ConnectionPool.getConnection();
    try {
        final Channel channel = connection.createChannel();

        channel.exchangeDeclare(ORDER_EXCHANGE,
                BuiltinExchangeType.FANOUT,true);
        channel.queueDeclare(ORDER_QUEUE+".ck",true,false,false,null);
        channel.queueBind(ORDER_QUEUE+".ck",ORDER_EXCHANGE,ORDER_ROUTING);

        channel.basicConsume(ORDER_QUEUE+".ck",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String contentType = properties.getContentType();
                //System.out.println(contentType);
                Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body));
                System.out.println("倉庫系統收到了訂單:"+content);
                getChannel().basicAck(envelope.getDeliveryTag(),false);
            }
        });

    } catch (IOException e) {
        e.printStackTrace();
    }
}

public void cw(){
    Connection connection = ConnectionPool.getConnection();
    try {
        final Channel channel = connection.createChannel();

        channel.exchangeDeclare(ORDER_EXCHANGE,
                BuiltinExchangeType.FANOUT,true);
        channel.queueDeclare(ORDER_QUEUE+".cw",true,false,false,null);
        channel.queueBind(ORDER_QUEUE+".cw",ORDER_EXCHANGE,ORDER_ROUTING);

        channel.basicConsume(ORDER_QUEUE+".cw",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String contentType = properties.getContentType();
                //System.out.println(contentType);
                Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body));
                System.out.println("財務系統收到了訂單:"+content);
                getChannel().basicAck(envelope.getDeliveryTag(),false);
            }
        });

    } catch (IOException e) {
        e.printStackTrace();
    }
}

測試:

public static void main(String args[]){
    //小明已經下單,產生訂單資訊
    Map<String,String> order = new HashMap<String, String>();
    order.put("customerId","12008384");
    order.put("goodsId","8030999");
    order.put("goods","手機");
    order.put("money","2000");
    //接收訂單進行分發
    cashier.acceptOrder(order);
    //三個系統級消費者並行處理各自的業務
    cashier.jf();
    cashier.ck();
    cashier.cw();

}

結果:

積分系統收到了訂單:{id=f897e922-4605-4602-a50e-c0fe863018b7, time=2018-12-06 16:02:05, orderContent={money=2000, goodsId=8030999, customerId=12008384, goods=手機}}
倉庫系統收到了訂單:{id=f897e922-4605-4602-a50e-c0fe863018b7, time=2018-12-06 16:02:05, orderContent={money=2000, goodsId=8030999, customerId=12008384, goods=手機}}
財務系統收到了訂單:{id=f897e922-4605-4602-a50e-c0fe863018b7, time=2018-12-06 16:02:05, orderContent={money=2000, goodsId=8030999, customerId=12008384, goods=手機}}

例項三

       公司有一個已經上線的業務系統,主要可以分成兩個模組M1,M2,但這個系統可能會出現問題,按CTO要求在出現問題的時候需要及時的處理,否則扣工資。你的上級負責整個系統,需要知道整個系統是否出現問題,你主要負責M1模組,只要此模組不出問題你就不會扣工資。另一位同事小明負責M2。

         特點:這就是一個告警通知的例項,不同的就是你的上級需要接收M1和M2兩個模組的告警,兩位負責人只負責各自模組即可。由此分析可以使用topic型別的交換器。

 

產生告警後傳送出去:

private static final String ALARM_EXCHANGE = "exchange.alarm1";
private static final String ALARM_QUEUE = "queue.alarm1";
public void acceptAlarm(Map<String,String> orderMap,String routing){
    Connection connection = ConnectionPool.getConnection();
    try {
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(ALARM_EXCHANGE,
                BuiltinExchangeType.TOPIC,true);
        channel.queueDeclare(ALARM_QUEUE+".M1",true,false,false,null);
        channel.queueDeclare(ALARM_QUEUE+".M2",true,false,false,null);
        channel.queueBind(ALARM_QUEUE+".M1",ALARM_EXCHANGE,"M1.*");
        channel.queueBind(ALARM_QUEUE+".M2",ALARM_EXCHANGE,"*.M2");
        Map<String,Object> content = new HashMap<String, Object>();
        Map<String,String> body = new HashMap<String, String>();
        content.put("id", UUID.randomUUID().toString());
        content.put("time", Calculator.getCurrTime());
        content.put("content",body);
        body.putAll(orderMap);
        String str = new JSONWriter().write(content);
        channel.basicPublish(ALARM_EXCHANGE,routing,getBasicProperties(),
                str.getBytes("UTF-8"));

        ConnectionPool.closeChannel(channel);
        ConnectionPool.closeConnection(connection);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

接收告警資訊

public void alarm_M1(){
    Connection connection = ConnectionPool.getConnection();
    try {
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(ALARM_EXCHANGE,
                BuiltinExchangeType.TOPIC,true);
        channel.queueDeclare(ALARM_QUEUE+".M1",true,false,false,null);
        channel.queueBind(ALARM_QUEUE+".M1",ALARM_EXCHANGE,"M1.*");
        channel.basicConsume(ALARM_QUEUE+".M1",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String contentType = properties.getContentType();
                //System.out.println(contentType);
                Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body));
                System.out.println("收到了M1系統的告警併發送給我:"+content);
                System.out.println("收到了M1系統的告警併發送給領導:"+content);
                getChannel().basicAck(envelope.getDeliveryTag(),false);
            }
        });

    } catch (IOException e) {
        e.printStackTrace();
    }
}

public void alarm_M2(){
    Connection connection = ConnectionPool.getConnection();
    try {
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(ALARM_EXCHANGE,
                BuiltinExchangeType.TOPIC,true);
        channel.queueDeclare(ALARM_QUEUE+".M2",true,false,false,null);
        channel.queueBind(ALARM_QUEUE+".M2",ALARM_EXCHANGE,"*.M2");
        channel.basicConsume(ALARM_QUEUE+".M2",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String contentType = properties.getContentType();
                //System.out.println(contentType);
                Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body));
                System.out.println("收到了M2系統的告警併發送給小明:"+content);
                System.out.println("收到了M2系統的告警併發送給領導:"+content);
                getChannel().basicAck(envelope.getDeliveryTag(),false);
            }
        });

    } catch (IOException e) {
        e.printStackTrace();
    }
}

 

測試,產生告警資訊後分發

public static void main(String args[]){
    //M1模組出錯了
    Map<String,String> alarm = new HashMap<String, String>();
    alarm.put("errorMsg","M1系統發現錯誤");
    cashier.acceptAlarm(alarm,"M1.error");
    //M1和M2都出錯了
    Map<String,String> alarm2 = new HashMap<String, String>();
    alarm2.put("errorMsg","M1和M2系統都發現錯誤");
    cashier.acceptAlarm(alarm2,"M1.M2");//這樣,資訊會發到M1和M2佇列中
    cashier.alarm_M1();
    cashier.alarm_M2();


}

測試結果,總共傳送6條資訊,M1中有兩條資訊傳送給自己後也會發送給領導,M2中只有一條資訊:

收到了M1系統的告警併發送給我:{id=ed87c923-0302-4d1d-b729-4b061b3bb9e1, time=2018-12-06 16:52:41, content={errorMsg=M1系統發現錯誤}}
收到了M1系統的告警併發送給領導:{id=ed87c923-0302-4d1d-b729-4b061b3bb9e1, time=2018-12-06 16:52:41, content={errorMsg=M1系統發現錯誤}}
收到了M1系統的告警併發送給我:{id=e9d0ec24-3f76-492c-8e5d-b1d0e5a7cfc4, time=2018-12-06 16:52:41, content={errorMsg=M1和M2系統都發現錯誤}}
收到了M1系統的告警併發送給領導:{id=e9d0ec24-3f76-492c-8e5d-b1d0e5a7cfc4, time=2018-12-06 16:52:41, content={errorMsg=M1和M2系統都發現錯誤}}
收到了M2系統的告警併發送給小明:{id=e9d0ec24-3f76-492c-8e5d-b1d0e5a7cfc4, time=2018-12-06 16:52:41, content={errorMsg=M1和M2系統都發現錯誤}}
收到了M2系統的告警併發送給領導:{id=e9d0ec24-3f76-492c-8e5d-b1d0e5a7cfc4, time=2018-12-06 16:52:41, content={errorMsg=M1和M2系統都發現錯誤}}

 

實現RPC功能

       RPC功能常見的有REST API和SOAP等,而這些都有自己的協議。而使用rabbitmq實現則不需要任何協議,rabbit並不關心生產者或消費者是誰,他只負責傳遞給定的內容即完成它的任務。rabbit會負責繫結來路的訊息到達合適的佇列。RPC伺服器會從這些佇列上消費訊息。但問題在於AMQP訊息是單向的,如何將應答返回給客戶端呢??

     由rabbit處理RPC伺服器和RPC客戶端中間,它並不知道是誰傳送和消費的訊息。rabbit有一個解決辦法是使用訊息來發迴應答。在每個AMQP訊息頭裡有個欄位叫replyTo,訊息的生產者可以通過此欄位確定佇列名稱,並監聽佇列等待應答。而replyTo可以使用queue的預設名稱(rabbit會產生唯一的ID),因為此佇列只是臨時應答使用,所以用完就要銷燬且是RPC客戶端執行緒的私有佇列。

 

具體實現,RPC伺服器

private static final String RPC_EXCHANGE = "exchange.rpc2";
private static final String RPC_QUEUE = "queue.rpc2";
private static final String RPC_ROUTING = "queue.rpc2";
public void rpcServer(){
    Connection connection = ConnectionPool.getConnection();
    try {
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(RPC_EXCHANGE,
                BuiltinExchangeType.DIRECT,true);
        channel.queueDeclare(RPC_QUEUE,true,false,false,null);
        channel.queueBind(RPC_QUEUE,RPC_EXCHANGE,RPC_ROUTING);
        //訂閱RPC佇列,如果有訊息就消費
        channel.basicConsume(RPC_QUEUE,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //確認接收
                getChannel().basicAck(envelope.getDeliveryTag(),false);
                //取內容並且包裝返回結果
                String content = new String(body);
                Map<String,String> alarm = new HashMap<String, String>();
                alarm.put("content",content);
                alarm.put("id", UUID.randomUUID().toString());
                alarm.put("time", Calculator.getCurrTime());
                String str = new JSONWriter().write(alarm);
                System.out.println(properties.getReplyTo());
                //交換器為空,表示以佇列名稱為路由,且把返回的內容放到 客戶端傳過來的臨時佇列
                getChannel().basicPublish("",properties.getReplyTo(),getBasicProperties(),
                        str.getBytes("UTF-8"));


            }
        });
    } catch (IOException e) {
        e.printStackTrace();
    }
}

客戶端

public void rpcClient(String str1){
    Connection connection = ConnectionPool.getConnection();
    try {
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(RPC_EXCHANGE,
                BuiltinExchangeType.DIRECT,true);
        channel.queueDeclare(RPC_QUEUE,true,false,false,null);
        //重新建立一個臨時的佇列,佇列名稱由rabbit分配,並且是私有 自動刪除
        AMQP.Queue.DeclareOk declareOk = channel.queueDeclare();
        channel.queueBind(RPC_QUEUE,RPC_EXCHANGE,RPC_ROUTING);
        AMQP.BasicProperties bp =  new AMQP.BasicProperties.Builder()
                .contentType(Consts.ContentType.JSON)
                .contentEncoding("UTF-8")
                //replyTo記錄臨時佇列的名稱,傳給伺服器
                .replyTo(declareOk.getQueue())
                .build();
        channel.basicPublish(RPC_EXCHANGE,RPC_ROUTING,bp,
                str1.getBytes("UTF-8"));
        //訂閱臨時佇列,如果有內容就取出來
        channel.basicConsume(declareOk.getQueue(),false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                getChannel().basicAck(envelope.getDeliveryTag(),false);
                String str = new String(body);
                System.out.println("返回的內容是:"+consumerTag+str);
                //如何取消訂閱???
                getChannel().basicCancel(consumerTag);
                //完成訪問,關閉通道,同時也關閉連線(按需要,可以考慮不關閉AMQP連線Connection)
                ConnectionPool.closeChannel(getChannel());
                ConnectionPool.closeConnection(getChannel().getConnection());
            }
        });
        //System.out.println("declareOk:"+declareOk.getQueue());
        //GetResponse response = channel.basicGet(declareOk.getQueue(),false);
       // String str = new String(response.getBody());
        //System.out.println("返回的內容是:"+str);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

測試,先啟動伺服器,然後啟動客戶端

public static void main(String args[]){
    //cashier.rpcServer();
    cashier.rpcClient("入參");

}

測試結果:

返回的內容是:amq.ctag-OaoaqOQUAHGyXcqAaPT6lw{"id":"fc243322-30fe-4f1e-be5d-e7cd8406349d","time":"2018-12-06 18:06:13","content":"入參"}

總結

        本章主要從實踐的角度結合前兩章學習的內容進行編碼。給出一般接到專案時一般的思路,在編碼時如何選擇交換器的型別,如何考慮專案的擴充套件性。時開發前必須要思考的問題。