1. 程式人生 > >rabbitmq 入門教程一(只有佇列,不涉及路由)

rabbitmq 入門教程一(只有佇列,不涉及路由)

為何要學習rabbitmq

因為在spring cloud bus中使用到的是rabbitmq和kafka,所以就需要搞一下rabbitmq和kafka。先上圖

在這裡插入圖片描述

 生產者(Producer)
    Producer(生產者),產生訊息並向RabbitMq傳送訊息。就是clientA和clientB
消費者(consumer)
    Consumer(消費者),等待RabbitMq訊息到來並處理訊息。就是client1,client2,client3
訊息佇列Queue
    Queue(佇列),依存於RabbitMQ內部,雖然訊息通過RabbitMQ在你的應用中傳遞,但是它們只能儲存在queue中。佇列不受任
    何限制,可以儲存任何數量的訊息—本質上是一個無限制的快取。很多producers可以通過同一個佇列傳送訊息,相同的很
    多consumers可以從同一個佇列上接收訊息。
 這篇文章主要介紹的是最簡單的是:

在這裡插入圖片描述

pom檔案

<dependencies>
       <dependency>
           <groupId>com.rabbitmq</groupId>
           <artifactId>amqp-client</artifactId>
           <version>4.1.0</version>
       </dependency>
   </dependencies>

Producer程式碼:

package com.bobo.rabbitmq1;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.Charset; import java.util.concurrent.TimeoutException; /** * 訊息佇列生產者,這個例子只是簡單的通過queue使用, * 所以是exchange是“” * @author [email protected]
* @create 2018-11-04 13:36 **/
public class Send { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello",false,false,false,null); String message = "hello world"; for (int i =0 ;i <10;i++) { channel.basicPublish("","hello",null,message.getBytes(Charset.forName("UTF-8"))); } System.out.println(" [x] Sent '" + message + "'"); //關閉連線 channel.close(); connection.close(); } }
channel.queueDeclare這個方法就是聲明瞭一個對列。

consumer程式碼

package com.bobo.rabbitmq1;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 訊息接收者
* @author [email protected]
* @create 2018-11-04 13:46
**/
public class Receiver {

   public static void main(String[] args) throws IOException, TimeoutException {
       ConnectionFactory connectionFactory = new ConnectionFactory();
       connectionFactory.setHost("localhost");
       Connection connection = connectionFactory.newConnection();
       Channel channel = connection.createChannel();
       channel.queueDeclare("hello", false, false, false, null);
       System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
       //回撥消費訊息
       Consumer consumer = new DefaultConsumer(channel) {
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                   throws IOException {
               String message = new String(body, "UTF-8");
               System.out.println(" [x] Received '" + message + "'");
           }
       };
       channel.basicConsume("hello", true, consumer);
   }
}

在這裡,其實還是獲取到queue,這裡是使用了DefaultConsumer這個類,然後重寫了handleDelivery方法,這個方法中的body就是
訊息體,可以再這裡對訊息進行處理。