1. 程式人生 > >rabbitMq及安裝、fanout交換機-分發(發布/訂閱)

rabbitMq及安裝、fanout交換機-分發(發布/訂閱)

nature tex ignore fan world oge 屬性 mini 取數據

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>

最常見的幾種消息通信模式主要有發布-訂閱、點對點這兩種
http://blog.csdn.net/woogeyu/article/details/51119101 集群
http://blog.csdn.net/column/details/rabbitmq.html Python版
http://www.cnblogs.com/LipeiNet/category/896408.html
http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html api
RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現
其他公開標準(如 COBAR的 IIOP ,或者是 SOAP 等)


從Erlang的官網 http://www.erlang.org/download.html 下載最新的erlang安裝包,Linux和MacOSX下載的版本是 http://www.erlang.org/download.html
1、安裝依賴
# yum install build-essential m4
# yum install openssl
# yum install openssl-devel
# yum install unixODBC
# yum install unixODBC-devel
# yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
# yum install perl

2、配置並安裝erlang
# tar -zxvf otp_src_20.1.tar
# cd otp_src_20.1
# ./configure --prefix=/usr/local/erlang --enable-hipe --enable-threads --enable-smp-support --enable-kernel-poll
./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac
./configure --prefix=/usr/local/erlang --enable-all (選這個)
make
make install

3、設置erlang環境變量
打開/etc/profile設置環境變量 查看PATH:echo $PATH 環境變量
ERL_HOME=/usr/local/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH

4、安裝mq
http://www.rabbitmq.com/install-rpm.html http://www.rabbitmq.com/releases/rabbitmq-server/ 下載rpm包 rabbitmq-server-3.6.14-1.el6.noarch
yum install -y socat(不需要)
ln -s /usr/local/erlang/bin/erl /usr/bin/erl 建立軟連接 (不建可能會報錯)
rpm -i --nodeps rabbitmq-server-3.6.5-1.noarch
執行結果 warning:rabbitmq-server-3.6.5-1.noarch.rpm: Header V4 RSA/SHA512 Signature, key ID 6026dfca: NOKEY

5、起停mq
/sbin/service rabbitmq-server stop/start/etc.
[root@iZ250x18mnzZ ~]#service rabbitmq-server start 啟動服務
[root@iZ250x18mnzZ ~]#service rabbitmq-server etc 查看哪些命令可以使用
[root@iZ250x18mnzZ ~]#service rabbitmq-server stop 停止服務
[root@iZ250x18mnzZ ~]#service rabbitmq-server status 查看服務狀態
啟動監控管理器:rabbitmq-plugins enable rabbitmq_management
關閉監控管理器:rabbitmq-plugins disable rabbitmq_management
查看所有的隊列:rabbitmqctl list_queues
清除所有的隊列:rabbitmqctl reset
關閉應用:rabbitmqctl stop_app 不同於停止服務
啟動應用:rabbitmqctl start_app

6、卸載等命令
#rpm -qa|grep rabbitmq
rabbitmq-server-3.6.5-1.noarch
卸載 mq
#rpm -e --nodeps rabbitmq-server-3.6.5-1.noarch
#rpm -qa|grep erlang
esl-erlang-18.3-1.x86_64
#rpm -e --nodeps esl-erlang-18.3-1.x86_64

7、訪問後臺
關閉防火墻 service iptables stop
安裝啟動後其實還不能在其它機器訪問, rabbitmq默認的 guest 賬號只能在本地機器訪問, 如果想在其它機器訪問必須配置其它賬號
配置管理員賬號:
    rabbitmqctl add_user admin adminpasspord
    rabbitmqctl set_user_tags admin administrator
啟動rabbitmq內置web插件, 管理rabbitmq賬號等信息
    rabbitmq-plugins enable rabbitmq_management
訪問 http://192.168.89.131:15672/#/users 為剛建的賬號 set permission
http://127.0.0.1:15672

(1)添加用戶
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
(2) 刪除一個用戶
rabbitmqctl delete_user Username
(3) 修改用戶的密碼
rabbitmqctl change_password Username Newpassword
(4) 查看當前用戶列表
rabbitmqctl list_users

4、設置開機啟動
chkconfig rabbitmq-server on

5、java客戶端amqp-client版本號為3.6.5 與 rabbitmq-server-3.6.5-1.noarch服務版本號必須匹配

8、命令
http://www.linuxidc.com/Linux/2016-10/136493.htm

1、信息分發
向指定隊列發送多條信息,多個消費者來獲取該隊列中信息
channel.basicQos(1);保證一次只分發一個,否則可能有些消費者獲取較多消息有的消費者獲取不到消息
可以使消費者采用手動答復,保證在一個消費者處理消息失敗後(此時不答復)其他消費者還能繼續獲取並處理該消息
2、交換機
rabbitMQ其實真正的思想是生產者不發送任何信息到隊列,甚至不知道信息將發送到哪個隊列。相反生產者只能發送信息到交換機,交換機接收到生產者的信息,然後按照規則把它推送到對列中
fanout,表示分發,所有的消費者得到同樣的隊列信息,發布/訂閱,channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
direct,發送信息到不同的路由,消費者根據不同路由獲取不同信息
topics,發送信息到不同的路由,消費者根據模糊匹配獲取某些路由的信息

1,單任務消息
生產者:根據一定的 QUEUE_NAME 生產單個消息
消費者:根據 QUEUE_NAME 獲取消息
2,多任務分發
生產fenfa() 消費fenfa1()fenfa2()
生產者:根據一定的 QUEUE_NAME 生產多個消息
消費者:根據 QUEUE_NAME 獲取消息(channel.basicQos(1)一次只分發一個,通過手動回復使多個消費者公平的獲取和處理消息)
3,交換機
生產exchange() 消費exchange1()exchange2()
生產者:根據一定的 EXCHANGE_NAME,生產多個消息(fanout表示分發,所有的消費者得到同樣的信息)
消費者:根據 EXCHANGE_NAME,每一個交換機都會獲取一遍消息

service

package com.xmh.mq;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;

public class ServerTest {

    private static final String EXCHANGE_NAME = "logs";
    public final static String QUEUE_NAME="rabbitMQ.test1";
    
    public static void main(String[] args) throws Exception{
        
    }
    
    public static void exchange(){
        try{
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.89.131");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
            Connection connection=factory.newConnection();
            Channel channel=connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//fanout表示分發,所有的消費者得到同樣的隊列信息
            //分發信息
            for (int i=0;i<5;i++){
                String message="Hello World"+i;
                channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
                System.out.println("EmitLog Sent ‘" + message + "‘");
            }
            channel.close();
            connection.close();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    
    /**
     * 多任務分發,兩個以上客戶端處理消息
     */
    public static void fenfa(){
        try{
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.89.131");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
            Connection connection=factory.newConnection();
            Channel channel=connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,true,false,true,null);
            //分發信息
            for (int i=0;i<10;i++){
                String message="Hello RabbitMQ"+i;
                channel.basicPublish("",QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
                System.out.println("NewTask send ‘"+message+"‘");
            }
            channel.close();
            connection.close();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    /**
     * 註1:queueDeclare第一個參數表示隊列名稱、第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、
     * 第三個參數為是否是獨占隊列(創建者可以使用的私有隊列,斷開後自動刪除)、第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數
              註2:basicPublish第一個參數為交換機名稱、第二個參數為隊列映射的路由key、第三個參數為消息的其他屬性、第四個參數為發送信息的主體
             聲明隊列後mq後臺可看到該隊列及存放的消息  
     */
    public static void quene(){
        try{
         //創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置RabbitMQ相關信息
        factory.setHost("192.168.89.131");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
//        factory.setVirtualHost("/");
        //創建一個新的連接
        Connection connection = factory.newConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
        //  聲明一個隊列       
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        String message = "Hello RabbitMQ-1";
        //發送消息到隊列中
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("Producer Sendss +‘" + message + "‘");
        //關閉通道和連接
        channel.close();
        connection.close();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    
}

註1:queueDeclare第一個參數表示隊列名稱、第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、第三個參數為是否是獨占隊列(創建者可以使用的私有隊列,斷開後自動刪除)、第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數
註2:basicPublish第一個參數為交換機名稱、第二個參數為隊列映射的路由key、第三個參數為消息的其他屬性、第四個參數為發送信息的主體

package com.xmh.mq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;

public class ConsumerTest {

     private final static String QUEUE_NAME = "rabbitMQ.test1";
     private static final String EXCHANGE_NAME = "logs";

        public static void main(String[] args) throws IOException, TimeoutException {
            test_exchange();
            test_exchange2();
        }
        
        /**
         * 必須先訂閱(先啟動客戶端,才能收到服務端消息) 發布/訂閱 
         * rabbitMQ其實真正的思想是生產者不發送任何信息到隊列,甚至不知道信息將發送到哪個隊列。相反生產者只能發送信息到交換機,交換機接收到生產者的信息,
         * 然後按照規則把它推送到對列中,交換機發布/訂閱  Fanout扇形交換機 其他有(Direct直連交換機、Topic主題交換機)
         */
        public static void exchange1(){
            try{
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("192.168.89.131");
                factory.setUsername("admin");
                factory.setPassword("admin");
                factory.setPort(5672);
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();

                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

                //產生一個隨機的隊列名稱
                String queueName = channel.queueDeclare().getQueue();
                channel.queueBind(queueName, EXCHANGE_NAME, "");//對隊列進行綁定

                System.out.println("ReceiveLogs1 Waiting for messages");
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String message = new String(body, "UTF-8");
                        System.out.println("ReceiveLogs1 Received ‘" + message + "‘");
                    }
                };
                channel.basicConsume(queueName, true, consumer);//隊列會自動刪除
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        public static void exchange2(){
            try{
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("192.168.89.131");
                factory.setUsername("admin");
                factory.setPassword("admin");
                factory.setPort(5672);
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();

                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

                //產生一個隨機的隊列名稱
                String queueName = channel.queueDeclare().getQueue();
                channel.queueBind(queueName, EXCHANGE_NAME, "");//對隊列進行綁定

                System.out.println("ReceiveLogs2 Waiting for messages");
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String message = new String(body, "UTF-8");
                        System.out.println("ReceiveLogs2 Received ‘" + message + "‘");
                    }
                };
                channel.basicConsume(queueName, true, consumer);//隊列會自動刪除
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        
        /**
         * 對於分發信息的處理
         * 註:channel.basicQos(1);保證一次只分發一個 。autoAck是否自動回復,如果為true的話,每次生產者只要發送信息就會從內存中刪除,那麽如果消費者程序異常退出,
         * 那麽就無法獲取數據,我們當然是不希望出現這樣的情況,所以才去手動回復,每當消費者收到並處理信息然後在通知生成者。最後從隊列中刪除這條信息。如果消費者異常退出,
         * 如果還有其他消費者,那麽就會把隊列中的消息發送給其他消費者,如果沒有,等消費者啟動時候再次發送。
         */
        public static void fenfa1(){
            try{
                final ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("192.168.89.131");
                factory.setUsername("admin");
                factory.setPassword("admin");
                factory.setPort(5672);
                Connection connection = factory.newConnection();
                final Channel channel = connection.createChannel();

                channel.queueDeclare(QUEUE_NAME, true, false, true, null);
                System.out.println("Worker1  Waiting for messages");

                //每次從隊列獲取的數量
                channel.basicQos(1);

                final Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body) throws IOException {
                        //envelope - packaging data for the message  body - the message body (opaque, client-specific byte array)
                        String message = new String(body, "UTF-8");
                        System.out.println("Worker1  Received ‘" + message + "‘");
                        try {
                            doWork(message);
                            channel.basicAck(envelope.getDeliveryTag(),false);//Acknowledge one or several received messages.  .getDeliveryTag() 消息傳遞標記
                            System.out.println("Worker1 Done");
                        }catch (Exception e){
                            channel.abort();//終止渠道
                        }finally {
                        }
                    }
                };
                boolean autoAck=false;//手動回復
                //消息消費完成確認
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
                
                } catch (Exception e){
                    e.printStackTrace();
                }
        }
        //對於分發信息的處理
        public static void fenfa2(){
            try{
                final ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("192.168.89.131");
                factory.setUsername("admin");
                factory.setPassword("admin");
                factory.setPort(5672);
                Connection connection = factory.newConnection();
                final Channel channel = connection.createChannel();

                channel.queueDeclare(QUEUE_NAME, true, false, true, null);
                System.out.println("Worker2  Waiting for messages");

                //每次從隊列獲取的數量
                channel.basicQos(1);

                final Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body) throws IOException {
                        //envelope - packaging data for the message  body - the message body (opaque, client-specific byte array)
                        String message = new String(body, "UTF-8");
                        System.out.println("Worker2  Received ‘" + message + "‘");
                        try {
                            doWork(message);
                            //Integer.parseInt("s");
                            channel.basicAck(envelope.getDeliveryTag(),false);//Acknowledge one or several received messages.  .getDeliveryTag() 消息傳遞標記
                            System.out.println("Worker2 Done");
                        }catch (Exception e){
                            channel.abort();//終止渠道,另外一個客戶端會繼續獲取消息
                            System.out.println("Worker2 客戶端處理異常");
                        }finally {
                        }
                    }
                };
                boolean autoAck=false;
                //消息消費完成確認
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
                } catch (Exception e){
                    e.printStackTrace();
                }
        }
        private static void doWork(String task) {
            try {
                Thread.sleep(1000); // 暫停1秒鐘
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
        /**
         * 手動關掉客戶端後,該隊列會自動刪除(聲明隊列時設置)
         */
        public static void test2(){
            try{
            // 創建連接工廠
            ConnectionFactory factory = new ConnectionFactory();
            //設置RabbitMQ地址
            factory.setHost("192.168.89.131");// 192.168.89.131 :15672
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
            //創建一個新的連接
            Connection connection = factory.newConnection();
            //創建一個通道
            Channel channel = connection.createChannel();
            //聲明要關註的隊列
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            System.out.println("Customer Waiting Received messages");
            //DefaultConsumer類實現了Consumer接口,通過傳入一個頻道,
            // 告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDelivery,持續監聽
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Customer Received ‘" + message + "‘");
                }
            };
            //自動回復隊列應答 -- RabbitMQ中的消息確認機制
            channel.basicConsume(QUEUE_NAME, true, consumer);
            } catch (Exception e){
                e.printStackTrace();
            }
        }
}

rabbitMq及安裝、fanout交換機-分發(發布/訂閱)