Simple usage of ActiveMQ, a Java message queue
阿新 • Published: 2019-01-07
activeMQ is a project for learning how message queues are implemented in Java, built quickly with jfinal + jfinal-ext + activeMQ + quartz.
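1. Message queues
A message queue is essentially a service built on top of a queue data structure: producers append messages and consumers take them off in order. In Java, Apache's activeMQ, which implements the JMS API, is one of the mainstream implementations.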
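To make the "service built on a data structure" idea concrete, here is a minimal in-memory sketch. It is not how activeMQ works internally; the class name `InMemoryQueueDemo` and its methods are made up for illustration, and a real broker adds networking, persistence and acknowledgements on top of this.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy in-memory queue: illustrates FIFO hand-off only, not activeMQ itself. */
class InMemoryQueueDemo {
    // Hypothetical stand-in for a broker-side queue.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Producer side: append a message to the tail of the queue. */
    void send(String text) {
        queue.offer(text);
    }

    /** Consumer side: take the oldest message, or null if the queue is empty. */
    String receive() {
        return queue.poll();
    }

    public static void main(String[] args) {
        InMemoryQueueDemo demo = new InMemoryQueueDemo();
        demo.send("first");
        demo.send("second");
        System.out.println(demo.receive()); // the oldest message comes out first
        System.out.println(demo.receive());
    }
}
```

The producer and consumer never call each other directly; they only share the queue, which is the decoupling a message broker provides across processes.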
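2. Setting up the environment
First download the apache-activeMQ-...-.zip package from the Apache website, unzip it, and run the activeMQ service in the bin directory.
Open http://localhost:8186/admin in a browser and, when the login page appears, log in with admin/admin. (A stock activeMQ install serves the console on port 8161 unless the port has been changed in its configuration.)
Then create a queue named FirstQueue; the examples below use it.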
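3. Using activeMQ directly
Keep the activeMQ service running the whole time: both the sender and the receiver connect to the server over TCP to reach the messages in the queue.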
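Since both programs fail if the broker is not running, a quick TCP check can save some debugging. The sketch below uses only the JDK; `BrokerCheck` and `isReachable` are hypothetical names, and it only tests that something accepts connections on the port, not that it is actually activeMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

class BrokerCheck {
    /** Returns true if a TCP connection to the broker URL succeeds within timeoutMillis. */
    static boolean isReachable(String brokerUrl, int timeoutMillis) {
        // "tcp://localhost:61616" parses into host "localhost" and port 61616.
        URI uri = URI.create(brokerUrl);
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is listening there.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("broker reachable: " + isReachable("tcp://localhost:61616", 2000));
    }
}
```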
The screenshot below shows the cmd window of my server:
3.1. First, create the sender, Sender.java
```java
package com.mg.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {
        // ConnectionFactory: JMS uses it to create connections
        ConnectionFactory connectionFactory;
        // Connection: the connection from the JMS client to the JMS provider
        Connection connection = null;
        // Session: the context in which messages are sent or received
        Session session;
        // Destination: where the messages are sent
        Destination destination;
        // MessageProducer: the message sender
        MessageProducer producer;

        // Build the ConnectionFactory using the ActiveMQ implementation
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // Get a connection from the factory and start it
            connection = connectionFactory.createConnection();
            connection.start();
            // Transacted session: the acknowledge-mode argument is ignored
            // and messages are only delivered once commit() is called
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // "FirstQueue" is the queue created in the ActiveMQ console earlier
            destination = session.createQueue("FirstQueue");
            // Create the message producer (the sender)
            producer = session.createProducer(destination);
            // Non-persistent for this exercise; decide per project in practice
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // The messages are hard-coded here; a real project would take them
            // from parameters or another method
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection) {
                    connection.close();
                }
            } catch (Throwable ignore) {
                // nothing useful can be done if closing fails
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("ActiveMq message " + i);
            System.out.println("Sending message: ActiveMq message " + i);
            // Send the message to the destination
            producer.send(message);
        }
    }
}
```
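The sender creates a transacted session, so the five messages only reach the queue when `session.commit()` runs. The following is a simplified model of that behaviour, assuming nothing about activeMQ's internals; `TransactedBuffer` and its methods are hypothetical names used only for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a transacted producer; not activeMQ's implementation. */
class TransactedBuffer {
    private final List<String> pending = new ArrayList<>();   // sent but not yet committed
    private final List<String> delivered = new ArrayList<>(); // visible to consumers

    /** Buffers a message; consumers cannot see it yet. */
    void send(String text) {
        pending.add(text);
    }

    /** Publishes everything sent since the last commit or rollback. */
    void commit() {
        delivered.addAll(pending);
        pending.clear();
    }

    /** Discards everything sent since the last commit. */
    void rollback() {
        pending.clear();
    }

    /** Returns a copy of the messages consumers can currently see. */
    List<String> visibleMessages() {
        return new ArrayList<>(delivered);
    }
}
```

In this model, calling `send` five times and then forgetting `commit` leaves `visibleMessages()` empty, which mirrors what happens if `session.commit()` is removed from Sender.java.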
3.2. Next, create the receiver, Receiver.java
```java
package com.mg.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    public static void main(String[] args) {
        // ConnectionFactory: JMS uses it to create connections
        ConnectionFactory connectionFactory;
        // Connection: the connection from the JMS client to the JMS provider
        Connection connection = null;
        // Session: the context in which messages are sent or received
        Session session;
        // Destination: the queue the messages are read from
        Destination destination;
        // MessageConsumer: the message receiver
        MessageConsumer consumer;

        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // Get a connection from the factory and start it
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // "FirstQueue" is the queue created in the ActiveMQ console earlier
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            while (true) {
                // Wait up to 100 s for each message; long enough for testing
                Message message = consumer.receive(100000);
                if (null == message) {
                    // Timed out with nothing received: stop
                    break;
                }
                // Only text messages are expected; skip anything else
                if (message instanceof TextMessage) {
                    System.out.println("Received message: " + ((TextMessage) message).getText());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection) {
                    connection.close();
                }
            } catch (Throwable ignore) {
                // nothing useful can be done if closing fails
            }
        }
    }
}
```
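The receiver's loop relies on `receive(timeout)` returning null once the timeout expires with no message. The same pattern can be tried without a broker using the JDK's `BlockingQueue.poll(timeout, unit)`, which behaves analogously. This is only an illustration; `TimedReceiveLoop` and `drain` are names invented here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedReceiveLoop {
    /** Collects messages until none arrives within timeoutMillis. */
    static List<String> drain(BlockingQueue<String> queue, long timeoutMillis)
            throws InterruptedException {
        List<String> received = new ArrayList<>();
        while (true) {
            // Waits up to the timeout and returns null if it expires.
            String message = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                break;
            }
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("message 1");
        queue.add("message 2");
        System.out.println(drain(queue, 100));
    }
}
```

A long timeout, like the 100 s used in Receiver.java, keeps the consumer alive while the sender is started by hand; a short one makes the loop exit soon after the queue runs dry.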
3.3. Test results
Run the receiver, Receiver.java, first, then run Sender.java.
The result is shown in the figure below (both consoles print the data shown):
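4. Using the jms plugin from jfinal-ext to work with activeMQ
This part integrates the quartz job-scheduling framework to send a message to the queue every 10 seconds.
4.1. Core code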
```java
public static void main(String[] args)
        throws InstantiationException, IllegalAccessException, ClassNotFoundException {
    // Start the JMS plugin with the settings in jms.properties
    JmsPlugin jp = new JmsPlugin("jms.properties");
    jp.start();

    // Load the job definitions
    PropertyConfig pc = PropertyConfig.me();
    pc.loadPropertyFile("job.properties");

    QuartzPlugin qp = new QuartzPlugin();
    // Register job "a" only when a.enable is true
    if (pc.getPropertyToBoolean("a.enable")) {
        qp.add(pc.getProperty("a.cron"),
                (Job) Class.forName(pc.getProperty("a.job")).newInstance());
    }
    qp.start();
}
```
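The core code picks the job class by name from a properties file and instantiates it reflectively. The sketch below shows that pattern with the JDK alone, under a few assumptions: `Runnable` stands in for quartz's `Job` interface, and `HelloJob` and `JobLoader` are hypothetical classes that are not part of jfinal-ext.

```java
import java.util.Properties;

/** Hypothetical job used only for this sketch; the article's JobA is a quartz Job. */
class HelloJob implements Runnable {
    @Override
    public void run() {
        System.out.println("job executed");
    }
}

class JobLoader {
    /** Returns the job configured under prefix, or null when prefix.enable is not "true". */
    static Runnable load(Properties props, String prefix) throws ReflectiveOperationException {
        // parseBoolean treats a missing property as false.
        if (!Boolean.parseBoolean(props.getProperty(prefix + ".enable"))) {
            return null;
        }
        String className = props.getProperty(prefix + ".job");
        // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance().
        return (Runnable) Class.forName(className).getDeclaredConstructor().newInstance();
    }
}
```

Keeping the class name in configuration means a job can be swapped or disabled by editing job.properties, without recompiling the launcher.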
4.2. Configuration file jms.properties
```properties
################################
# server info #
################################
# JMS server address
serverUrl=tcp://localhost:61616
username=admin
password=admin
################################
# queue info #
################################
# Names of the queues to send to, separated by ","
sendQueues=firstMQ
# Names of the queues to receive from, separated by ","
receiveQueues=firstMQ
# Message number of the message named a on queue firstMQ
queue.firstMQ.a=10000
# Resolver invoked when the message named a is received on queue firstMQ
queue.firstMQ.a.resolver=com.mg.jfinal.ext.demo.resolver.MGResolver
```
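To show how a file in this shape can be read, here is a small sketch using `java.util.Properties`. It is not the parsing code of the jfinal-ext JmsPlugin, which is not shown in this article; `JmsConfigReader` and its methods are hypothetical, and the key layout is simply taken from the file above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

class JmsConfigReader {
    /** Parses properties-format text (the same syntax as jms.properties). */
    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    /** Splits a comma-separated queue list such as the value of sendQueues. */
    static List<String> queues(Properties props, String key) {
        String value = props.getProperty(key, "").trim();
        if (value.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(value.split("\\s*,\\s*"));
    }

    /** Looks up queue.<queue>.<message>.resolver, the handler class for that message. */
    static String resolver(Properties props, String queue, String message) {
        return props.getProperty("queue." + queue + "." + message + ".resolver");
    }
}
```

With the file above, `queues(props, "sendQueues")` would yield a single entry, `firstMQ`, and `resolver(props, "firstMQ", "a")` would return the MGResolver class name.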
4.3. Configuration file job.properties
```properties
#JobA
a.job=com.mg.jfinal.task.JobA
a.cron=*/10 * * * * ?
a.enable=true
```
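In a quartz cron expression the first field is seconds, so `*/10` there means "every 10 seconds, starting at second 0". The sketch below expands just that one field to show which seconds of each minute the job fires on. It handles only the simple forms `*`, `N` and `START/STEP`, and `CronSecondsField` is a hypothetical helper, not part of quartz.

```java
import java.util.ArrayList;
import java.util.List;

class CronSecondsField {
    /** Expands a seconds field of the form "*", "N" or "START/STEP" ("*" as start means 0). */
    static List<Integer> expand(String field) {
        List<Integer> seconds = new ArrayList<>();
        if (field.contains("/")) {
            String[] parts = field.split("/");
            int start = parts[0].equals("*") ? 0 : Integer.parseInt(parts[0]);
            int step = Integer.parseInt(parts[1]);
            if (step <= 0) {
                throw new IllegalArgumentException("step must be positive: " + field);
            }
            for (int s = start; s < 60; s += step) {
                seconds.add(s);
            }
        } else if (field.equals("*")) {
            for (int s = 0; s < 60; s++) {
                seconds.add(s);
            }
        } else {
            seconds.add(Integer.parseInt(field));
        }
        return seconds;
    }

    public static void main(String[] args) {
        String cron = "*/10 * * * * ?";
        // The first space-separated field is the seconds field.
        System.out.println(expand(cron.split(" ")[0])); // prints [0, 10, 20, 30, 40, 50]
    }
}
```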
4.4. Results
As shown in the figure: