之前通过java代码去发送这个变化,这个方案的潜在问题是java代码可能阻塞导致表的数据更新变慢。
当然也可以在 Oracle 内部通过 Java 代码把变化发送到我们自己的队列中间件,但既然数据库自带消息队列(Oracle AQ),就直接使用现成的。
获取相关权限
-- Privileges granted to schema SIA for working with Oracle Advanced Queuing.
-- AQ roles:
GRANT aq_administrator_role TO sia;
GRANT aq_user_role TO sia;
-- Direct EXECUTE on the AQ packages. DBMS_AQ is granted exactly once here
-- (a duplicate grant of the same privilege was removed).
GRANT EXECUTE ON dbms_aqadm TO sia;
GRANT EXECUTE ON dbms_aq TO sia;
GRANT EXECUTE ON dbms_aqin TO sia;
-- NOTE(review): EXECUTE ANY PROCEDURE is a very broad system privilege.
-- It is kept to preserve existing behavior; confirm whether the package-level
-- grants above are sufficient and drop this one if so.
GRANT EXECUTE ANY PROCEDURE TO sia;
创建类型
-- Payload object carried by each queue message: a short type tag plus a text body.
CREATE TYPE acdm_upload_queue_payload_type AS OBJECT (
    msgtype VARCHAR2(20),   -- message kind tag (the trigger in this script sends 'FLI' and 'FCDM')
    message VARCHAR2(4000)  -- message body
);
创建队列表
-- Create the queue table that stores messages of type acdm_upload_queue_payload_type.
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => 'acdm_upload_queue_table',
        queue_payload_type => 'acdm_upload_queue_payload_type',
        multiple_consumers => FALSE  -- queues in this table are single-consumer
    );
END;
创建队列
-- Create the queue inside acdm_upload_queue_table and start it right away.
DECLARE
    c_queue_name CONSTANT VARCHAR2(30) := 'ntfm_acdm_queue';
BEGIN
    DBMS_AQADM.CREATE_QUEUE(
        queue_name  => c_queue_name,
        queue_table => 'acdm_upload_queue_table'
    );
    DBMS_AQADM.START_QUEUE(queue_name => c_queue_name);
END;
创建存储过程
-- Enqueue one message on the given AQ queue.
--   queueName : name of the target queue
--   msgType   : value stored in the payload's msgtype attribute
--   msg       : value stored in the payload's message attribute
-- Default enqueue options and message properties are used.
-- The procedure itself does not issue a COMMIT.
CREATE OR REPLACE PROCEDURE aq_send_msg(
    queueName IN VARCHAR2,
    msgType   IN VARCHAR2,
    msg       IN VARCHAR2
) IS
    l_enqueue_opts DBMS_AQ.ENQUEUE_OPTIONS_T;
    l_msg_props    DBMS_AQ.MESSAGE_PROPERTIES_T;
    l_msg_id       RAW(16);  -- receives the id assigned to the enqueued message (not used further)
    l_payload      acdm_upload_queue_payload_type :=
                       acdm_upload_queue_payload_type(msgType, msg);
BEGIN
    DBMS_AQ.ENQUEUE(
        queue_name         => queueName,
        enqueue_options    => l_enqueue_opts,
        message_properties => l_msg_props,
        payload            => l_payload,
        msgid              => l_msg_id
    );
END aq_send_msg;
创建触发器
如果只监控指定的列的数据的变化,可以列出要监控的列
-- Row-level trigger on AOC.TD_PLAN_LEG: after an INSERT, or an UPDATE of any of the
-- listed columns, it enqueues change notifications on 'ntfm_acdm_queue'.
-- The message body is "<PLEG_ID>,<PLEG_AP_THR_DEP>".
--   * 'FCDM' is sent only when PLEG_CDM_TOBT changed.
--   * 'FLI'  is sent on every firing.
CREATE OR REPLACE TRIGGER TRI_ACDM_TD_FLIGHT
    AFTER INSERT OR UPDATE OF
        PLEG_REGNO, PLEG_FLTNO, PLEG_PLANE_TYPE, PLEG_CDM_WAKE, PLEG_VIP,
        PLEG_TYPE, PLEG_TM_PTD, PLEG_CDM_TOBT, PLEG_PARK_ARR, PLEG_TM_ATA,
        PLEG_BAG_TRANS_BELT_ARR, PLEG_CDM_AOBT, PLEG_PARK_DEP, PLEG_BRD_GATE,
        PLEG_TM_ATD
    ON AOC.TD_PLAN_LEG
    FOR EACH ROW
BEGIN
    -- Notify the air traffic management bureau.
    -- NULL-safe change detection: a plain "!=" evaluates to NULL (treated as false)
    -- when either side is NULL, which would miss INSERTs (:OLD is all NULL) and
    -- any change between NULL and a real value.
    IF (:NEW.PLEG_CDM_TOBT IS NULL AND :OLD.PLEG_CDM_TOBT IS NOT NULL)
       OR (:NEW.PLEG_CDM_TOBT IS NOT NULL AND :OLD.PLEG_CDM_TOBT IS NULL)
       OR (:NEW.PLEG_CDM_TOBT != :OLD.PLEG_CDM_TOBT) THEN
        AQ_SEND_MSG('ntfm_acdm_queue',
                    'FCDM',
                    :NEW.PLEG_ID || ',' || :NEW.PLEG_AP_THR_DEP);
    END IF;

    AQ_SEND_MSG('ntfm_acdm_queue',
                'FLI',
                :NEW.PLEG_ID || ',' || :NEW.PLEG_AP_THR_DEP);

    -- Notify the civil aviation administration (currently disabled):
    -- aq_send_msg('osccaac_acdm_queue','FLI',:new.pleg_id||','|| :new.pleg_ap_thr_dep);
END TRI_ACDM_TD_FLIGHT;
清空队列
-- Purge the queue table. No purge condition is given (NULL) and the purge
-- options record is passed with its default values.
DECLARE
    l_purge_opts DBMS_AQADM.AQ$_PURGE_OPTIONS_T;
BEGIN
    DBMS_AQADM.PURGE_QUEUE_TABLE(
        queue_table     => 'acdm_upload_queue_table',
        purge_condition => NULL,
        purge_options   => l_purge_opts
    );
END;
引入依赖
这几个依赖去oracle 安装目录下找
<!-- Oracle AQ/JMS client jars, referenced as system-scoped files under ${project.basedir}/aqlib. -->
<!-- NOTE(review): Maven's "system" scope is deprecated and such jars are usually not bundled into the packaged artifact; confirm the runtime classpath contains them. -->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>jmscommon</artifactId>
<version>1.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/aqlib/jmscommon.jar</systemPath>
</dependency>
<!-- NOTE(review): the artifact is named aqapi but points at aqapi_g.jar; presumably this is a debug build of the AQ API jar; confirm it is the intended one. -->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>aqapi</artifactId>
<version>1.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/aqlib/aqapi_g.jar</systemPath>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>jta</artifactId>
<version>1.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/aqlib/jta.jar</systemPath>
</dependency>
数据类型转换类
/**
 * Maps a two-attribute Oracle STRUCT payload to Java and acts as its own
 * {@link CustomDatumFactory}. Both attributes are read as strings and exposed
 * as an {@link AqMsg} through {@link #getContent()}.
 */
public class OracleQueueMsgType implements CustomDatum, CustomDatumFactory {

    // NOTE(review): this name is only used by toDatum(). It does not match the Oracle
    // object type created by the SQL in this document (acdm_upload_queue_payload_type);
    // confirm before using this class to send (enqueue) messages.
    public static final String _SQL_NAME = "OracleQueueMsgType";
    public static final int _SQL_TYPECODE = OracleTypes.STRUCT;

    // Both struct attributes are character data (OracleTypes.VARCHAR has the value 12).
    static int[] _sqlType = { OracleTypes.VARCHAR, OracleTypes.VARCHAR };
    static CustomDatumFactory[] _factory = new CustomDatumFactory[2];
    // Must be declared after _sqlType and _factory: the constructor reads both.
    static final OracleQueueMsgType _MessageFactory = new OracleQueueMsgType();

    MutableStruct _struct;

    /** Creates an instance backed by an empty two-attribute struct. */
    public OracleQueueMsgType() {
        _struct = new MutableStruct(new Object[2], _sqlType, _factory);
    }

    /** @return the shared factory instance */
    public static CustomDatumFactory getFactory() {
        return _MessageFactory;
    }

    /** Converts the wrapped struct to a {@link Datum} under the SQL type name {@link #_SQL_NAME}. */
    public Datum toDatum(OracleConnection c) throws SQLException {
        return _struct.toDatum(c, _SQL_NAME);
    }

    /**
     * Wraps the given datum in a new {@code OracleQueueMsgType}.
     *
     * @param d       the STRUCT datum to wrap; may be {@code null}
     * @param sqlType not used by this implementation
     * @return the wrapper, or {@code null} when {@code d} is {@code null}
     */
    public CustomDatum create(Datum d, int sqlType) throws SQLException {
        if (d == null) {
            return null;
        }
        OracleQueueMsgType wrapped = new OracleQueueMsgType();
        wrapped._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
        return wrapped;
    }

    /**
     * Reads attribute 0 as the message type and attribute 1 as the message body.
     *
     * @return a new {@link AqMsg} built from the two attributes
     */
    public AqMsg getContent() throws SQLException {
        return new AqMsg((String) _struct.getAttribute(0),
                         (String) _struct.getAttribute(1));
    }
}
Java 数据类型
/**
 * Plain value holder for one queue message: a type tag and a text body.
 */
@Data
@ToString
public class AqMsg {

    /** Message type tag. */
    private String msgType;

    /** Message body. */
    private String msg;

    /**
     * @param msgType message type tag
     * @param msg     message body
     */
    public AqMsg(String msgType, String msg) {
        this.msgType = msgType;
        this.msg = msg;
    }
}
消息监听
public class OracleMessageListener implements MessageListener {
@Resource
FplnServiceImpl fplnService;
@Resource
FpdiServiceImpl fpdiService;
@Resource
FpaiServiceImpl fpaiService;
@Override
public void onMessage(Message message) {
System.out.println("ok");
AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;
try {
OracleQueueMsgType payload = (OracleQueueMsgType) adtMessage.getAdtPayload();
AqMsg aqMsg = payload.getContent();
if (null == aqMsg) {
return;
}
if (AqMsgTypeEnum.AMB.name().equals(aqMsg.getMsgType())) {
log.debug("收到航班动态数据变更通知 ambid={}",aqMsg.getMsg());
fplnService.processAmb(Long.valueOf(aqMsg.getMsg()));
} else if (AqMsgTypeEnum.FLI.name().equals(aqMsg.getMsgType())) {
log.debug("收到航班属性数据的更新通知 msg={}",aqMsg);
processFlight(aqMsg.getMsg());
} else if (AqMsgTypeEnum.GUA.name().equals(aqMsg.getMsgType())) {
log.debug("收到航班保障数据的更新通知 msg={}",aqMsg);
processFlight(aqMsg.getMsg());
} else if (AqMsgTypeEnum.FCDM.name().equals(aqMsg.getMsgType())) {
log.debug("收到航班保障数据的更新通知 msg={}",aqMsg);
fpdiService.processFcdmById(Long.valueOf(aqMsg.getMsg().split(",")[0]));
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void processFlight(String msg) {
String[] msgstr = msg.split(",");
if (AIRPORT_CODE.equals(msgstr[1])) {
fpdiService.processById(Long.valueOf(msgstr[0]));
}else{
fpaiService.processById(Long.valueOf(msgstr[0]));
}
}
}
oracle aq 断线重连
使用上述配置时,如果数据库发生故障,即使数据库恢复后,代码也无法自动重新连接,需要重启应用程序。
自定义消息监听容器
/**
 * Message listener container intended to reconnect automatically.
 * It overrides consumer creation to go through the Oracle AQ session API,
 * passing an {@link OracleQueueMsgType} as the payload factory.
 */
public class CaacAqMessageListenerContainer extends DefaultMessageListenerContainer {

    /**
     * Creates the consumer via {@link AQjmsSession}, using this container's message
     * selector and no-local setting.
     *
     * @param session     the JMS session; it is cast to {@link AQjmsSession}
     * @param destination the destination to consume from
     */
    @Override
    protected MessageConsumer createConsumer(Session session,
                                             Destination destination) throws JMSException {
        AQjmsSession aqSession = (AQjmsSession) session;
        String selector = getMessageSelector();
        OracleQueueMsgType payloadFactory = new OracleQueueMsgType();
        boolean noLocal = isPubSubNoLocal();
        return aqSession.createConsumer(destination, selector, payloadFactory, null, noLocal);
    }
}
重写(覆盖)createConsumer 方法,使用 AQ 的 API 创建 consumer。因为我们发送到队列的消息是一个包含两个字段(msgtype、message)的对象类型,所以创建 consumer 时必须指定 OracleQueueMsgType。
设置监听配置
// Connection settings are taken from the spring.datasource.* properties.
@Value("${spring.datasource.url}")
private String jdbcUrl;
@Value("${spring.datasource.username}")
private String username;
@Value("${spring.datasource.password}")
private String password;
// Name of the queue the listener container consumes from.
// NOTE(review): the SQL in this document creates and enqueues to 'ntfm_acdm_queue';
// confirm that 'osccaac_acdm_queue' exists and is the queue intended here.
private final String queueName = "osccaac_acdm_queue";
// Listener that receives the dequeued messages.
@Resource
private OracleMessageListener oracleMessageListener;
/**
 * Builds the AQ listener container: an {@link AQjmsConnectionFactory} configured with
 * the JDBC URL and credentials, the message listener, the destination queue name and
 * 10 concurrent consumers.
 */
@Bean
public CaacAqMessageListenerContainer aqListenerContainer() throws JMSException {
    // Connection factory pointing at the database.
    AQjmsConnectionFactory aqFactory = new AQjmsConnectionFactory();
    aqFactory.setJdbcURL(jdbcUrl);
    aqFactory.setUsername(username);
    aqFactory.setPassword(password);

    // Container wiring.
    CaacAqMessageListenerContainer container = new CaacAqMessageListenerContainer();
    container.setConnectionFactory(aqFactory);
    container.setDestinationName(queueName);
    container.setMessageListener(oracleMessageListener);
    container.setConcurrentConsumers(10);
    return container;
}
这样即使数据库故障后,我们也能自动去重连不需要人工干预,提升了程序可用性
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!