rabbitmq 入門教程一(只有佇列,不涉及路由)
阿新 • • 發佈:2018-12-19
為何要學習rabbitmq
因為在spring cloud bus中使用到的是rabbitmq和kafka,所以就需要搞一下rabbitmq和kafka。先上圖
生產者(Producer) Producer(生產者),產生訊息並向RabbitMq傳送訊息。就是clientA和clientB 消費者(consumer) Consumer(消費者),等待RabbitMq訊息到來並處理訊息。就是client1,client2,client3 訊息佇列Queue Queue(佇列),依存於RabbitMQ內部,雖然訊息通過RabbitMQ在你的應用中傳遞,但是它們只能儲存在queue中。佇列不受任 何限制,可以儲存任何數量的訊息—本質上是一個無限制的快取。很多producers可以通過同一個佇列傳送訊息,相同的很 多consumers可以從同一個佇列上接收訊息。 這篇文章主要介紹的是最簡單的是:
pom檔案
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
</dependencies>
Producer程式碼:
package com.bobo.rabbitmq1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.concurrent.TimeoutException;
/**
* 訊息佇列生產者,這個例子只是簡單的通過queue使用,
* 所以是exchange是“”
* @author [email protected]
* @create 2018-11-04 13:36
**/
public class Send {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
String message = "hello world";
for (int i =0 ;i <10;i++) {
channel.basicPublish("","hello",null,message.getBytes(Charset.forName("UTF-8")));
}
System.out.println(" [x] Sent '" + message + "'");
//關閉連線
channel.close();
connection.close();
}
}
channel.queueDeclare這個方法就是聲明瞭一個對列。
consumer程式碼
package com.bobo.rabbitmq1;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 訊息接收者
* @author [email protected]
* @create 2018-11-04 13:46
**/
public class Receiver {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//回撥消費訊息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume("hello", true, consumer);
}
}
在這裡,其實還是獲取到queue,這裡是使用了DefaultConsumer這個類,然後重寫了handleDelivery方法,這個方法中的body就是
訊息體,可以再這裡對訊息進行處理。