Simple usage of ActiveMQ, a Java message queue

activeMQ

A sample project for learning Java message queues, built quickly with jfinal + jfinal-ext + activeMQ + quartz.

1. Message queues

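A message queue is a service built on the queue data structure: senders append messages to the queue and receivers take them off, normally in first-in, first-out order. In the Java world, Apache activeMQ is one of the more widely used implementations.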
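To make the idea concrete, here is a minimal in-memory sketch of a first-in, first-out queue with a sending side and a receiving side. It uses only the JDK's `LinkedBlockingQueue`, not activeMQ; the class name `InMemoryQueueSketch` and the message texts are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class InMemoryQueueSketch {

    /** Sends `count` messages through an in-memory queue and returns them in the order received. */
    public static List<String> roundTrip(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Sending side: append messages to the tail of the queue.
        for (int i = 1; i <= count; i++) {
            queue.offer("message " + i);
        }

        // Receiving side: take messages from the head until the queue is empty.
        List<String> received = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) {
            received.add(next);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(3)); // prints [message 1, message 2, message 3]
    }
}
```

A broker such as activeMQ adds what this sketch lacks: the queue lives in a separate server process, so sender and receiver can run in different programs or on different machines.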

2. Environment setup

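First download the apache-activeMQ-...-.zip package from the Apache website, unzip it, and start the activeMQ service from the bin directory.
Then open http://localhost:8186/admin in a browser and log in with admin/admin when the login page appears (a stock installation serves the web console on port 8161, so use that port unless you have changed it).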


Then create a queue named FirstQueue (the examples below use it).

3. Using activeMQ directly

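Remember to keep the activeMQ service running the whole time: the sender and the receiver both connect to the server over TCP, one to put messages on the queue and the other to take them off.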
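Before running the examples, it can help to confirm that the broker's TCP port is actually open. The following is a small sketch using only JDK networking classes; the class name `BrokerAddressSketch` and its methods are hypothetical helpers, and the URL `tcp://localhost:61616` is the one used by the Sender and Receiver code in this article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

public class BrokerAddressSketch {

    /** Splits a broker URL such as tcp://localhost:61616 into host and port. */
    public static InetSocketAddress parse(String brokerUrl) {
        URI uri = URI.create(brokerUrl);
        return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
    }

    /** Returns true if a plain TCP connection to the broker address succeeds within the timeout. */
    public static boolean isReachable(String brokerUrl, int timeoutMillis) {
        InetSocketAddress target = parse(brokerUrl);
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: the service is probably not running.
            return false;
        }
    }

    public static void main(String[] args) {
        String url = "tcp://localhost:61616";
        System.out.println(url + " reachable: " + isReachable(url, 1000));
    }
}
```

This only checks that something is listening on the port; it does not speak the activeMQ protocol or verify credentials.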
The screenshot below shows the cmd window of my server:

3.1. First, create the sender, Sender.java

<code class="language-java hljs  has-numbering">package com.mg.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {
        // ConnectionFactory: the factory JMS uses to create connections
        ConnectionFactory connectionFactory;
        // Connection: the connection from the JMS client to the JMS provider
        Connection connection = null;
        // Session: a single-threaded context for sending or receiving messages
        Session session;
        // Destination: where the messages are sent
        Destination destination;
        // MessageProducer: the message sender
        MessageProducer producer;
        // Build the ConnectionFactory using the activeMQ implementation
        connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            // Get a connection from the factory
            connection = connectionFactory.createConnection();
            // Start it
            connection.start();
            // Create a transacted session; the acknowledge mode is ignored for
            // transacted sessions, so SESSION_TRANSACTED states the intent
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // Look up the destination: "FirstQueue" is a queue on the server,
            // created earlier in the activeMQ console
            destination = session.createQueue("FirstQueue");
            // Create the message producer (the sender)
            producer = session.createProducer(destination);
            // Non-persistent delivery, for learning only; choose per project
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // Build and send the messages; hard-coded here, in a real project
            // they would come from parameters or a method call
            sendMessage(session, producer);
            // Nothing reaches the queue until the transaction is committed
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            String text = "Message sent by ActiveMq " + i;
            TextMessage message = session.createTextMessage(text);
            System.out.println("Sending message: " + text);
            // Send the message to the destination
            producer.send(message);
        }
    }
}
</code>

3.2. Then create the receiver, Receiver.java

<code class="language-java hljs  has-numbering">package com.mg.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    public static void main(String[] args) {
        // ConnectionFactory: the factory JMS uses to create connections
        ConnectionFactory connectionFactory;
        // Connection: the connection from the JMS client to the JMS provider
        Connection connection = null;
        // Session: a single-threaded context for sending or receiving messages
        Session session;
        // Destination: where the messages are read from
        Destination destination;
        // MessageConsumer: the message receiver
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            // Get a connection from the factory
            connection = connectionFactory.createConnection();
            // Start it
            connection.start();
            // Create a non-transacted session that acknowledges messages automatically
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Look up the destination: "FirstQueue" is a queue on the server,
            // created earlier in the activeMQ console
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            while (true) {
                // Wait up to 100 s for the next message (long enough for testing);
                // receive returns null when the timeout expires
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (null != message) {
                    System.out.println("Received message: " + message.getText());
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}
</code>
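The receive loop in Receiver.java ends only when `receive(timeout)` returns null, which in JMS happens when the timeout expires without a message arriving. The sketch below models that loop shape with the JDK's `BlockingQueue.poll(timeout, unit)` standing in for the JMS consumer; it is an analogy, not activeMQ code, and the names `ReceiveLoopSketch`, `drain`, and `demo` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ReceiveLoopSketch {

    /** Reads messages until none arrives within timeoutMillis, then returns what was read. */
    public static List<String> drain(BlockingQueue<String> queue, long timeoutMillis) {
        List<String> received = new ArrayList<>();
        try {
            while (true) {
                // Like consumer.receive(timeout): null means the wait timed out.
                String message = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (message == null) {
                    break;
                }
                received.add(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    /** Starts a sender thread that offers five messages, then drains the queue on the caller's thread. */
    public static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread sender = new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                queue.offer("message " + i);
            }
        });
        sender.start();
        return drain(queue, 500);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With the 100 s timeout used in Receiver.java, the receiver keeps running for 100 seconds after the last message before it exits, which is why it can be started before the sender.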

3.3. Test results

Run Receiver.java first, then run Sender.java.
The result is shown in the figure below (both consoles print output like this):

4. Operating activeMQ with the jms plugin from jfinal-ext

This part integrates the quartz job scheduling framework to send a message to the queue every 10 seconds.
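For readers who have not used a scheduler before, the idea of sending on a fixed interval can be sketched with the JDK's `ScheduledExecutorService`. This is a stand-in technique, not quartz and not the jfinal-ext plugin; the class name `PeriodicSendSketch`, the method `sendPeriodically`, and the message text "tick" are hypothetical. The list stands in for the message queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicSendSketch {

    /** Records one message every periodMillis until `count` messages have been recorded. */
    public static List<String> sendPeriodically(int count, long periodMillis) {
        List<String> sent = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // The task runs on a single scheduler thread, so the size check is not racy.
        scheduler.scheduleAtFixedRate(() -> {
            if (sent.size() < count) {
                sent.add("tick " + (sent.size() + 1));
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await(); // block until the requested number of messages was recorded
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(sent);
    }

    public static void main(String[] args) {
        // The article sends every 10 seconds (10_000 ms); a short period keeps the demo quick.
        System.out.println(sendPeriodically(3, 100));
    }
}
```

Quartz adds cron expressions, job classes, and persistence on top of this basic fixed-interval pattern.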

4.1. Core code

<code class="language-java hljs  has-numbering">public static void main(String[] args) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
    // Start the jms plugin with the settings in jms.properties
    JmsPlugin jp = new JmsPlugin("jms.properties");
    jp.start();
    // Load the job settings
    PropertyConfig pc = PropertyConfig.me();
    pc.loadPropertyFile("job.properties");
    QuartzPlugin qp = new QuartzPlugin();
    // If job "a" is enabled, instantiate its class by name and register it with its cron expression
    if (pc.getPropertyToBoolean("a.enable")) {
        qp.add(pc.getProperty("a.cron"), (Job) Class.forName(pc.getProperty("a.job")).newInstance());
    }
    qp.start();
}
</code>
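The core code creates the job from a class name stored in a properties file. A minimal, JDK-only sketch of that reflection step follows; `Runnable` stands in for the quartz `Job` interface, and `ReflectiveJobSketch`, `HelloJob`, and `load` are hypothetical names introduced for illustration.

```java
public class ReflectiveJobSketch {

    /** Hypothetical stand-in for a job class; a real one would implement the quartz Job interface. */
    public static class HelloJob implements Runnable {
        @Override
        public void run() {
            System.out.println("HelloJob ran");
        }
    }

    /** Loads a class by its fully qualified name and creates an instance through its no-arg constructor. */
    public static Runnable load(String className) {
        try {
            return Class.forName(className)
                    .asSubclass(Runnable.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        // In the article the name comes from the a.job property; here it is taken from the class itself.
        Runnable job = load(HelloJob.class.getName());
        job.run(); // prints HelloJob ran
    }
}
```

The sketch uses `getDeclaredConstructor().newInstance()` because `Class.newInstance()` has been deprecated since Java 9; both require a no-argument constructor on the job class.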

4.2. Configuration file jms.properties

<code class="language-txt hljs coffeescript has-numbering">################################
#          server info         #
################################
# jms server address
serverUrl=tcp://localhost:61616
username=admin
password=admin

################################
#          queue info          #
################################
# names of the queues to send to, separated by ","
sendQueues=firstMQ

# names of the queues to receive from, separated by ","
receiveQueues=firstMQ
# message number of the message named a on queue firstMQ
queue.firstMQ.a=10000
# resolver invoked when the message named a is received on queue firstMQ
queue.firstMQ.a.resolver=com.mg.jfinal.ext.demo.resolver.MGResolver
</code>
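To show how a file in this format can be read, here is a sketch that loads the keys with `java.util.Properties` and splits the comma-separated queue lists. It illustrates the file format only and is not how the jfinal-ext plugin itself parses the file; `JmsPropertiesSketch`, `parse`, and `queueNames` are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class JmsPropertiesSketch {

    /** Parses properties-file text into a Properties object. */
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    /** Splits a comma-separated property such as sendQueues into trimmed queue names. */
    public static List<String> queueNames(Properties props, String key) {
        List<String> names = new ArrayList<>();
        for (String part : props.getProperty(key, "").split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String text = "serverUrl=tcp://localhost:61616\n"
                + "sendQueues=firstMQ\n"
                + "receiveQueues=firstMQ\n"
                + "queue.firstMQ.a=10000\n";
        Properties props = parse(text);
        System.out.println(props.getProperty("serverUrl"));  // prints tcp://localhost:61616
        System.out.println(queueNames(props, "sendQueues")); // prints [firstMQ]
    }
}
```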

4.3. Configuration file job.properties

<code class="language-txt hljs avrasm has-numbering">#JobA
a.job=com.mg.jfinal.task.JobA
a.cron=*/10 * * * * ?
a.enable=true
</code>
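In a quartz cron expression the first field is seconds, so `*/10` there means "every 10 seconds, starting at second 0". The sketch below expands just that one kind of field to make the firing times visible; it handles only `*` and `*/N` and is not a cron parser. `CronStepSketch` and `expandSeconds` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class CronStepSketch {

    /** Expands a seconds field of the form "*" or "*&#47;N" into the matching seconds of a minute. */
    public static List<Integer> expandSeconds(String field) {
        int step;
        if (field.equals("*")) {
            step = 1;
        } else if (field.startsWith("*/")) {
            step = Integer.parseInt(field.substring(2));
        } else {
            throw new IllegalArgumentException("Unsupported field: " + field);
        }
        if (step <= 0) {
            throw new IllegalArgumentException("Step must be positive: " + field);
        }
        List<Integer> seconds = new ArrayList<>();
        for (int s = 0; s < 60; s += step) {
            seconds.add(s);
        }
        return seconds;
    }

    public static void main(String[] args) {
        String cron = "*/10 * * * * ?";
        // The first space-separated field of the expression is the seconds field.
        System.out.println(expandSeconds(cron.split(" ")[0])); // prints [0, 10, 20, 30, 40, 50]
    }
}
```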

4.4. Run results

As shown in the figure:

5. Source code location
