rabbitMq及安裝、fanout交換機-分發(發布/訂閱)
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
最常見的幾種消息通信模式主要有發布-訂閱、點對點這兩種
http://blog.csdn.net/woogeyu/article/details/51119101 集群
http://blog.csdn.net/column/details/rabbitmq.html Python版
http://www.cnblogs.com/LipeiNet/category/896408.html
http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html api
RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現
其他公開標準(如 COBAR的 IIOP ,或者是 SOAP 等)
從Erlang的官網 http://www.erlang.org/download.html 下載最新的erlang安裝包,Linux和MacOSX下載的版本是 http://www.erlang.org/download.html
1、安裝依賴
# yum install build-essential m4
# yum install openssl
# yum install openssl-devel
# yum install unixODBC
# yum install unixODBC-devel
# yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
# yum install perl
2、配置並安裝erlang
# tar -zxvf otp_src_20.1.tar
# cd otp_src_20.1
# ./configure --prefix=/usr/local/erlang --enable-hipe --enable-threads --enable-smp-support --enable-kernel-poll
./configure
--prefix=/usr/local/erlang --with-ssl --enable-threads
--enable-smp-support --enable-kernel-poll --enable-hipe --without-javac
./configure --prefix=/usr/local/erlang --enable-all (選這個)
make
make install
3、設置erlang環境變量
打開/etc/profile設置環境變量 查看PATH:echo $PATH 環境變量
ERL_HOME=/usr/local/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH
4、安裝mq
http://www.rabbitmq.com/install-rpm.html
http://www.rabbitmq.com/releases/rabbitmq-server/ 下載rpm包
rabbitmq-server-3.6.14-1.el6.noarch
yum install -y socat(不需要)
ln -s /usr/local/erlang/bin/erl /usr/bin/erl 建立軟連接 (不建可能會報錯)
rpm -i --nodeps rabbitmq-server-3.6.5-1.noarch
執行結果 warning:rabbitmq-server-3.6.5-1.noarch.rpm: Header V4 RSA/SHA512 Signature, key ID 6026dfca: NOKEY
5、起停mq
/sbin/service rabbitmq-server stop/start/etc.
[root@iZ250x18mnzZ ~]#service rabbitmq-server start 啟動服務
[root@iZ250x18mnzZ ~]#service rabbitmq-server etc 查看哪些命令可以使用
[root@iZ250x18mnzZ ~]#service rabbitmq-server stop 停止服務
[root@iZ250x18mnzZ ~]#service rabbitmq-server status 查看服務狀態
啟動監控管理器:rabbitmq-plugins enable rabbitmq_management
關閉監控管理器:rabbitmq-plugins disable rabbitmq_management
查看所有的隊列:rabbitmqctl list_queues
清除所有的隊列:rabbitmqctl reset
關閉應用:rabbitmqctl stop_app 不同於停止服務
啟動應用:rabbitmqctl start_app
6、卸載等命令
#rpm -qa|grep rabbitmq
rabbitmq-server-3.6.5-1.noarch
卸載 mq
#rpm -e --nodeps rabbitmq-server-3.6.5-1.noarch
#rpm -qa|grep erlang
esl-erlang-18.3-1.x86_64
#rpm -e --nodeps esl-erlang-18.3-1.x86_64
7、訪問後臺
關閉防火墻 service iptables stop
安裝啟動後其實還不能在其它機器訪問, rabbitmq默認的 guest 賬號只能在本地機器訪問, 如果想在其它機器訪問必須配置其它賬號
配置管理員賬號:
rabbitmqctl add_user admin adminpasspord
rabbitmqctl set_user_tags admin administrator
啟動rabbitmq內置web插件, 管理rabbitmq賬號等信息
rabbitmq-plugins enable rabbitmq_management
訪問 http://192.168.89.131:15672/#/users 為剛建的賬號 set permission
http://127.0.0.1:15672
(1)添加用戶
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
(2) 刪除一個用戶
rabbitmqctl delete_user Username
(3) 修改用戶的密碼
rabbitmqctl change_password Username Newpassword
(4) 查看當前用戶列表
rabbitmqctl list_users
4、設置開機啟動
chkconfig rabbitmq-server on
5、java客戶端amqp-client版本號為3.6.5 與 rabbitmq-server-3.6.5-1.noarch服務版本號必須匹配
8、命令
http://www.linuxidc.com/Linux/2016-10/136493.htm
1、信息分發
向指定隊列發送多條信息,多個消費者來獲取該隊列中信息
channel.basicQos(1);保證一次只分發一個,否則可能有些消費者獲取較多消息有的消費者獲取不到消息
可以使消費者采用手動答復,保證在一個消費者處理消息失敗後(此時不答復)其他消費者還能繼續獲取並處理該消息
2、交換機
rabbitMQ其實真正的思想是生產者不發送任何信息到隊列,甚至不知道信息將發送到哪個隊列。相反生產者只能發送信息到交換機,交換機接收到生產者的信息,然後按照規則把它推送到對列中
fanout,表示分發,所有的消費者得到同樣的隊列信息,發布/訂閱,channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
direct,發送信息到不同的路由,消費者根據不同路由獲取不同信息
topics,發送信息到不同的路由,消費者根據模糊匹配獲取某些路由的信息
1,單任務消息
生產者:根據一定的 QUEUE_NAME 生產單個消息
消費者:根據 QUEUE_NAME 獲取消息
2,多任務分發
生產fenfa() 消費fenfa1()fenfa2()
生產者:根據一定的 QUEUE_NAME 生產多個消息
消費者:根據 QUEUE_NAME 獲取消息(channel.basicQos(1)一次只分發一個,通過手動回復使多個消費者公平的獲取和處理消息)
3,交換機
生產exchange() 消費exchange1()exchange2()
生產者:根據一定的 EXCHANGE_NAME,生產多個消息(fanout表示分發,所有的消費者得到同樣的信息)
消費者:根據 EXCHANGE_NAME,每一個交換機都會獲取一遍消息
service
package com.xmh.mq;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;
public class ServerTest {
private static final String EXCHANGE_NAME = "logs";
public final static String QUEUE_NAME="rabbitMQ.test1";
public static void main(String[] args) throws Exception{
}
public static void exchange(){
try{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.89.131");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//fanout表示分發,所有的消費者得到同樣的隊列信息
//分發信息
for (int i=0;i<5;i++){
String message="Hello World"+i;
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println("EmitLog Sent ‘" + message + "‘");
}
channel.close();
connection.close();
}catch(Exception e){
e.printStackTrace();
}
}
/**
* 多任務分發,兩個以上客戶端處理消息
*/
public static void fenfa(){
try{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.89.131");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,true,null);
//分發信息
for (int i=0;i<10;i++){
String message="Hello RabbitMQ"+i;
channel.basicPublish("",QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("NewTask send ‘"+message+"‘");
}
channel.close();
connection.close();
}catch(Exception e){
e.printStackTrace();
}
}
/**
* 註1:queueDeclare第一個參數表示隊列名稱、第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、
* 第三個參數為是否是獨占隊列(創建者可以使用的私有隊列,斷開後自動刪除)、第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數
註2:basicPublish第一個參數為交換機名稱、第二個參數為隊列映射的路由key、第三個參數為消息的其他屬性、第四個參數為發送信息的主體
聲明隊列後mq後臺可看到該隊列及存放的消息
*/
public static void quene(){
try{
//創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設置RabbitMQ相關信息
factory.setHost("192.168.89.131");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
// factory.setVirtualHost("/");
//創建一個新的連接
Connection connection = factory.newConnection();
//創建一個通道
Channel channel = connection.createChannel();
// 聲明一個隊列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
String message = "Hello RabbitMQ-1";
//發送消息到隊列中
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("Producer Sendss +‘" + message + "‘");
//關閉通道和連接
channel.close();
connection.close();
}catch(Exception e){
e.printStackTrace();
}
}
}
註1:queueDeclare第一個參數表示隊列名稱、第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、第三個參數為是否是獨占隊列(創建者可以使用的私有隊列,斷開後自動刪除)、第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數
註2:basicPublish第一個參數為交換機名稱、第二個參數為隊列映射的路由key、第三個參數為消息的其他屬性、第四個參數為發送信息的主體
package com.xmh.mq;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
public class ConsumerTest {
private final static String QUEUE_NAME = "rabbitMQ.test1";
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
test_exchange();
test_exchange2();
}
/**
* 必須先訂閱(先啟動客戶端,才能收到服務端消息) 發布/訂閱
* rabbitMQ其實真正的思想是生產者不發送任何信息到隊列,甚至不知道信息將發送到哪個隊列。相反生產者只能發送信息到交換機,交換機接收到生產者的信息,
* 然後按照規則把它推送到對列中,交換機發布/訂閱 Fanout扇形交換機 其他有(Direct直連交換機、Topic主題交換機)
*/
public static void exchange1(){
try{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.89.131");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//產生一個隨機的隊列名稱
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");//對隊列進行綁定
System.out.println("ReceiveLogs1 Waiting for messages");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("ReceiveLogs1 Received ‘" + message + "‘");
}
};
channel.basicConsume(queueName, true, consumer);//隊列會自動刪除
}catch(Exception e){
e.printStackTrace();
}
}
public static void exchange2(){
try{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.89.131");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//產生一個隨機的隊列名稱
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");//對隊列進行綁定
System.out.println("ReceiveLogs2 Waiting for messages");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("ReceiveLogs2 Received ‘" + message + "‘");
}
};
channel.basicConsume(queueName, true, consumer);//隊列會自動刪除
}catch(Exception e){
e.printStackTrace();
}
}
/**
* 對於分發信息的處理
* 註:channel.basicQos(1);保證一次只分發一個 。autoAck是否自動回復,如果為true的話,每次生產者只要發送信息就會從內存中刪除,那麽如果消費者程序異常退出,
* 那麽就無法獲取數據,我們當然是不希望出現這樣的情況,所以才去手動回復,每當消費者收到並處理信息然後在通知生成者。最後從隊列中刪除這條信息。如果消費者異常退出,
* 如果還有其他消費者,那麽就會把隊列中的消息發送給其他消費者,如果沒有,等消費者啟動時候再次發送。
*/
public static void fenfa1(){
try{
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.89.131");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
System.out.println("Worker1 Waiting for messages");
//每次從隊列獲取的數量
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
//envelope - packaging data for the message body - the message body (opaque, client-specific byte array)
String message = new String(body, "UTF-8");
System.out.println("Worker1 Received ‘" + message + "‘");
try {
doWork(message);
channel.basicAck(envelope.getDeliveryTag(),false);//Acknowledge one or several received messages. .getDeliveryTag() 消息傳遞標記
System.out.println("Worker1 Done");
}catch (Exception e){
channel.abort();//終止渠道
}finally {
}
}
};
boolean autoAck=false;//手動回復
//消息消費完成確認
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} catch (Exception e){
e.printStackTrace();
}
}
//對於分發信息的處理
public static void fenfa2(){
try{
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.89.131");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
System.out.println("Worker2 Waiting for messages");
//每次從隊列獲取的數量
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
//envelope - packaging data for the message body - the message body (opaque, client-specific byte array)
String message = new String(body, "UTF-8");
System.out.println("Worker2 Received ‘" + message + "‘");
try {
doWork(message);
//Integer.parseInt("s");
channel.basicAck(envelope.getDeliveryTag(),false);//Acknowledge one or several received messages. .getDeliveryTag() 消息傳遞標記
System.out.println("Worker2 Done");
}catch (Exception e){
channel.abort();//終止渠道,另外一個客戶端會繼續獲取消息
System.out.println("Worker2 客戶端處理異常");
}finally {
}
}
};
boolean autoAck=false;
//消息消費完成確認
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} catch (Exception e){
e.printStackTrace();
}
}
private static void doWork(String task) {
try {
Thread.sleep(1000); // 暫停1秒鐘
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
/**
* 手動關掉客戶端後,該隊列會自動刪除(聲明隊列時設置)
*/
public static void test2(){
try{
// 創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設置RabbitMQ地址
factory.setHost("192.168.89.131");// 192.168.89.131 :15672
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
//創建一個新的連接
Connection connection = factory.newConnection();
//創建一個通道
Channel channel = connection.createChannel();
//聲明要關註的隊列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
System.out.println("Customer Waiting Received messages");
//DefaultConsumer類實現了Consumer接口,通過傳入一個頻道,
// 告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDelivery,持續監聽
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Customer Received ‘" + message + "‘");
}
};
//自動回復隊列應答 -- RabbitMQ中的消息確認機制
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (Exception e){
e.printStackTrace();
}
}
}
rabbitMq及安裝、fanout交換機-分發(發布/訂閱)