1. 程式人生 > >學習筆記--Java消息中間件

學習筆記--Java消息中間件

應用 true ons discover 方式 new 鏈路 消息同步 con

#### 消息中間件

消息中間件:關註於數據的發送和接受,利用高效可靠的異步消息傳遞機制集成分布式系統
JMS:Java消息服務,Java平臺中關於面向消息中間件的API
AMQP:提供統一消息服務的應用層標準協議
常見消息中間件
ActiveMQ
RabbitMQ
Kafka
JMS規範
提供者:實現JMS規範的消息中間件服務器
客戶端:發送或接受消息的應用程序
生產者/發布者:創建並發送消息的客戶端
消費者/訂閱者:接收並處理消息的客戶端
消息:應用程序之間傳遞的數據內容
消息模式:在客戶端之間傳遞消息的方式,JMS中定義了主題和隊列兩種模式
JMS消息模式
隊列模型:

  • 客戶端包括生產者和消費者
  • 消息只能被一個消費者消費
  • 隨時消費

主題模型:

  • 客戶端包括發布者和訂閱者
  • 消息能被所有訂閱者消費
  • 消費者不能消費訂閱之前就發送到主題中的消息

JMS編碼接口:

  • ConnectionFactory:用於創建連接到消息中間件的連接工廠
  • Connection:代表了應用程序和消息服務器之間的通信鏈路
  • Destination:消息發布和接收的地點,包括隊列和主題
  • Session:表示一個單線程的上下文,用於發送和接收消息
  • MessageConsumer:由會話創建,用於接收發送到目標的消息
  • MessageProducer:由會話創建,用於發送消息到目標
  • Message:在消費者和生產者之間傳送的對象,包括消息頭,一組消息屬性,一個消息體

使用ActiveMQ
隊列模型
producer

        //1. 創建ConnectionFactory
        ConnectionFactory factory = new ActiveMQConnectionFactory(url);

        //2. 創建Connection
        Connection connection = factory.createConnection();

        //3. 啟動Connection
        connection.start();

        //4. 創建Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 創建Destination
        Destination destination = session.createQueue(queueName);

        //6. 創建MessageProducer
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 100; i++) {
            //7. 創建消息
            TextMessage message = session.createTextMessage("test" + i);

            //8. 發布消息
            producer.send(message);

            System.out.println("發送消息: " + message.getText());
        }


        //9. 關閉連接
        connection.close();

consumer

        //1. 創建ConnectionFactory
        ConnectionFactory factory = new ActiveMQConnectionFactory(url);

        //2. 創建Connection
        Connection connection = factory.createConnection();

        //3. 啟動Connection
        connection.start();

        //4. 創建Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 創建Destination
        Destination destination = session.createQueue(queueName);

        //6. 創建MessageConsumer
        MessageConsumer consumer = session.createConsumer(destination);

        //7. 創建消息監聽器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收消息: " + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        //9. 關閉連接(消息監聽異步執行,需程序全部運行結束才能關閉連接)
//        connection.close();

主題模型
producer

        //1. 創建ConnectionFactory
        ConnectionFactory factory = new ActiveMQConnectionFactory(url);

        //2. 創建Connection
        Connection connection = factory.createConnection();

        //3. 啟動Connection
        connection.start();

        //4. 創建Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 創建Destination
        Destination destination = session.createTopic(topicName);

        //6. 創建MessageProducer
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 100; i++) {
            //7. 創建消息
            TextMessage message = session.createTextMessage("test" + i);

            //8. 發布消息
            producer.send(message);

            System.out.println("發送消息: " + message.getText());
        }
        

        //9. 關閉連接
        connection.close();

consumer

        //1. 創建ConnectionFactory
        ConnectionFactory factory = new ActiveMQConnectionFactory(url);

        //2. 創建Connection
        Connection connection = factory.createConnection();

        //3. 啟動Connection
        connection.start();

        //4. 創建Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 創建Destination
        Destination destination = session.createTopic(topicName);

        //6. 創建MessageConsumer
        MessageConsumer consumer = session.createConsumer(destination);

        //7. 創建消息監聽器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收消息: " + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        //9. 關閉連接(消息監聽異步執行,需程序全部運行結束才能關閉連接)
//        connection.close();

spring jms

  • ConnectionFactory 用於管理連接的連接工廠
  • 由spring提供
  • SingleConnectionFactory和CachingConnectionFactory
  • JmsTemplate 用於發送和接收消息的模板類
  • 由spring提供,在容器中註冊就可以使用
  • 線程安全
  • MessageListener 消息監聽器
  • 實現一個onMessage方法,只接收一個Message參數

spring使用jms示例
common.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <context:annotation-config />

    <!-- ActiveMQ為我們提供的ConnectionFactory -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />
    </bean>
    <!-- spring jms為我們提供連接池 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!-- 一個隊列目的地,點對點的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue" />
    </bean>
    <!-- 一個主題目的地,發布訂閱消息 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic"/>
    </bean>
</beans>

producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <import resource="common.xml" />

    <!-- 配置JmsTemplate,用於發送消息-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

    <bean class="com.qyluo.jms.spring.producer.ProducerServiceImpl" />
</beans>

cosumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- 導入公共配置 -->
    <import resource="common.xml" />

    <!-- 配置消息監聽器 -->
    <bean id="consumerMessageListener" class="com.qyluo.jms.spring.consumer.ConsumerMessageListener" />

    <!-- 配置消息監聽容器 -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicDestination"/>
        <property name="messageListener" ref="consumerMessageListener"/>
    </bean>

</beans>

ProducerServiceImpl

public class ProducerServiceImpl implements ProducerService {
    @Autowired
    JmsTemplate jmsTemplate;

    @Resource(name = "topicDestination")
    Destination destination;

    @Override
    public void sendMessage(final String message) {
        //使用JmsTemplate發送消息
        jmsTemplate.send(destination, new MessageCreator() {
            //創建一個消息
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(message);
                return textMessage;
            }
        });
        System.out.println("發送消息: " + message);
    }
}

AppProducer

public class AppProducer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
        ProducerService service = context.getBean(ProducerService.class);
        for (int i = 0; i < 100; i++) {
            service.sendMessage("text" + i);
        }
        context.close();
    }
}

ConsumerMessageListener
public class ConsumerMessageListener implements MessageListener { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收消息: " + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }

AppConsumer

public class AppConsumer {
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
    }
}

ActiveMQ集群

集群方式

  • 客戶端集群:多個消費者消費同一個隊列
  • Broker clusters:多個Broker之間同步消息
  • Master Slave:實現高可用

ActiveMQ失效轉移(failover)
允許當其中一臺消息服務器宕機時,客戶端在傳輸層上重新連接到其它消息服務器
語法:failover:(uri1,...,uriN)?transportOptions
transportOptions參數說明

  • randomize 默認為true,表示在URI列表中選擇URI連接時是否采用隨機策略
  • initialReconnectDelay 默認為10,單位毫秒,表示第一次嘗試重新連接之間等待的時間
  • maxReconnectDelay 默認為30000,單位毫秒,最長重連的時間間隔

Broker Cluster集群配置
NetworkConnector(網絡連接器):ActiveMQ服務器之間的網絡通訊方式
分為靜態連接器和動態連接器
靜態連接器:

<networkConnectors>
    <networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
</networkConnectors>

動態連接器:

<networkConnectors>
    <networkConnector uri="multicast://default"/>
</networkConnectors>

<transportConnectors>
    <transportConnector uri="tcp://localhost:0" discoverUri="multicast://default"/>
</transportConnectors>

Master/Slave集群配置
ActiveMQ Master Slave集群方案

  • Share nothing storage master/slave (已過時,5.8+後移除)
  • Shared storage master/slave 共享存儲
  • Replicated LevelDB Store 基於復制的LevelDB Store

兩種集群方式對比
方式 | 高可用 | 負載均衡 |
--|----------|--------------|
Master/Slave | 是 | 否 |
Broker Cluster | 否 | 是 |

三臺服務器的完美集群方案
Node A和Node B做消息同步,Node A和Node C做消息同步,Node B和Node C做Master / Slave對資源進行持久化

服務器 服務端口 管理端口 存儲 網絡連接器 用途
Node-A 61616 8161 - Node-B、Node-C 消費者
Node-B 61617 8162 /share_file/kahadb Node-A 生產者,消費者
Node-C 61618 8163 /share_file/kahadb Node-A 生產者,消費者

企業系統中的最佳實踐

實際業務場景特點

  • 子業務系統都有集群的可能性
  • 同一個消息會廣播給關註該類消息的所有子業務系統
  • 同一類消息在集群中被負載消費
  • 業務的發生和消息的發布最終一致性

使用ActiveMQ的虛擬主題解決方案

  • 發布者:將消息發布到一個主題中,主題名以VirtualTopic開頭,如VirtualTopic.TEST
  • 消費者:從隊列中獲取消息,在隊列名中表明自己身份,如Consumer.A.VirtualTopic.TEST

使用JMS中XA系列接口保證強一致性

  • 引入分布式事務
  • 要求業務操作必須支持XA協議

使用消息表的本地事務解決方案

使用內存日誌的解決方案

基於消息機制的事件總線
事件驅動架構

RabbitMQ

RabbitMQ:使用交換器綁定到隊列

  • 創建ConnectionFactory
  • 創建Connection
  • 創建Channel
  • 定義Exchange,類型I必須為fanout
  • 定義Queue並且綁定隊列

Kafka

Kafka使用group.id分組消費者

  • 配置消息者參數group.id相同時對消息進行負載處理
  • 配置服務器partitions參數,控制同一個group.id下的consumer數量小於partitions
  • kafka只保證同一個group.id下的消息是有序的

學習筆記--Java消息中間件