1. 程式人生 > >通過JMS監聽Oracle AQ,在資料苦表變化時觸發並執行Java程式

通過JMS監聽Oracle AQ,在資料苦表變化時觸發並執行Java程式

環境說明

本實驗環境基於Oracle 12C和JDK1.8,其中Oracle 12C支援多租戶特性,相較於之前的Oracle版本,使用‘C##使用者名稱‘表示使用者,例如如果資料庫使用者叫kevin,則登陸時使用C##kevin進行登陸。

一、Oracle高階訊息佇列AQ

Oracle AQ是Oracle中的訊息佇列,是Oracle中的一種高階應用,每個版本都在不斷的加強,使用DBMS_AQ系統包進行相應的操作,是Oracle的預設元件,只要安裝了Oracle資料庫就可以使用。使用AQ可以在多個Oracle資料庫、Oracle與Java、C等系統中進行資料傳輸。

下面分步驟說明如何建立Oracle AQ

1. 建立訊息負荷payload

Oracle AQ中傳遞的訊息被稱為有效負荷(payloads),格式可以是使用者自定義物件或XMLType或ANYDATA。本例中我們建立一個簡單的物件型別用於傳遞訊息。

create type demo_queue_payload_type as object (message varchar2(4000));

2. 建立隊列表

隊列表用於儲存訊息,在入隊時自動存入表中,出隊時自動刪除。使用DBMS_AQADM包進行資料表的建立,只需要寫表名,同時設定相應的屬性。對於佇列需要設定multiple_consumers為false,如果使用釋出/訂閱模式需要設定為true。

begin
  dbms_aqadm.create_queue_table(
    queue_table   => 'demo_queue_table',
    queue_payload_type => 'demo_queue_payload_type',
    multiple_consumers => false
  );
end;

執行完後可以檢視oracle表中自動生成了demo_queue_table表,可以檢視影響子段(含義比較清晰)。

3. 建立佇列並啟動

建立佇列並啟動佇列:

begin
  dbms_aqadm.create_queue (
    queue_name  => 'demo_queue',
    queue_table => 'demo_queue_table'
  );

  dbms_aqadm.start_queue(
    queue_name  =>  'demo_queue'
  );
end;

至此,我們已經建立了佇列有效負荷,隊列表和佇列。可以檢視以下系統建立了哪些相關的物件:

SELECT object_name, object_type FROM user_objects WHERE object_name != 'DEMO_QUEUE_PAYLOAD_TYPE';

OBJECT_NAME OBJECT_TYPE
------------------------------ ---------------
DEMO_QUEUE_TABLE TABLE
SYS_C009392 INDEX
SYS_LOB0000060502C00030$$ LOB
AQ$_DEMO_QUEUE_TABLE_T INDEX
AQ$_DEMO_QUEUE_TABLE_I INDEX
AQ$_DEMO_QUEUE_TABLE_E QUEUE
AQ$DEMO_QUEUE_TABLE VIEW
DEMO_QUEUE QUEUE

我們看到一個佇列帶出了一系列自動生成物件,有些是被後面直接用到的。不過有趣的是,建立了第二個佇列。這就是所謂的異常佇列(exception queue)。如果AQ無法從我們的佇列接收訊息,將記錄在該異常佇列中。

訊息多次處理出錯等情況會自動轉移到異常的佇列,對於異常佇列如何處理目前筆者還沒有找到相應的寫法,因為我使用的場景並不要求訊息必須一對一的被處理,只要起到通知的作用即可。所以如果訊息轉移到異常佇列,可以執行清空隊列表中的資料

delete from demo_queue_table;

4. 佇列的停止和刪除

如果需要刪除或重建可以使用下面的方法進行操作:

BEGIN
   DBMS_AQADM.STOP_QUEUE(
      queue_name => 'demo_queue'
      );
   DBMS_AQADM.DROP_QUEUE(
      queue_name => 'demo_queue'
      );
   DBMS_AQADM.DROP_QUEUE_TABLE(
      queue_table => 'demo_queue_table'
      );
END;

5. 入隊訊息

入列操作是一個基本的事務操作(就像往隊列表Insert),因此我們需要提交。

declare
  r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle RAW(16);
  o_payload demo_queue_payload_type;
begin
  o_payload := demo_queue_payload_type('what is you name ?');

  dbms_aq.enqueue(
    queue_name  => 'demo_queue',
    enqueue_options => r_enqueue_options,
    message_properties => r_message_properties,
    payload => o_payload,
    msgid => v_message_handle
  );

  commit;
end;

通過SQL語句檢視訊息是否正常入隊:

select * from aq$demo_queue_table;
select user_data from aq$demo_queue_table;

6. 出隊訊息

使用Oracle進行出隊操作

declare
  r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle RAW(16);
  o_payload demo_queue_payload_type;
begin
  DBMS_AQ.DEQUEUE(
    queue_name => 'demo_queue',
    dequeue_options => r_dequeue_options,
    message_properties => r_message_properties,
    payload => o_payload,
    msgid => v_message_handle
  );


  DBMS_OUTPUT.PUT_LINE(
    '***** Browse message is [' || o_payload.message || ']****'
  );

end;

二、Java使用JMS監聽並處理Oracle AQ佇列

Java使用JMS進行相應的處理,需要使用Oracle提供的jar,在Oracle安裝目錄可以找到:在linux中可以使用find命令進行查詢,例如

find `pwd` -name 'jmscommon.jar'

需要的jar為:

  • app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/jmscommon.jar
  • app/oracle/product/12.1.0/dbhome_1/jdbc/lib/ojdbc7.jar
  • app/oracle/product/12.1.0/dbhome_1/jlib/orai18n.jar
  • app/oracle/product/12.1.0/dbhome_1/jlib/jta.jar
  • app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/aqapi_g.jar

1. 建立連線引數類

實際使用時可以把引數資訊配置在properties檔案中,使用Spring進行注入。

package org.kevin.jms;
/**
 * 
 *  連線引數資訊
 *
 */
public class JmsConfig {

    public String username = "c##kevin";
    public String password = "a111111111";
    public String jdbcUrl = "jdbc:oracle:thin:@127.0.0.1:1521:orcl";
    public String queueName = "demo_queue";
}

2. 建立訊息轉換類

因為訊息載荷是Oracle資料型別,需要提供一個轉換工廠類將Oracle型別轉換為Java型別。

package org.kevin.jms;

import java.sql.SQLException;

import oracle.jdbc.driver.OracleConnection;
import oracle.jdbc.internal.OracleTypes;
import oracle.jpub.runtime.MutableStruct;
import oracle.sql.CustomDatum;
import oracle.sql.CustomDatumFactory;
import oracle.sql.Datum;
import oracle.sql.STRUCT;

/**
 * 
 * 資料型別轉換類
 *
 */
@SuppressWarnings("deprecation")
public class QUEUE_MESSAGE_TYPE implements CustomDatum, CustomDatumFactory {
    public static final String _SQL_NAME = "QUEUE_MESSAGE_TYPE";
    public static final int _SQL_TYPECODE = OracleTypes.STRUCT;

    MutableStruct _struct;
    // 12表示字串
    static int[] _sqlType = { 12 };
    static CustomDatumFactory[] _factory = new CustomDatumFactory[1];
    static final QUEUE_MESSAGE_TYPE _MessageFactory = new QUEUE_MESSAGE_TYPE();

    public static CustomDatumFactory getFactory() {
        return _MessageFactory;
    }

    public QUEUE_MESSAGE_TYPE() {
        _struct = new MutableStruct(new Object[1], _sqlType, _factory);
    }

    public Datum toDatum(OracleConnection c) throws SQLException {
        return _struct.toDatum(c, _SQL_NAME);
    }

    public CustomDatum create(Datum d, int sqlType) throws SQLException {
        if (d == null)
            return null;
        QUEUE_MESSAGE_TYPE o = new QUEUE_MESSAGE_TYPE();
        o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
        return o;
    }

    public String getContent() throws SQLException {
        return (String) _struct.getAttribute(0);
    }

}

3. 主類進行訊息處理

package org.kevin.jms;

import java.util.Properties;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;

import oracle.jms.AQjmsAdtMessage;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;

/**
 * 
 * 訊息處理類
 *
 */
public class Main {

    public static void main(String[] args) throws Exception {
        JmsConfig config = new JmsConfig();

        QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(config.jdbcUrl,
                new Properties());

        QueueConnection conn = queueConnectionFactory.createQueueConnection(config.username, config.password);
        AQjmsSession session = (AQjmsSession) conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

        conn.start();

        Queue queue = (AQjmsDestination) session.getQueue(config.username, config.queueName);
        MessageConsumer consumer = session.createConsumer(queue, null, QUEUE_MESSAGE_TYPE.getFactory(), null, false);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                System.out.println("ok");

                AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;

                try {
                    QUEUE_MESSAGE_TYPE payload = (QUEUE_MESSAGE_TYPE) adtMessage.getAdtPayload();
                    System.out.println(payload.getContent());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        Thread.sleep(1000000);
    }

}

使用Oracle程式塊進行入隊操作,在沒有啟動Java時看到隊列表中存在資料。啟動Java後,控制檯正確的輸出的訊息;通過Oracle程式塊再次寫入訊息,發現控制檯正確處理訊息。Java的JMS監聽不是立刻進行處理,可能存在幾秒中的時間差,時間不等。

三、監控表記錄變化通知Java

下面的例子建立一個數據表,然後在表中新增觸發器,當資料變化後觸發器呼叫儲存過程給Oracle AQ傳送訊息,然後使用Java JMS對訊息進行處理。

1. 建立表

建立student表,包含username和age兩個子段,其中username時varchar2型別,age時number型別。

2. 建立儲存過程

建立send_aq_msg儲存過程,因為儲存過程中呼叫dbms資料包,系統包在儲存過程中執行需要進行授權(使用sys使用者進行授權):

grant execute on dbms_aq to c##kevin;
  • 1

注意儲存過程中包含commit語句。

create or replace 
PROCEDURE send_aq_msg (info IN VARCHAR2) as
  r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle RAW(16);
  o_payload demo_queue_payload_type;
begin
  o_payload := demo_queue_payload_type(info);

  dbms_aq.enqueue(
    queue_name  => 'demo_queue',
    enqueue_options => r_enqueue_options,
    message_properties => r_message_properties,
    payload => o_payload,
    msgid => v_message_handle
  );

  commit;
end send_aq_msg;

3. 建立觸發器

在student表中建立觸發器,當資料寫入或更新時,如果age=18,則進行入隊操作。需要呼叫儲存過程傳送訊息,但觸發器中不能包含事物提交語句,因此需要使用pragma autonomous_transaction;宣告自由事物:

CREATE OR REPLACE TRIGGER STUDENT_TR 
AFTER INSERT OR UPDATE OF AGE ON STUDENT FOR EACH ROW 
DECLARE
pragma autonomous_transaction;
BEGIN
  if :new.age = 18 then
      send_aq_msg(:new.username);  
  end if;  
END;

相關推薦

通過JMSOracle AQ資料變化觸發執行Java程式

環境說明 本實驗環境基於Oracle 12C和JDK1.8,其中Oracle 12C支援多租戶特性,相較於之前的Oracle版本,使用‘C##使用者名稱‘表示使用者,例如如果資料庫使用者叫kevin,則登陸時使用C##kevin進行登陸。 一、Oracle高階訊息

JMSOracle AQ

- 該文件中,jdk版本1.8,java專案為maven構建的springboot專案,並使用了定時任務來做AQ監聽的重連功能,解決由於外部原因導致連線斷裂之後,需要手動重啟專案才能恢復連線的問題 - [github原始碼位置](https://github.com/wangqq1217/oracleAQ-

oracle 百萬條資料 update所有記錄的 sql 執行效率問題

需求: 有一張臨時表 , 資料總數100w條, 其中 50w條 , state = 1 50w條 , state = 0 因為資料無用 ,

vue中通過watch資料變化帶來的效能優化

問題背景 為什麼要用 vuex? 在使用 Vue 進行元件化開發時,元件通訊是一個十分重要的部分。在 Vue 中,父子元件的關係可以總結為 父子元件通訊:父元件通過 props 向下傳遞資料給子元件 子父元件通訊:子元件通過 events 給父元件傳送訊息 使

Oracle資料庫非常慢基本hang住故障處理

測試人員郵件反饋: 訂購資料庫的連線非常慢,甚至是無法連線,想要我檢檢視看。 經檢視: [email protected]:~/app/admin/wdadb/adump> lsnrctl status LSNRCTL for Linux: Version

Android 通過註冊廣播實時網路連線與斷開狀態的變化

很多時候我們都需要實時監聽網路狀態,當網路狀態發生變化之後立即通知程式進行不同的操作。 監聽廣播的兩種方式: (1)在AndroidManifest.xml配置檔案中宣告 <receiver android:name=".NetworkConn

實戰Android:通過BroadcastReceiverHome電源Power和音量變化Volume鍵

上一個例子是採用AccessibilityService來實現按鍵的監聽。這次我們採用BroadcastReceiver來完成按鍵的監聽。 缺點:我嘗試了一下,暫時還不知道如何停止按 鍵的預設行為,比如我確實監聽到了電源按鍵,但卻沒法阻止此刻螢幕變黑的行為。先在這記下。以後

電話狀態的。響鈴靜止接起

tel mis one iss cal list ack res extend package com.sharpcj.telephonestatelistenerdemo; import android.content.Context; import android.

觀察者模式實際應用:線程意外退出線程後自動重啟

lee text 實時 之間 最終 ren tap instance and 摘要:  觀察者模式,定義對象之間的一種一對多的依賴關系,當對象的狀態發生改變時,所有依賴於它的對象都得到通知並且被自動更新。觀察者模式在JDK中有現成的實現,java.util.Obsera

ionic 頁面滾動點擊停止滾動

ret 有一個 state start lin 開始 sta 點擊 需要 類似今日頭條,頁面上有很多card,點擊每個card跳轉該card的詳情頁面。這裏有一個問題,當我滾動頁面時,會先後觸發touchstart、touchmove、touchend,但是當touchen

(二)僅僅通過Application用戶行為及App的在線狀態和在線

活躍 and rem HR 再看 andro void put 初始   先要實現功能,還是先從API去找。看看有沒有你想要的。這裏其實就是監聽App內activity的狀態。怎麽辦?   給個API所在地址:http://www.android-doc.com/refer

瀏覽器返回pushStatepopstate 事件window.history對象

當前 rep www 簡單介紹 itl 成了 stat 直接 clas 在WebApp或瀏覽器中,會有點擊返回、後退、上一頁等按鈕實現自己的關閉頁面、調整到指定頁面、確認離開頁面或執行一些其它操作的需求。可以使用 popstate 事件進行監聽返回、後退、上一頁操作。 一、

list-server資源修改自動重新整理瀏覽器

名稱:list-server 參考地址: http://blog.csdn.net/alabadazi/article/details/53334161 作用:輕量級的僅適用於開發 的 node 伺服器, 它僅支援 web app, 它能夠為你開啟瀏覽器, 當你的html或是Java

數據庫安全檢查是重點設置密碼

quit proto change rod 方法 1.2 uri tro oca Oracle 數據庫監聽的安全管理是比較容易忽略的一個問題,做一個測試禁用監聽的本地驗證功能,設置監聽密碼,數據庫版本為11.2.0.4 1、默認配置listener.ora LISTENER

Nginx同時IPV6+IPV4實現正向和反向

nginx.conf 配置如下 user root; worker_processes 8; #error_log /opt/server/department/nginx/logs/error.log warn; error_log /opt/server/department

vue 滑動事件是否滑動到dom元素的區域

//監聽滾動事件      window.addEventListener('scroll',that.handleScroll) //垂直滾動的值相容問題 let scrollTopE = window.pageYOffset || document.documentElement.scrollTop |

ReactNative Android返回鍵在某個頁面返回鍵退出應用

在之前專案中,在進行返回鍵退出應用時,應用的程式碼如下: componentWillMount() { if (Platform.OS === 'android') { BackHandler.addEventListener('hardwa

ionic 頁面滾動點選停止滾動

原文出處:https://www.cnblogs.com/lee-xiumei/p/7449021.html 類似今日頭條,頁面上有很多card,點選每個card跳轉該card的詳情頁面。這裡有一個問題,當我滾動頁面時,會先後觸發touchstart、touchmove、touchend,但是當

Python pygame事件使用者事件pygame.event.get()

demo.py(事件,監聽事件): import pygame pygame.init() # 初始化所有pygame模組 # 建立遊戲主視窗 480 * 700 screen = pygame.display.set_mode((480, 700)) # 建立

nginx相同埠根據域名請求不同的server

    #同時監聽相同埠,可以通過匹配server_name 來決定最終匹配哪個server     #server1     server {         listen       80;         #server_name  localhost;