环境
1,IBMMQ 9.1
2,SPRINGBOOT 2.3.1.RELEASE
依赖
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.1.1.0</version>
</dependency>
发送消息
配置
package cn.sh.cares.exec.config;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.JMSException;
import static com.ibm.msg.client.wmq.common.CommonConstants.WMQ_CLIENT_RECONNECT;
@Slf4j
@Configuration
public class IbmMqConfig {
@Value("${ibm.host}")
private String host;
@Value("${ibm.port}")
private Integer port;
@Value("${ibm.channel}")
private String channel;
@Value("${ibm.queue_manager}")
private String queue_manager;
@Value("${ibm.userId}")
private String userId;
@Value("${ibm.password}")
private String password;
@Value("${ibm.ccsid}")
private String ccsid;
@Bean(name="mqQueueConnectionFactoryUp")
public MQQueueConnectionFactory mqQueueConnectionFactory(){
MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
mqQueueConnectionFactory.setHostName(host);
try {
mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
mqQueueConnectionFactory.setCCSID(Integer.valueOf(ccsid));
mqQueueConnectionFactory.setChannel(channel);
mqQueueConnectionFactory.setPort(port);
mqQueueConnectionFactory.setClientReconnectOptions(WMQ_CLIENT_RECONNECT);
mqQueueConnectionFactory.setClientReconnectTimeout(60);
mqQueueConnectionFactory.setQueueManager(queue_manager);
} catch (JMSException e) {
e.printStackTrace();
}
return mqQueueConnectionFactory;
}
@Bean(name="userCredentialsConnectionFactoryAdapterUp")
public UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter(@Qualifier("mqQueueConnectionFactoryUp") MQQueueConnectionFactory mqQueueConnectionFactory){
UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
userCredentialsConnectionFactoryAdapter.setUsername(userId);
userCredentialsConnectionFactoryAdapter.setPassword(password);
userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(mqQueueConnectionFactory);
return userCredentialsConnectionFactoryAdapter;
}
@Bean(name="cachingConnectionFactory")
public CachingConnectionFactory cachingConnectionFactory(@Qualifier("userCredentialsConnectionFactoryAdapterUp") UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter);
cachingConnectionFactory.setSessionCacheSize(5);
cachingConnectionFactory.setReconnectOnException(true);
return cachingConnectionFactory;
}
@Bean(name="jmsTransactionManager")
public PlatformTransactionManager jmsTransactionManager(CachingConnectionFactory cachingConnectionFactory){
JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();
jmsTransactionManager.setConnectionFactory(cachingConnectionFactory);
return jmsTransactionManager;
}
@Bean(name="jmsOperations")
public JmsOperations jmsOperations(CachingConnectionFactory cachingConnectionFactory){
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
jmsTemplate.setReceiveTimeout(60000);
return jmsTemplate;
}
}
发送消息
package cn.sh.cares.exec.service.impl;
import cn.sh.cares.exec.message.struct.Mqmessage;
import cn.sh.cares.exec.service.JaxbService;
import cn.sh.cares.exec.service.MessageSender;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsOperations;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.TextMessage;
import java.io.FileWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Date;
@Slf4j
@Service
public class MessageSenderImpl implements MessageSender {
@Resource
private JmsOperations jmsOperations;
@Resource
private JaxbService jaxbService;
@Value("${apoi.log.msg}")
private String msgPath;
@Value("${ibm.queue_name}")
private String queueName;
@Override
public void send(Mqmessage mqmessage) {
log.debug("开始发送");
String msg = jaxbService.serializeMsg(mqmessage);
if (StringUtils.isEmpty(msg)) {
log.debug("消息为空");
return;
}
XxlJobLogger.log(StringEscapeUtils.escapeXml10(msg));
log.debug(msg);
try {
jmsOperations.send(queueName, session -> {
TextMessage textMessage = session.createTextMessage();
textMessage.setText(msg);
return textMessage;
});
} catch (Exception e) {
e.printStackTrace();
log.error("消息发送失败 msg={},error={}", msg, e.getMessage());
} finally {
writeToFile(msg);
}
}
private void writeToFile(String msg) {
Path path = Paths.get(msgPath, DateFormatUtils.format(new Date(), "yyyy-MM-dd"));
try {
if (!path.toFile().exists()) {
if (!path.toFile().getParentFile().exists()) {
path.toFile().getParentFile().mkdirs();
}
path.toFile().createNewFile();
}
FileWriter printWriter = new FileWriter(path.toFile(), true);
printWriter.append(msg).append("\n");
printWriter.flush();
printWriter.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
监听队列消息
监听配置
package cn.sh.cares.exec.config;
import cn.sh.cares.exec.service.impl.NtfmDlndErrorHandle;
import cn.sh.cares.exec.service.impl.NtfmDlndMsgErrorListener;
import cn.sh.cares.exec.service.impl.NtfmDlndMsgListener;
import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import javax.annotation.Resource;
import javax.jms.JMSException;
import static com.ibm.msg.client.wmq.common.CommonConstants.WMQ_CLIENT_RECONNECT_Q_MGR;
@Slf4j
@Configuration
public class IbmMqDownConfig {
@Value("${ibm.host.down}")
private String host;
@Value("${ibm.channel.down}")
private String channel;
@Value("${ibm.queue_manager.down}")
private String queue_manager;
@Value("${ibm.userId.down}")
private String userId;
@Value("${ibm.password.down}")
private String password;
@Value("${ibm.ccsid.down}")
private String ccsid;
@Resource
NtfmDlndMsgErrorListener ntfmDlndMsgErrorListener;
@Resource
NtfmDlndErrorHandle ntfmDlndErrorHandle;
/**=======================MQ 通道工厂============================**/
@Bean(name="mqQueueConnectionFactory")
public MQQueueConnectionFactory mqQueueConnectionFactory(){
MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
try {
mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
mqQueueConnectionFactory.setCCSID(Integer.valueOf(ccsid));
mqQueueConnectionFactory.setChannel(channel);
mqQueueConnectionFactory.setConnectionNameList(host);
mqQueueConnectionFactory.setClientReconnectOptions(WMQ_CLIENT_RECONNECT_Q_MGR);
mqQueueConnectionFactory.setQueueManager(queue_manager);
} catch (JMSException e) {
e.printStackTrace();
}
return mqQueueConnectionFactory;
}
@Bean(name="userCredentialsConnectionFactoryAdapter")
public UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter(@Qualifier("mqQueueConnectionFactory") MQQueueConnectionFactory mqQueueConnectionFactory){
UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
userCredentialsConnectionFactoryAdapter.setUsername(userId);
userCredentialsConnectionFactoryAdapter.setPassword(password);
userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(mqQueueConnectionFactory);
return userCredentialsConnectionFactoryAdapter;
}
/**============================MQ 消息监听接收=============================**/
//队列连接
@Bean(name="FCTI")
public MQQueue mqueueFCTI(){
MQQueue mqQueue = new MQQueue();
try {
mqQueue.setBaseQueueName("FCTI.TO.ZSJN");
mqQueue.setBaseQueueManagerName(queue_manager);
} catch (JMSException e) {
e.printStackTrace();
}
return mqQueue;
}
//对队列进行监听
@Bean(name="FCTI-MsgContainer")
public DefaultMessageListenerContainer msgContainerFCTI(@Qualifier("userCredentialsConnectionFactoryAdapter") UserCredentialsConnectionFactoryAdapter adapter,
@Qualifier(value = "FCTI") MQQueue mqQueue,
NtfmDlndMsgListener listener){
DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setConnectionFactory(adapter);
defaultMessageListenerContainer.setDestination(mqQueue);
defaultMessageListenerContainer.setMessageListener(listener);
defaultMessageListenerContainer.setExceptionListener(ntfmDlndMsgErrorListener);
defaultMessageListenerContainer.setErrorHandler(ntfmDlndErrorHandle);
defaultMessageListenerContainer.setConcurrentConsumers(1);
defaultMessageListenerContainer.setConcurrency("1");
return defaultMessageListenerContainer;
}
@Bean(name="MDRS")
public MQQueue mqueueMDRS(){
MQQueue mqQueue = new MQQueue();
try {
mqQueue.setBaseQueueName("MDRS.TO.ZSJN");
mqQueue.setBaseQueueManagerName(queue_manager);
} catch (JMSException e) {
e.printStackTrace();
}
return mqQueue;
}
//对队列进行监听
@Bean(name="MDRS-MsgContainer")
public DefaultMessageListenerContainer msgContainerMDRS(@Qualifier("userCredentialsConnectionFactoryAdapter") UserCredentialsConnectionFactoryAdapter adapter,
@Qualifier(value = "MDRS") MQQueue mqQueue,
NtfmDlndMsgListener listener){
DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setConnectionFactory(adapter);
defaultMessageListenerContainer.setDestination(mqQueue);
defaultMessageListenerContainer.setMessageListener(listener);
defaultMessageListenerContainer.setExceptionListener(ntfmDlndMsgErrorListener);
defaultMessageListenerContainer.setErrorHandler(ntfmDlndErrorHandle);
defaultMessageListenerContainer.setConcurrentConsumers(1);
defaultMessageListenerContainer.setConcurrency("1");
return defaultMessageListenerContainer;
}
@Bean(name="PADR")
public MQQueue mqueuePADR(){
MQQueue mqQueue = new MQQueue();
try {
mqQueue.setBaseQueueName("PADR.TO.ZSJN");
mqQueue.setBaseQueueManagerName(queue_manager);
} catch (JMSException e) {
e.printStackTrace();
}
return mqQueue;
}
//对队列进行监听
@Bean(name="PADR-MsgContainer")
public DefaultMessageListenerContainer msgContainerPADR(@Qualifier("userCredentialsConnectionFactoryAdapter") UserCredentialsConnectionFactoryAdapter adapter,
@Qualifier(value = "PADR") MQQueue mqQueue,
NtfmDlndMsgListener listener){
DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setConnectionFactory(adapter);
defaultMessageListenerContainer.setDestination(mqQueue);
defaultMessageListenerContainer.setMessageListener(listener);
defaultMessageListenerContainer.setExceptionListener(ntfmDlndMsgErrorListener);
defaultMessageListenerContainer.setErrorHandler(ntfmDlndErrorHandle);
defaultMessageListenerContainer.setConcurrentConsumers(1);
defaultMessageListenerContainer.setConcurrency("1");
return defaultMessageListenerContainer;
}
@Bean(name="APFE")
public MQQueue mqueueAPFE(){
MQQueue mqQueue = new MQQueue();
try {
mqQueue.setBaseQueueName("APFE.TO.ZSJN");
mqQueue.setBaseQueueManagerName(queue_manager);
} catch (JMSException e) {
e.printStackTrace();
}
return mqQueue;
}
//对队列进行监听
@Bean(name="APFE-MsgContainer")
public DefaultMessageListenerContainer msgContainerAPFE(@Qualifier("userCredentialsConnectionFactoryAdapter") UserCredentialsConnectionFactoryAdapter adapter,
@Qualifier(value = "APFE") MQQueue mqQueue,
NtfmDlndMsgListener listener){
DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setConnectionFactory(adapter);
defaultMessageListenerContainer.setDestination(mqQueue);
defaultMessageListenerContainer.setMessageListener(listener);
defaultMessageListenerContainer.setExceptionListener(ntfmDlndMsgErrorListener);
defaultMessageListenerContainer.setErrorHandler(ntfmDlndErrorHandle);
defaultMessageListenerContainer.setConcurrentConsumers(1);
defaultMessageListenerContainer.setConcurrency("1");
return defaultMessageListenerContainer;
}
@Bean(name="FTMI")
public MQQueue mqueueFTMI(){
MQQueue mqQueue = new MQQueue();
try {
mqQueue.setBaseQueueName("FTMI.TO.ZSJN");
mqQueue.setBaseQueueManagerName(queue_manager);
} catch (JMSException e) {
e.printStackTrace();
}
return mqQueue;
}
//对队列进行监听
@Bean(name="FTMI-MsgContainer")
public DefaultMessageListenerContainer msgContainerFTMI(@Qualifier("userCredentialsConnectionFactoryAdapter") UserCredentialsConnectionFactoryAdapter adapter,
@Qualifier(value = "FTMI") MQQueue mqQueue,
NtfmDlndMsgListener listener){
DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setConnectionFactory(adapter);
defaultMessageListenerContainer.setDestination(mqQueue);
defaultMessageListenerContainer.setMessageListener(listener);
defaultMessageListenerContainer.setExceptionListener(ntfmDlndMsgErrorListener);
defaultMessageListenerContainer.setErrorHandler(ntfmDlndErrorHandle);
defaultMessageListenerContainer.setConcurrentConsumers(1);
defaultMessageListenerContainer.setConcurrency("1");
return defaultMessageListenerContainer;
}
}
消息处理
package cn.sh.cares.exec.service.impl;
import cn.sh.cares.exec.service.NtfmDlndMsgPersist;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.*;
import java.nio.charset.StandardCharsets;
/**
* @author wangcj
* @desc
* @date 2021/5/10 15:11
**/
@Slf4j
@Service
public class NtfmDlndMsgListener implements MessageListener {
@Resource
NtfmDlndMsgPersist msgPersist;
@Override
public void onMessage(Message message) {
String str = null;
// 1、读取报文
try {
if (message instanceof BytesMessage) {
BytesMessage bm = (BytesMessage) message;
byte[] bys = null;
bys = new byte[(int) bm.getBodyLength()];
bm.readBytes(bys);
str = new String(bys, StandardCharsets.UTF_8);
} else {
str = ((TextMessage) message).getText();
}
} catch (JMSException e) {
e.printStackTrace();
}
log.debug("收到下行消息 ::{}", str);
// TODO 消息解析入库
try {
msgPersist.storeMsg(str);
} catch (Exception e) {
log.error("下行消息持久化异常", e);
}
}
}
断线重连测试
消息监听容器使用 DefaultMessageListenerContainer 而不是 SimpleMessageListenerContainer,
SimpleMessageListenerContainer 无法断线重连。
DefaultMessageListenerContainer 断线后默认是5秒重连一次,时间间隔可设置
单位 :毫秒
defaultMessageListenerContainer.setRecoveryInterval(6000);
搭建docker环境测试
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!