RabbitMQ 訊息生產與消費(五)
阿新 • • 發佈:2018-12-13
- ConnectionFactory: 獲取連線工廠
- Connection 一個連線
- Channel 資料通訊通道,可傳送和接受訊息
- Queue 具體訊息儲存佇列
- Producer & Consumer 生產者和消費者
具體實現程式碼
pom.xml
注意:這裡是5.30版本
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.3.0</version> </dependency>
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>wang.chunsen</groupId> <artifactId>rabbitmq-api</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>rabbitmq-api</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.3.0</version> </dependency> <!--<version>3.6.5</version>--> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
生產者:
package wang.chunsen.quickstart.version2; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @Author: itastro * @Description: * @Date: Created in 13:45 2018/12/12 * @Package: wang.chunsen.quickstart.version2 * @Modified By: */ public class Producer { private static final String EXCHANGE_NAME = "test"; public static void main(String[] args) throws Exception { //建立連線工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.0.114"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //建立一個連線 Connection connection = connectionFactory.newConnection(); //建立一個channel Channel channel = connection.createChannel(); //通過channel 傳送資料 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = "i love you rabbitmq"; for (int i = 0; i <10 ; i++) { channel.basicPublish(EXCHANGE_NAME, "test001", null, message.getBytes("UTF-8")); } } }
消費者
package wang.chunsen.quickstart.version2;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* @Author: itastro
* @Description:
* @Date: Created in 13:54 2018/12/12
* @Package: wang.chunsen.quickstart.version2
* @Modified By:
*/
public class Cosumer {
private static final String EXCHANGE_NAME = "test";
public static void main(String[] args) throws Exception {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.114");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//建立一個連線
Connection connection = connectionFactory.newConnection();
//建立一個channel
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//4 宣告(建立)一個佇列
String queueName = "test002";
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "test001");
System.out.println(" [*] Waiting for messages");
// DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//
// String message = new String(delivery.getBody(), "UTF-8");
// System.out.println(" [x] Received '" + message + "'");
// };
//
//
// channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
while (true) {
//消費訊息
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消費的路由鍵:" + routingKey);
System.out.println("消費的內容型別:" + contentType);
long deliveryTag = envelope.getDeliveryTag();
//確認訊息
channel.basicAck(deliveryTag, false);
System.out.println("消費的訊息體內容:");
String bodyStr = new String(body, "UTF-8");
System.out.println(bodyStr);
}
});
}
}
}
以下程式碼使用 3.6.5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.3.0</version>
</dependency>
生產者:
package wang.chunsen.quickstart.version1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Author: itastro
* @Description: 生產者
* @Date: Created in 10:47 2018/12/12
* @Package: wang.chunsen.quickstart
* @Modified By:
*/
public class producer {
public static void main(String[] args) throws Exception {
//1 建立一個ConnectFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.114");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 通過連線工廠建立連線
Connection connection = connectionFactory.newConnection();
//3 建立一個Channel
Channel channel = connection.createChannel();
//4通過Channel 傳送資料
// void basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) throws IOException;
for (int i = 0; i < 5; i++) {
String message = "Hello RabbitMQ";
// 1 exchange 2 routingkey
//當不指定exchange 時 會使用預設的
channel.basicPublish("", "test001", null, message.getBytes());
System.out.println("1");
}
//5 記得要關閉相關的連線
channel.close();
connection.close();
}
}
消費者:
package wang.chunsen.quickstart.version1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Delivery;
/**
* @Author: itastro
* @Description: 消費端
* @Date: Created in 10:48 2018/12/12
* @Package: wang.chunsen.quickstart
* @Modified By:
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//1 建立一個ConnectFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.114");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 通過連線工廠建立連線
Connection connection = connectionFactory.newConnection();
//3 建立一個Channel
Channel channel = connection.createChannel();
//4 宣告(建立)一個佇列
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
//5 建立一個消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//6 設定Chanel
channel.basicConsume(queueName, true, consumer);
while (true) {
//7 獲取訊息
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消費端: " + msg);
}
}
}