1. 程式人生 > >RabbitMQ 訊息生產與消費(五)

RabbitMQ 訊息生產與消費(五)

  1. ConnectionFactory: 獲取連線工廠
  2. Connection 一個連線
  3. Channel  資料通訊通道,可傳送和接受訊息
  4. Queue  具體訊息儲存佇列
  5. Producer & Consumer  生產者和消費者

 

具體實現程式碼

pom.xml

注意:這裡是5.30版本  

 <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.3.0</version>
    </dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>wang.chunsen</groupId>
  <artifactId>rabbitmq-api</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>rabbitmq-api</name>
  <description>Demo project for Spring Boot</description>

  <properties>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.3.0</version>
    </dependency>
    <!--<version>3.6.5</version>-->
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>


</project>

生產者:

package wang.chunsen.quickstart.version2;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Author: itastro
 * @Description:
 * @Date: Created in 13:45 2018/12/12
 * @Package: wang.chunsen.quickstart.version2
 * @Modified By:
 */
public class Producer {

  private static final String EXCHANGE_NAME = "test";

  public static void main(String[] args) throws Exception {

    //建立連線工廠
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.114");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //建立一個連線
    Connection connection = connectionFactory.newConnection();

    //建立一個channel
    Channel channel = connection.createChannel();

    //通過channel 傳送資料
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    String message = "i love you rabbitmq";
    for (int i = 0; i <10 ; i++) {
      channel.basicPublish(EXCHANGE_NAME, "test001", null, message.getBytes("UTF-8"));
    }

  }

}

消費者

package wang.chunsen.quickstart.version2;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Envelope;
import java.io.IOException;

/**
 * @Author: itastro
 * @Description:
 * @Date: Created in 13:54 2018/12/12
 * @Package: wang.chunsen.quickstart.version2
 * @Modified By:
 */
public class Cosumer {

  private static final String EXCHANGE_NAME = "test";

  public static void main(String[] args) throws Exception {

    //建立連線工廠
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.114");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //建立一個連線
    Connection connection = connectionFactory.newConnection();

    //建立一個channel
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    //4 宣告(建立)一個佇列
    String queueName = "test002";
    channel.queueDeclare(queueName, true, false, false, null);

    channel.queueBind(queueName, EXCHANGE_NAME, "test001");

    System.out.println(" [*] Waiting for messages");

//    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//
//      String message = new String(delivery.getBody(), "UTF-8");
//      System.out.println(" [x] Received '" + message + "'");
//    };
//
//
//    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

    while (true) {
      //消費訊息
      boolean autoAck = false;
      String consumerTag = "";
      channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException {
          String routingKey = envelope.getRoutingKey();
          String contentType = properties.getContentType();
          System.out.println("消費的路由鍵:" + routingKey);
          System.out.println("消費的內容型別:" + contentType);
          long deliveryTag = envelope.getDeliveryTag();
          //確認訊息
          channel.basicAck(deliveryTag, false);
          System.out.println("消費的訊息體內容:");
          String bodyStr = new String(body, "UTF-8");
          System.out.println(bodyStr);

        }
      });
    }

  }
}

以下程式碼使用 3.6.5

<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.3.0</version>
    </dependency>

生產者:

package wang.chunsen.quickstart.version1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Author: itastro
 * @Description: 生產者
 * @Date: Created in 10:47 2018/12/12
 * @Package: wang.chunsen.quickstart
 * @Modified By:
 */
public class producer {

  public static void main(String[] args) throws Exception {

    //1 建立一個ConnectFactory

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.114");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");

    //2 通過連線工廠建立連線
    Connection connection = connectionFactory.newConnection();

    //3 建立一個Channel
    Channel channel = connection.createChannel();

    //4通過Channel 傳送資料
    // void basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) throws IOException;

    for (int i = 0; i < 5; i++) {
      String message = "Hello RabbitMQ";

      // 1 exchange 2 routingkey

      //當不指定exchange 時  會使用預設的
      channel.basicPublish("", "test001", null, message.getBytes());
      System.out.println("1");

    }
    //5 記得要關閉相關的連線
    channel.close();
    connection.close();
  }

}

消費者:

 

package wang.chunsen.quickstart.version1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Delivery;


/**
 * @Author: itastro
 * @Description: 消費端
 * @Date: Created in 10:48 2018/12/12
 * @Package: wang.chunsen.quickstart
 * @Modified By:
 */
public class Consumer {

  public static void main(String[] args) throws Exception {
    //1 建立一個ConnectFactory

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.114");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");

    //2 通過連線工廠建立連線
    Connection connection = connectionFactory.newConnection();

    //3 建立一個Channel
    Channel channel = connection.createChannel();

    //4 宣告(建立)一個佇列
    String queueName = "test001";
    channel.queueDeclare(queueName, true, false, false, null);

    //5 建立一個消費者
    QueueingConsumer consumer = new QueueingConsumer(channel);

    //6 設定Chanel

    channel.basicConsume(queueName, true, consumer);

    while (true) {
      //7 獲取訊息
      Delivery delivery = consumer.nextDelivery();
      String msg = new String(delivery.getBody());
      System.err.println("消費端: " + msg);

    }

  }

  }