1. 程式人生 > >叢集與負載均衡系列(4)——訊息佇列之Rabbitmq的搭建

叢集與負載均衡系列(4)——訊息佇列之Rabbitmq的搭建

        前面的三篇文章介紹了共享session,從這篇文章開始介紹訊息佇列,這裡用的是Rabbitmq。對於Rabbitmq的一些基本概念,不打算在這裡總結了。因為網上有大把總結的不錯的文章,比如點選開啟連結

        這篇文章介紹Rabbitmq的安裝。

  •         下載安裝erlang

         由於rabbitmq是基於erlang的,因此先下載安裝erlang。我這裡下載的版本為erlang-18.1-1.el6.x86_64.rpm

         rpm -ihv erlang-18.1-1.el6.x86_64.rpm

  •          下載安裝Rabbitmq

         一定要下載和erlang對應版本的Rabbitmq,對於erlang-18.1-1.el6.x86_64的對應Rabbitmq為rabbitmq-server-3.5.6-1.noarch

        rpm -ihv rabbitmq-server-3.5.6-1.noarch.rpm

  •         啟動服務

         /sbin/service rabbitmq-server start

       

  •         新增使用者

         ./rabbitmqctl add_user admin nmamtf

       

  •         授權

         ./rabbitmqctl set_user_tags admin administrator

        

  •          開啟瀏覽器管理

         ./rabbitmq-plugins enable

        ./rabbitmq-plugins enable rabbitmq_management

       

  •         在瀏覽器中管理

         地址為: ip:15672/#/

       

  •         使用者管理

      許可權是安裝virtual hosts為粒度進行管理的

      到此為止,RabbitMq已經搭建完成

  •      測試程式碼

       接下來,我們寫點簡單的程式碼來測試一下

       生產端傳送資訊

       1、建立rabbitmq服務連線

       2、建立連線佇列的channel

       3、建立佇列

       4、傳送訊息

       5、關閉連線和channel

package com.wlf.demo;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

public class SendTest {
	
	private static String QUEUE_NAME="test";

	public static void main(String[] argv) throws Exception{
		//create connection
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("192.168.58.144");
		factory.setPort(5672);
		factory.setUsername("admin");
		factory.setPassword("nmamtf");
		factory.setVirtualHost("/");
		Connection connection = factory.newConnection();
		//create channel
		Channel channel = connection.createChannel();
		//create queue
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		String message = "Hello World!";
		channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		System.out.println(" [x] Sent '" + message + "'");
		//finally
		channel.close();
		connection.close();
	}
	
}
               這個時候,我們能看到test佇列裡有一條待處理的訊息

          消費端處理訊息

         1、建立rabbitmq服務連線

         2、建立到佇列的channel

         3、處理訊息的回撥

         4、處理訊息

package com.wlf.demo;

import java.io.IOException;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;


public class RecvTest {
	
	private static String QUEUE_NAME="test";

	public static void main(String[] argv) throws Exception{
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("192.168.58.140");
		factory.setPort(5672);
		factory.setUsername("admin");
		factory.setPassword("nmamtf");
		factory.setVirtualHost("/");
	    Connection connection = factory.newConnection();
	    Channel channel = connection.createChannel();

	    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
	    System.out.println(" [*] Waiting for messages.");
	    
	    Consumer consumer = new DefaultConsumer(channel) {
    	  @Override
    	  public void handleDelivery(String consumerTag, Envelope envelope,
    	                             AMQP.BasicProperties properties, byte[] body)
    	      throws IOException {
    	    String message = new String(body, "UTF-8");
    	    System.out.println(" [x] Received '" + message + "'");
    	  }
    	};
    	channel.basicConsume(QUEUE_NAME, true, consumer);
    	
	}
	
}

         同樣,我們可以看到訊息被處理了