之前通过java代码去发送这个变化,这个方案的潜在问题是java代码可能阻塞导致表的数据更新变慢。
当然也可以把变化通过 oracle java 代码发送到我们的队列中间件,但是数据库既然有,就用现成的。

获取相关权限

grant execute on DBMS_AQ to sia;
GRANT EXECUTE ANY PROCEDURE TO sia;
GRANT aq_administrator_role TO sia;
GRANT aq_user_role TO sia;
GRANT EXECUTE ON dbms_aqadm TO sia;
GRANT EXECUTE ON dbms_aq TO sia;
GRANT EXECUTE ON dbms_aqin TO sia;

创建类型

create type acdm_upload_queue_payload_type as object (
msgtype varchar2(20),
message varchar2(4000)
);

创建队列表

begin
  dbms_aqadm.create_queue_table(
    queue_table   => 'acdm_upload_queue_table',
    queue_payload_type => 'acdm_upload_queue_payload_type',
    multiple_consumers => false
  );
end;

创建队列

begin
  dbms_aqadm.create_queue (
    queue_name  => 'ntfm_acdm_queue',
    queue_table => 'acdm_upload_queue_table'
  );

  dbms_aqadm.start_queue(
    queue_name  =>  'ntfm_acdm_queue'
  );
end;

创建存储过程

create or replace procedure aq_send_msg(queueName in varchar2,msgType in varchar2,msg in varchar2) is
  r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle RAW(16);
  o_payload acdm_upload_queue_payload_type;

begin
   o_payload := acdm_upload_queue_payload_type(msgType,msg);

  dbms_aq.enqueue(
    queue_name  => queueName,
    enqueue_options => r_enqueue_options,
    message_properties => r_message_properties,
    payload => o_payload,
    msgid => v_message_handle
  );
end aq_send_msg;

创建触发器

如果只监控指定的列的数据的变化,可以列出要监控的列

CREATE OR REPLACE TRIGGER TRI_ACDM_TD_FLIGHT
    AFTER INSERT OR UPDATE OF PLEG_REGNO, PLEG_FLTNO, PLEG_PLANE_TYPE, PLEG_CDM_WAKE, PLEG_VIP, PLEG_TYPE, PLEG_TM_PTD, PLEG_CDM_TOBT, PLEG_PARK_ARR, PLEG_TM_ATA, PLEG_BAG_TRANS_BELT_ARR, PLEG_CDM_AOBT, PLEG_PARK_DEP, PLEG_BRD_GATE, PLEG_TM_ATD ON AOC.TD_PLAN_LEG
    FOR EACH ROW
DECLARE
    -- local variables here
BEGIN
    -- 发送空管局

    IF (:NEW.PLEG_CDM_TOBT != :OLD.PLEG_CDM_TOBT) THEN

        AQ_SEND_MSG('ntfm_acdm_queue',
                    'FCDM',
                    :NEW.PLEG_ID || ',' || :NEW.PLEG_AP_THR_DEP);

    END IF;

    AQ_SEND_MSG('ntfm_acdm_queue',
                'FLI',
                :NEW.PLEG_ID || ',' || :NEW.PLEG_AP_THR_DEP);
    -- 发送民航局
    -- aq_send_msg('osccaac_acdm_queue','FLI',:new.pleg_id||','|| :new.pleg_ap_thr_dep);
END TRI_ACDM_TD_FLIGHT;

清空队列

--- 清空队列


-- purge queue
DECLARE
    po_t dbms_aqadm.aq$_purge_options_t;
BEGIN
    dbms_aqadm.purge_queue_table('acdm_upload_queue_table', NULL, po_t);
END;

引入依赖

这几个依赖去oracle 安装目录下找

        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>jmscommon</artifactId>
            <version>1.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/aqlib/jmscommon.jar</systemPath>
        </dependency>


        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>aqapi</artifactId>
            <version>1.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/aqlib/aqapi_g.jar</systemPath>
        </dependency>


        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>jta</artifactId>
            <version>1.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/aqlib/jta.jar</systemPath>
        </dependency>

数据类型转换类


public class OracleQueueMsgType implements CustomDatum, CustomDatumFactory {
    public static final String _SQL_NAME = "OracleQueueMsgType";
    public static final int _SQL_TYPECODE = OracleTypes.STRUCT;

    MutableStruct _struct;
    // 12表示字符串
    static int[] _sqlType = { 12,12 };
    static CustomDatumFactory[] _factory = new CustomDatumFactory[2];
    static final OracleQueueMsgType _MessageFactory = new OracleQueueMsgType();

    public static CustomDatumFactory getFactory() {
        return _MessageFactory;
    }

    public OracleQueueMsgType() {
        _struct = new MutableStruct(new Object[2], _sqlType, _factory);
    }

    public Datum toDatum(OracleConnection c) throws SQLException {
        return _struct.toDatum(c, _SQL_NAME);
    }

    public CustomDatum create(Datum d, int sqlType) throws SQLException {
        if (d == null)
            return null;
        OracleQueueMsgType o = new OracleQueueMsgType();
        o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
        return o;
    }

    public AqMsg getContent() throws SQLException {
        String msgType = (String) _struct.getAttribute(0);
        String msg = (String) _struct.getAttribute(1);
        return new AqMsg(msgType,msg);
    }

}

Java 数据类型

@Data
@ToString
public class AqMsg {

    public AqMsg(String msgType, String msg) {
        this.msgType = msgType;
        this.msg = msg;
    }

    private String msgType;
    private String msg;
}

消息监听

public class OracleMessageListener implements MessageListener {


    @Resource
    FplnServiceImpl fplnService;

    @Resource
    FpdiServiceImpl fpdiService;

    @Resource
    FpaiServiceImpl fpaiService;

    @Override
    public void onMessage(Message message) {
        System.out.println("ok");

        AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;

        try {
            OracleQueueMsgType payload = (OracleQueueMsgType) adtMessage.getAdtPayload();
            AqMsg aqMsg = payload.getContent();
            if (null == aqMsg) {
                return;
            }

            if (AqMsgTypeEnum.AMB.name().equals(aqMsg.getMsgType())) {
                log.debug("收到航班动态数据变更通知 ambid={}",aqMsg.getMsg());
                fplnService.processAmb(Long.valueOf(aqMsg.getMsg()));
            } else if (AqMsgTypeEnum.FLI.name().equals(aqMsg.getMsgType())) {
                log.debug("收到航班属性数据的更新通知 msg={}",aqMsg);
                processFlight(aqMsg.getMsg());

            } else if (AqMsgTypeEnum.GUA.name().equals(aqMsg.getMsgType())) {
                log.debug("收到航班保障数据的更新通知 msg={}",aqMsg);
                processFlight(aqMsg.getMsg());
            } else if (AqMsgTypeEnum.FCDM.name().equals(aqMsg.getMsgType())) {
                log.debug("收到航班保障数据的更新通知 msg={}",aqMsg);
                fpdiService.processFcdmById(Long.valueOf(aqMsg.getMsg().split(",")[0]));
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void processFlight(String msg) {
        String[] msgstr = msg.split(",");
        if (AIRPORT_CODE.equals(msgstr[1])) {
            fpdiService.processById(Long.valueOf(msgstr[0]));
        }else{
            fpaiService.processById(Long.valueOf(msgstr[0]));
        }
    }
}

oracle aq 断线重连

上述配置如果数据库因故障恢复后,代码无法自动重新连接需要重启应用程序

自定义消息鉴监听容器

/**
 * 可自动重连的消息监听容器
 */
public class CaacAqMessageListenerContainer extends DefaultMessageListenerContainer {

    @Override
    protected MessageConsumer createConsumer(Session session,
                                             Destination destination) throws JMSException {

        return ((AQjmsSession) session).createConsumer(destination,
                getMessageSelector(),
                new OracleQueueMsgType(), null,
                isPubSubNoLocal());
    }

}

重载 createConsumer 方法,使用aq的api创建consumer。因为我们发送队列的是一个二维数据的数据结构,必须指定 OracleQueueMsgType

设置监听配置


@Value("${spring.datasource.url}")
    private String jdbcUrl;
    @Value("${spring.datasource.username}")
    private String username;
    @Value("${spring.datasource.password}")
    private String password;
    private final String queueName = "osccaac_acdm_queue";

    @Resource
    private OracleMessageListener oracleMessageListener;

    @Bean
    public CaacAqMessageListenerContainer aqListenerContainer() throws JMSException {
        CaacAqMessageListenerContainer listenerContainer = new CaacAqMessageListenerContainer();
        AQjmsConnectionFactory connectionFactory = new AQjmsConnectionFactory();
        connectionFactory.setJdbcURL(jdbcUrl);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setMessageListener(oracleMessageListener);
        listenerContainer.setConcurrentConsumers(10);
        listenerContainer.setDestinationName(queueName);
        return listenerContainer;
    }

这样即使数据库故障后,我们也能自动去重连不需要人工干预,提升了程序可用性



数据库      oracle aq

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!