RabbitMQ的三種Exchange,資料持久化,非持久化例項
阿新 • • 發佈:2019-01-01
由於最近專案需要用到Rabbitmq的資料持久化技術,利用空閒時間,分別對Rabbitmq的三種常用的Exchange(direct、fanout、topic)寫了個測試例項,僅供初學者參考學習,還望各路大神勿吐槽。
開發之前需要引入依賴包:建議使用3.4.0或更低的版本,因為試過更高的版本會出現連線超時,具體原因尚未研究,還望研究過的大神留言告知一下,謝謝。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.0</version> </dependency>
1.direct(路由:依routing key精確匹配)
1.1生產者(Direct)
public class Direct { private static final String EXCHANGE_NAME = "temp_direct"; private static final String[] TYPE = { "info", "warning", "error" }; public static void main(String[] argv) throws java.io.IOException { // 建立連線和頻道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.32.129"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 宣告轉發器的型別 channel.exchangeDeclare(EXCHANGE_NAME, "direct",true); //傳送6條訊息 for (int i = 0; i <TYPE.length; i++) { String rand = getRandom(); String message = rand + "_log :" + UUID.randomUUID().toString(); //持久化 channel.queueDeclare(TYPE[i], true, false, false, null); //流量 channel.basicQos(1); //將訊息佇列繫結到Exchange channel.queueBind(TYPE[i], EXCHANGE_NAME, TYPE[i]); // 釋出訊息至轉發器,指定routingkey channel.basicPublish(EXCHANGE_NAME, TYPE[i], null, message .getBytes()); System.out.println("佇列" + TYPE[i] + "繫結成功!"); } channel.close(); connection.close(); } /** * 隨機產生一種日誌型別 * * @return */ private static String getRandom() { Random random = new Random(); int ranVal = random.nextInt(3); return TYPE[ranVal]; } }
1.2消費者
public class ReceiveDirect { private static final String EXCHANGE_NAME = "temp_direct"; private final static String HOST = "192.168.32.129"; private static final String[] TYPE = { "info", "warning", "error" }; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { // 建立連線和頻道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); // 宣告direct型別轉發器 channel.exchangeDeclare(EXCHANGE_NAME, "direct",true); for (int i = 0; i < TYPE.length; i++) { //持久化 channel.queueDeclare(TYPE[i], true, false, false, null); //流量控制 channel.basicQos(1); //將訊息佇列繫結到Exchange channel.queueBind(TYPE[i], EXCHANGE_NAME, TYPE[i]); System.out.println("佇列" + TYPE[i] + "繫結成功!"); } for (int i = 0; i < TYPE.length; i++) { final String queue = TYPE[i]; new Thread(){ public void run() { try { receive(channel, queue); } catch (Exception e) { e.printStackTrace(); } } }.start(); } } private static void receive(Channel channel,String QUEUE_NAME) throws Exception { // 宣告消費者 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { // 等待佇列推送訊息 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody(), "UTF-8"); System.out.println(QUEUE_NAME + " Received '" + message + "'"); // 反饋給伺服器表示收到資訊 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
2.fanout(廣播)
2.1生產者
/**
 * Producer for the fanout-exchange example.
 *
 * <p>Declares the {@code fanout} exchange and the durable {@code temp_fanout} queue, binds them
 * and publishes a single timestamped message using
 * {@code MessageProperties.PERSISTENT_TEXT_PLAIN}.
 */
public class Fanout {
    private static final String HOST = "192.168.32.129";
    private static final String EXCHANGE_NAME = "fanout";
    private static final String QUEUE = "temp_fanout";
    private static final String ROUTKEY = "mq.fanout";
    private static final boolean DURABLE = true;

    /**
     * Connects to the broker, declares the topology and publishes one message.
     *
     * @param args unused
     * @throws IOException      if any broker operation fails
     * @throws TimeoutException declared for client versions whose connect call can time out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Open the connection.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            // Declare the exchange and its type.
            // NOTE(review): unlike the direct/topic examples no durable flag is passed here -
            // confirm that is intended.
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // Durable queue.
            channel.queueDeclare(QUEUE, DURABLE, false, false, null);
            channel.basicQos(1);
            channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY);

            String message = System.currentTimeMillis() + " : fanout something";
            // Publish to the exchange with an empty routing key; the body is encoded explicitly
            // as UTF-8 instead of relying on the platform default charset.
            channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
            channel.close();
        } finally {
            // Release the connection even when a broker call above throws.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
2.2消費者
/**
 * Consumer for the fanout-exchange example.
 *
 * <p>Declares the {@code fanout} exchange and the durable {@code temp_fanout} queue, binds them
 * and prints every message received from the queue, using automatic acknowledgement.
 */
public class ReceiveFanout {
    private static final String HOST = "192.168.32.129";
    private static final String EXCHANGE_NAME = "fanout";
    private static final String QUEUE = "temp_fanout";
    private static final String ROUTKEY = "mq.fanout";
    private static final boolean DURABLE = true;

    /**
     * Connects to the broker and consumes messages until the process is stopped or a broker
     * operation fails.
     *
     * @param args unused
     * @throws java.io.IOException            if a broker operation or decoding fails
     * @throws java.lang.InterruptedException if the thread is interrupted while waiting for a delivery
     * @throws TimeoutException               declared for client versions whose connect call can time out
     */
    public static void main(String[] args) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException {
        // Open the connection.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // Durable, named queue. (Per the original author's note, the no-argument
            // channel.queueDeclare() would instead create a non-durable, unique, auto-delete
            // queue.)
            channel.queueDeclare(QUEUE, DURABLE, false, false, null);
            channel.basicQos(1);
            // Attach the queue to the exchange.
            channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            // Second argument true = automatic acknowledgement, no manual ack required.
            channel.basicConsume(QUEUE, true, consumer);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        } finally {
            // Reached only when the loop is aborted by an exception; release the connection.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
3.topic(主題)
3.1生產者
/**
 * Producer for the topic-exchange example.
 *
 * <p>Declares the durable {@code topic_Exc} exchange and the durable {@code temp_wwww} queue,
 * binds them with {@link #ROUTKEY} and publishes one random UUID using
 * {@code MessageProperties.PERSISTENT_TEXT_PLAIN}.
 */
public class Topic {
    private static final String HOST = "192.168.32.129";
    private static final String EXCHANGE_NAME = "topic_Exc";
    // NOTE(review): ReceiveTopicFortopic consumes from "temp_topic", so nothing shown in this
    // file reads the queue declared here - confirm the differing name is intentional.
    private static final String QUEUE = "temp_wwww";
    // NOTE(review): this key contains no '.' separator, so the '*' is presumably matched as a
    // literal character rather than as a topic wildcard - confirm against the broker docs.
    private static final String ROUTKEY = "*_topic";
    private static final boolean DURABLE = true;

    /**
     * Connects to the broker, declares the topology and publishes one message.
     *
     * @param argv unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] argv) throws Exception {
        // Open the connection.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            // Declare the durable topic exchange.
            channel.exchangeDeclare(EXCHANGE_NAME, "topic", DURABLE);
            // Durable queue.
            channel.queueDeclare(QUEUE, DURABLE, false, false, null);
            channel.basicQos(1);
            // Bind the queue to the exchange.
            channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY);

            String msg = UUID.randomUUID().toString();
            // The body is encoded explicitly as UTF-8 instead of the platform default charset.
            channel.basicPublish(EXCHANGE_NAME, ROUTKEY, MessageProperties.PERSISTENT_TEXT_PLAIN,
                    msg.getBytes("UTF-8"));
            System.out.println(msg);
            channel.close();
        } finally {
            // Release the connection even when a broker call above throws.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
3.2消費者
/**
 * Consumer for the topic-exchange example.
 *
 * <p>Declares the durable {@code topic_Exc} exchange and the durable {@code temp_topic} queue,
 * binds them with {@link #ROUTKEY} and prints the routing key and body of every message received,
 * using automatic acknowledgement.
 */
public class ReceiveTopicFortopic {
    private static final String HOST = "192.168.32.129";
    private static final String EXCHANGE_NAME = "topic_Exc";
    private static final String QUEUE = "temp_topic";
    // NOTE(review): this key contains no '.' separator, so the '*' is presumably matched as a
    // literal character rather than as a topic wildcard - confirm against the broker docs.
    private static final String ROUTKEY = "*_topic";
    private static final boolean DURABLE = true;

    /**
     * Connects to the broker and consumes messages until the process is stopped or a broker
     * operation fails.
     *
     * @param argv unused
     * @throws Exception if a broker operation fails or the wait for a delivery is interrupted
     */
    public static void main(String[] argv) throws Exception {
        // Open the connection.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            // Declare the durable topic exchange.
            channel.exchangeDeclare(EXCHANGE_NAME, "topic", DURABLE);
            // Durable queue.
            channel.queueDeclare(QUEUE, DURABLE, false, false, null);
            channel.basicQos(1);
            // Bind the queue to the exchange.
            channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY);
            System.out
                    .println(" [*] Waiting for critical messages. To exit press CTRL+C");

            // Register the consumer with automatic acknowledgement (autoAck = true).
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE, true, consumer);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String message = new String(delivery.getBody(), "UTF-8");
                String routingKey = delivery.getEnvelope().getRoutingKey();
                System.out.println(" [x] Received routingKey = " + routingKey
                        + ",msg = " + message + ".");
            }
        } finally {
            // Reached only when the loop is aborted by an exception; release the connection.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
上面就是這三種Exchange例項,完畢。