1. 程式人生 > RabbitMQ實戰之Hello World(三)

RabbitMQ實戰之Hello World(三)

 Producer

package base;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

    private final static String QUEUE_NAME = "hello";
    
    public static void main(String[] args) {
        foo();
    }

    
private static void foo() { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); //宣告一個我們將要傳送資料進去的queue,然後向其傳送資料。queue的宣告是冪等的-只有不存在的話才會實際去建立
//資料內容是byte陣列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 5; i++) { String message = "hello world"+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("Sent:"+message); } channel.close(); connection.close(); }
catch (Exception e) { e.printStackTrace(); } } }

 

Consumer

package base;

import java.io.UnsupportedEncodingException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Minimal RabbitMQ consumer: subscribes to the {@code hello} queue of a broker
 * running on localhost and prints every message it receives.
 */
public class Recv {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        foo();
    }

    /**
     * Opens a connection and channel to the local broker, makes sure the queue
     * exists and registers a consumer on it. Any failure is printed to standard
     * error.
     *
     * <p>The connection and channel are not closed here.
     * NOTE(review): presumably intentional so deliveries keep arriving — confirm.
     */
    private static void foo() {
        // Factory for connections to the broker on localhost.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        try {
            // A Connection abstracts the underlying TCP connection and takes care
            // of protocol version negotiation and authentication.
            Connection conn = connectionFactory.newConnection();
            Channel ch = conn.createChannel();
            // Declare the queue here as well: either the producer or the consumer
            // may be started first, so both make sure the queue exists.
            ch.queueDeclare(QUEUE_NAME, false, false, false, null);
            Consumer printer = printingConsumer(ch);
            ch.basicConsume(QUEUE_NAME, true, printer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the consumer callback that handles delivered messages: each body is
     * decoded as UTF-8 and printed to standard output with a "Received:" prefix.
     */
    private static Consumer printingConsumer(Channel channel) {
        return new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body)
                    throws UnsupportedEncodingException {
                System.out.println("Received:" + new String(body, "UTF-8"));
            }
        };
    }
}

 

執行結果

Producer

Sent:hello world0
Sent:hello world1
Sent:hello world2
Sent:hello world3
Sent:hello world4

Consumer

Received:hello world0
Received:hello world1
Received:hello world2
Received:hello world3
Received:hello world4

工程結構如下圖:

主要引入了下面三個jar包,其中log4j和slf4j是為了解決編譯問題(RabbitMQ的Java客戶端依賴slf4j日誌介面)