訊息中介軟體Rabbitmq(二)-使用詳解
https://blog.csdn.net/Dante_003/article/details/79377908
Rabbitmq 是基於amqp(高階訊息佇列協議)實現的佇列技術,在他之上可以完成多種型別的訊息轉發模型。
下面列舉一些常用的訊息轉發場景,在rabbitmq中是怎樣實現的。
1.原理
先來看一下rabbitmq訊息轉發的原理,便於理解訊息轉發以及怎樣實現常見的訊息轉發模型。
1.1生產者
1.生產者建立連線,並建立通道連線rabbitmq
2.使用routing key繫結exchange和佇列,可以繫結多個routing key到不同的佇列。
3.生產者生產訊息傳送給exchange
4.exchange根據routing key匹配到對應佇列名字,把訊息轉發到指定的queue上,訊息會暫存在佇列中,等待消費者來消費。
1.2消費者
1.消費者建立連線,並建立通道連線rabbitmq
2.消費者消費指定佇列訊息
注意:exchange、queue都必須要提前建立或者使用系統預設的也可以。
1.3exchange
exchange分為4種,分別是
1.direct,直接轉發。exchange通過精確匹配routing key傳送訊息給佇列
2.fanout,廣播。會將訊息廣播到所有繫結到這個exchange的佇列,無視傳送訊息時指定的routingkey。
3.topic,釋出、訂閱。routing可以可以使用萬用字元(#、*)來根據主題傳送訊息到不同的佇列。
4.Headers,頭資訊。根據頭資訊的引數來決定傳送到哪個佇列。
1.4佇列
佇列是用來儲存訊息,在程式碼中可以建立臨時佇列,臨時佇列的屬性是durable (false)、exclusive(true)、autoDelete(true)的佇列,訊息處理完自動刪除。
佇列的幾種屬性
- durable
是否持久化,佇列會一直存在。 - exclusive
佇列是否對當前連線特有,其它連線不能使用,當前連線斷開後佇列會消失。
exclusive和durable是互斥的。 - autoDelete
佇列不再使用時會自動刪除。
2.入門例子,訊息轉發
最簡單的例子,生產者生產訊息到佇列,消費者從佇列取資料,這個例子也是訊息列隊最基本、最常用的模型。
下面是建立連線的程式碼,後面的例子不再重複寫這個。
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitmqUtil {
private static Connection connection;
// 獲取連線
public static Connection getConnection() {
if (connection != null) {
return connection;
}
String userName = "admin";
String password = "admin";
String host = "192.168.1.248";
int port = 5672;
boolean recoveryEnabled = true;
// connection factory
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setHost(host);
factory.setPort(port);
factory.setAutomaticRecoveryEnabled(recoveryEnabled);
try {
connection = factory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return connection;
}
// 獲取通道
public static Channel getChannel() {
Channel channel = null;
Connection connection2 = getConnection();
try {
channel = connection2.createChannel();
} catch (IOException e) {
e.printStackTrace();
}
return channel;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
生產者、消費者
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
//簡單例子
public class Test1 {
private String queueName = "testQueue";
private String routingKey = "routingKey";
private String exchange = "testExchange";
public void produce() {
Channel channel = null;
try {
// 建立通道
channel = RabbitmqUtil.getChannel();
// 建立佇列,持久、非專用、非自動刪除的佇列
channel.queueDeclare(queueName, true, false, false, null);
// 建立一個exchange,使用rabbitmq內建預設exchange也可以,預設的exchange是""一個direct型別
channel.exchangeDeclare(exchange, "direct");
// 使用routing key繫結exchange和queue
channel.queueBind(queueName, exchange, routingKey);
for (;;) {
channel.basicPublish(exchange, routingKey, null, "test".getBytes());
System.out.println("生產者:test");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
public void consume() {
Channel channel = null;
try {
channel = RabbitmqUtil.getConnection().createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body, "UTF-8");
System.out.println(" 接收訊息:'" + message + "'");
}
};
channel.basicConsume(queueName, true, defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
public static void main(String[] args) {
final Test1 test1 = new Test1();
new Thread() {
@Override
public void run() {
test1.produce();
}
}.start();
new Thread() {
@Override
public void run() {
test1.consume();
}
}.start();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
3.點對點通訊
在rabbitmq中也可以實現點對點的通訊,沒有訊息佇列,所以不會儲存訊息,生產者和消費者之間直接通訊。
說是沒有佇列,其實是建立了一個臨時佇列,這個佇列不會持久化、自動刪除、通道專用。
如下圖。
在程式碼實現時,生產者只發送訊息給exchnage,不繫結佇列;消費者程式碼中建立臨時佇列,並繫結到exchange開始消費訊息。
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
//點對點通訊
public class Test2 {
private String routingKey = "routingKey";
private String exchange = "testExchange";
public void produce() {
Channel channel = null;
try {
// 建立通道
channel = RabbitmqUtil.getChannel();
// 建立一個exchange,使用rabbitmq內建預設exchange也可以,預設的exchange是""一個direct型別
channel.exchangeDeclare(exchange, "direct");
for (;;) {
channel.basicPublish(exchange, routingKey, null, "test".getBytes());
System.out.println("生產者:test");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
public void consume() {
Channel channel = null;
try {
channel = RabbitmqUtil.getConnection().createChannel();
// 在消費者程式碼中建立臨時佇列,並繫結到指定的exchange
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, exchange, routingKey);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body, "UTF-8");
System.out.println(" 接收訊息:'" + message + "'");
}
};
channel.basicConsume(queue, true, defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
public static void main(String[] args) {
final Test2 test1 = new Test2();
new Thread() {
@Override
public void run() {
test1.produce();
}
}.start();
new Thread() {
@Override
public void run() {
test1.consume();
}
}.start();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
4.訊息複製
像下圖這樣,兩個佇列使用相同的routing key繫結到同一個exchange上,訊息會複製分發到兩個佇列中。程式碼不再實現,和例子1一樣,只不過多建立一個佇列。
利用這種模型可以實現多種場景的例子,例如日誌收集,一個佇列需要採集所有型別的日誌,一個佇列只採集錯誤日誌,這樣通過rabbitmq,就真正實現了一個訊息錄入,多種訊息模式的轉發。
5.廣播模式
會將訊息廣播到所有繫結到這個exchange的佇列,無視傳送訊息時指定的routingkey。
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
//廣播模式
public class Test3 {
private String queueName = "testQueue";
private String routingKey = "routingKey";
private String exchange = "testExchange1";
public void produce() {
Channel channel = null;
try {
// 建立通道
channel = RabbitmqUtil.getChannel();
// 建立佇列,持久、非專用、非自動刪除的佇列
channel.queueDeclare(queueName, true, false, false, null);
// 建立一個exchange,使用rabbitmq內建預設exchange也可以,預設的exchange是""一個direct型別
channel.exchangeDeclare(exchange, "fanout");
// 使用routing key繫結exchange和queue
channel.queueBind(queueName, exchange, routingKey);
for (;;) {
//傳送時的routingkey可以隨意指定,所有繫結到這個exchange上的佇列都會接收到訊息
channel.basicPublish(exchange, "無視routingkey", null, "test".getBytes());
System.out.println("生產者:test");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
public void consume() {
Channel channel = null;
try {
channel = RabbitmqUtil.getConnection().createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body, "UTF-8");
System.out.println(" 接收訊息:'" + message + "'");
}
};
channel.basicConsume(queueName, true, defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
public static void main(String[] args) {
final Test3 test1 = new Test3();
new Thread() {
@Override
public void run() {
test1.produce();
}
}.start();
new Thread() {
@Override
public void run() {
test1.consume();
}
}.start();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
6.釋出、訂閱
和釋出訂閱訊息一樣,可以使用萬用字元關鍵字訂閱指定的訊息。
傳送訊息的routingkey和繫結exchange的routingkey可以是一組用”.”分開的詞。詞裡面可以使用萬用字元
“*”:表示任意一個關鍵詞
“#”:表示0個或者多個關鍵詞
注意:兩個詞以上的一定要用”.”號隔開,上面兩個萬用字元只是通配關鍵詞,並非單個字母,如”my*.my1”這樣是錯誤的。
下圖是引用官網
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
//釋出訂閱模式
public class Test4 {
private String queueName = "testQueue";
private String queueName1 = "testQueue1";
private String exchange = "testExchange2";
public void produce() {
Channel channel = null;
try {
// 建立通道
channel = RabbitmqUtil.getChannel();
// 建立兩個佇列接收訂閱訊息
channel.queueDeclare(queueName, true, false, false, null);
channel.queueDeclare(queueName1, true, false, false, null);
// 建立一個exchange
channel.exchangeDeclare(exchange, "topic");
// 給兩個佇列指定對應的訂閱內容
channel.queueBind(queueName, exchange, "#.2.#");
channel.queueBind(queueName1, exchange, "#.2.?");
for (;;) {
// 傳送兩個routingkey訊息內容
// 傳送routingkey “my test1”,匹配兩個佇列
channel.basicPublish(exchange, "2.11", null, "test1".getBytes());
System.out.println("生產者:test1");
// 傳送routingkey “my test2”,去掉後面的11,兩個佇列都能接收到
channel.basicPublish(exchange, "2.13.11", null, "test2".getBytes());
System.out.println("生產者:test2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
public void consume() {
Channel channel = null;
try {
channel = RabbitmqUtil.getConnection().createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body, "UTF-8");
System.out.println(" 接收訊息1:'" + message + "'");
}
};
channel.basicConsume(queueName, true, defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
public void consume2() {
Channel channel = null;
try {
channel = RabbitmqUtil.getConnection().createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body, "UTF-8");
System.out.println(" 接收訊息2:'" + message + "'");
}
};
channel.basicConsume(queueName1, true, defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
public static void main(String[] args) {
final Test4 test1 = new Test4();
new Thread() {
@Override
public void run() {
test1.produce();
}
}.start();
new Thread() {
@Override
public void run() {
test1.consume();
}
}.start();
new Thread() {
@Override
public void run() {
test1.consume2();
}
}.start();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
rabbitmq利用exchange和queue搭配的靈活性已經exchange的型別,可以完成業務場景中各種各樣的需求。