环境

1,IBMMQ 9.1

2,SPRINGBOOT 2.3.1.RELEASE

依赖

        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mq.allclient</artifactId>
            <version>9.1.1.0</version>
        </dependency>

发送消息

配置

package cn.sh.cares.exec.config;

import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;

import javax.jms.JMSException;

import static com.ibm.msg.client.wmq.common.CommonConstants.WMQ_CLIENT_RECONNECT;


@Slf4j
@Configuration
public class IbmMqConfig {

    @Value("${ibm.host}")
    private String host;

    @Value("${ibm.port}")
    private Integer port;

    @Value("${ibm.channel}")
    private String channel;

    @Value("${ibm.queue_manager}")
    private String queue_manager;

    @Value("${ibm.userId}")
    private String userId;

    @Value("${ibm.password}")
    private String password;

    @Value("${ibm.ccsid}")
    private String ccsid;


    @Bean(name="mqQueueConnectionFactoryUp")
    public MQQueueConnectionFactory mqQueueConnectionFactory(){
        MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
        mqQueueConnectionFactory.setHostName(host);
        try {
            mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            mqQueueConnectionFactory.setCCSID(Integer.valueOf(ccsid));
            mqQueueConnectionFactory.setChannel(channel);
            mqQueueConnectionFactory.setPort(port);
            mqQueueConnectionFactory.setClientReconnectOptions(WMQ_CLIENT_RECONNECT);
            mqQueueConnectionFactory.setClientReconnectTimeout(60);
            mqQueueConnectionFactory.setQueueManager(queue_manager);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return mqQueueConnectionFactory;
    }
    @Bean(name="userCredentialsConnectionFactoryAdapterUp")
    public UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter(@Qualifier("mqQueueConnectionFactoryUp") MQQueueConnectionFactory mqQueueConnectionFactory){
        UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
        userCredentialsConnectionFactoryAdapter.setUsername(userId);
        userCredentialsConnectionFactoryAdapter.setPassword(password);
        userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(mqQueueConnectionFactory);
        return userCredentialsConnectionFactoryAdapter;
    }

    @Bean(name="cachingConnectionFactory")
    public CachingConnectionFactory cachingConnectionFactory(@Qualifier("userCredentialsConnectionFactoryAdapterUp") UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter){
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter);
        cachingConnectionFactory.setSessionCacheSize(5);
        cachingConnectionFactory.setReconnectOnException(true);
        return cachingConnectionFactory;
    }
    @Bean(name="jmsTransactionManager")
    public PlatformTransactionManager jmsTransactionManager(CachingConnectionFactory cachingConnectionFactory){
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();
        jmsTransactionManager.setConnectionFactory(cachingConnectionFactory);
        return jmsTransactionManager;
    }
    @Bean(name="jmsOperations")
    public JmsOperations jmsOperations(CachingConnectionFactory cachingConnectionFactory){
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        jmsTemplate.setReceiveTimeout(60000);
        return jmsTemplate;
    }

}

发送消息

package cn.sh.cares.exec.service.impl;

import cn.sh.cares.exec.message.struct.Mqmessage;
import cn.sh.cares.exec.service.JaxbService;
import cn.sh.cares.exec.service.MessageSender;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsOperations;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.TextMessage;
import java.io.FileWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Date;

@Slf4j
@Service
public class MessageSenderImpl implements MessageSender {

    @Resource
    private JmsOperations jmsOperations;

    @Resource
    private JaxbService jaxbService;

    @Value("${apoi.log.msg}")
    private String msgPath;

    @Value("${ibm.queue_name}")
    private String queueName;

    @Override
    public void send(Mqmessage mqmessage) {

        log.debug("开始发送");

        String msg = jaxbService.serializeMsg(mqmessage);

        if (StringUtils.isEmpty(msg)) {
            log.debug("消息为空");
            return;
        }

        XxlJobLogger.log(StringEscapeUtils.escapeXml10(msg));

        log.debug(msg);

        try {
            jmsOperations.send(queueName, session -> {
                TextMessage textMessage = session.createTextMessage();
                textMessage.setText(msg);
                return textMessage;
            });
        } catch (Exception e) {
            e.printStackTrace();
            log.error("消息发送失败 msg={},error={}", msg, e.getMessage());
        } finally {
            writeToFile(msg);
        }
    }

    private void writeToFile(String msg) {

        Path path = Paths.get(msgPath, DateFormatUtils.format(new Date(), "yyyy-MM-dd"));

        try {
            if (!path.toFile().exists()) {
                if (!path.toFile().getParentFile().exists()) {
                    path.toFile().getParentFile().mkdirs();
                }
                path.toFile().createNewFile();
            }
            FileWriter printWriter = new FileWriter(path.toFile(), true);
            printWriter.append(msg).append("\n");
            printWriter.flush();
            printWriter.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

监听队列消息

监听配置

package cn.sh.cares.exec.config;

import cn.sh.cares.exec.service.impl.NtfmDlndErrorHandle;
import cn.sh.cares.exec.service.impl.NtfmDlndMsgErrorListener;
import cn.sh.cares.exec.service.impl.NtfmDlndMsgListener;
import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

import javax.annotation.Resource;
import javax.jms.JMSException;

import static com.ibm.msg.client.wmq.common.CommonConstants.WMQ_CLIENT_RECONNECT_Q_MGR;


@Slf4j
@Configuration
public class IbmMqDownConfig {

    @Value("${ibm.host.down}")
    private String host;

    @Value("${ibm.channel.down}")
    private String channel;

    @Value("${ibm.queue_manager.down}")
    private String queue_manager;

    @Value("${ibm.userId.down}")
    private String userId;

    @Value("${ibm.password.down}")
    private String password;

    @Value("${ibm.ccsid.down}")
    private String ccsid;

    @Resource
    NtfmDlndMsgErrorListener ntfmDlndMsgErrorListener;

    @Resource
    NtfmDlndErrorHandle ntfmDlndErrorHandle;

    /**=======================MQ 通道工厂============================**/
    @Bean(name="mqQueueConnectionFactory")
    public MQQueueConnectionFactory mqQueueConnectionFactory(){
        MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
        try {
            mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            mqQueueConnectionFactory.setCCSID(Integer.valueOf(ccsid));
            mqQueueConnectionFactory.setChannel(channel);
            mqQueueConnectionFactory.setConnectionNameList(host);
            mqQueueConnectionFactory.setClientReconnectOptions(WMQ_CLIENT_RECONNECT_Q_MGR);
            mqQueueConnectionFactory.setQueueManager(queue_manager);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return mqQueueConnectionFactory;
    }
    @Bean(name="userCredentialsConnectionFactoryAdapter")
    public UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter(@Qualifier("mqQueueConnectionFactory") MQQueueConnectionFactory mqQueueConnectionFactory){
        UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
        userCredentialsConnectionFactoryAdapter.setUsername(userId);
        userCredentialsConnectionFactoryAdapter.setPassword(password);
        userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(mqQueueConnectionFactory);
        return userCredentialsConnectionFactoryAdapter;
    }

    /**============================MQ 消息监听接收=============================**/
    //队列连接
    @Bean(name="FCTI")
    public MQQueue mqueueFCTI(){
        MQQueue mqQueue = new MQQueue();
        try {
            mqQueue.setBaseQueueName("FCTI.TO.ZSJN");
            mqQueue.setBaseQueueManagerName(queue_manager);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return mqQueue;
    }

    //对队列进行监听
    @Bean(name="FCTI-MsgContainer")
    public DefaultMessageListenerContainer msgContainerFCTI(@Qualifier("userCredentialsConnectionFactoryAdapter") UserCredentialsConnectionFactoryAdapter adapter,
                                                       @Qualifier(value = "FCTI") MQQueue mqQueue,
                                                       NtfmDlndMsgListener listener){
        DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setConnectionFactory(adapter);
        defaultMessageListenerContainer.setDestination(mqQueue);
        defaultMessageListenerContainer.setMessageListener(listener);
        defaultMessageListenerContainer.setExceptionListener(ntfmDlndMsgErrorListener);
        defaultMessageListenerContainer.setErrorHandler(ntfmDlndErrorHandle);
        defaultMessageListenerContainer.setConcurrentConsumers(1);
        defaultMessageListenerContainer.setConcurrency("1");
        return defaultMessageListenerContainer;
    }


    @Bean(name="MDRS")
    public MQQueue mqueueMDRS(){
        MQQueue mqQueue = new MQQueue();
        try {
            mqQueue.setBaseQueueName("MDRS.TO.ZSJN");
            mqQueue.setBaseQueueManagerName(queue_manager);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return mqQueue;
    }

    //对队列进行监听
    @Bean(name="MDRS-MsgContainer")
    public DefaultMessageListenerContainer msgContainerMDRS(@Qualifier("userCredentialsConnectionFactoryAdapter") UserCredentialsConnectionFactoryAdapter adapter,
                                                       @Qualifier(value = "MDRS") MQQueue mqQueue,
                                                       NtfmDlndMsgListener listener){
        DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setConnectionFactory(adapter);
        defaultMessageListenerContainer.setDestination(mqQueue);
        defaultMessageListenerContainer.setMessageListener(listener);
        defaultMessageListenerContainer.setExceptionListener(ntfmDlndMsgErrorListener);
        defaultMessageListenerContainer.setErrorHandler(ntfmDlndErrorHandle);
        defaultMessageListenerContainer.setConcurrentConsumers(1);
        defaultMessageListenerContainer.setConcurrency("1");
        return defaultMessageListenerContainer;
    }


    @Bean(name="PADR")
    public MQQueue mqueuePADR(){
        MQQueue mqQueue = new MQQueue();
        try {
            mqQueue.setBaseQueueName("PADR.TO.ZSJN");
            mqQueue.setBaseQueueManagerName(queue_manager);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return mqQueue;
    }

    //对队列进行监听
    @Bean(name="PADR-MsgContainer")
    public DefaultMessageListenerContainer msgContainerPADR(@Qualifier("userCredentialsConnectionFactoryAdapter") UserCredentialsConnectionFactoryAdapter adapter,
                                                           @Qualifier(value = "PADR") MQQueue mqQueue,
                                                           NtfmDlndMsgListener listener){
        DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setConnectionFactory(adapter);
        defaultMessageListenerContainer.setDestination(mqQueue);
        defaultMessageListenerContainer.setMessageListener(listener);
        defaultMessageListenerContainer.setExceptionListener(ntfmDlndMsgErrorListener);
        defaultMessageListenerContainer.setErrorHandler(ntfmDlndErrorHandle);
        defaultMessageListenerContainer.setConcurrentConsumers(1);
        defaultMessageListenerContainer.setConcurrency("1");
        return defaultMessageListenerContainer;
    }

    @Bean(name="APFE")
    public MQQueue mqueueAPFE(){
        MQQueue mqQueue = new MQQueue();
        try {
            mqQueue.setBaseQueueName("APFE.TO.ZSJN");
            mqQueue.setBaseQueueManagerName(queue_manager);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return mqQueue;
    }

    //对队列进行监听
    @Bean(name="APFE-MsgContainer")
    public DefaultMessageListenerContainer msgContainerAPFE(@Qualifier("userCredentialsConnectionFactoryAdapter") UserCredentialsConnectionFactoryAdapter adapter,
                                                           @Qualifier(value = "APFE") MQQueue mqQueue,
                                                           NtfmDlndMsgListener listener){
        DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setConnectionFactory(adapter);
        defaultMessageListenerContainer.setDestination(mqQueue);
        defaultMessageListenerContainer.setMessageListener(listener);
        defaultMessageListenerContainer.setExceptionListener(ntfmDlndMsgErrorListener);
        defaultMessageListenerContainer.setErrorHandler(ntfmDlndErrorHandle);
        defaultMessageListenerContainer.setConcurrentConsumers(1);
        defaultMessageListenerContainer.setConcurrency("1");
        return defaultMessageListenerContainer;
    }

    @Bean(name="FTMI")
    public MQQueue mqueueFTMI(){
        MQQueue mqQueue = new MQQueue();
        try {
            mqQueue.setBaseQueueName("FTMI.TO.ZSJN");
            mqQueue.setBaseQueueManagerName(queue_manager);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return mqQueue;
    }

    //对队列进行监听
    @Bean(name="FTMI-MsgContainer")
    public DefaultMessageListenerContainer msgContainerFTMI(@Qualifier("userCredentialsConnectionFactoryAdapter") UserCredentialsConnectionFactoryAdapter adapter,
                                                            @Qualifier(value = "FTMI") MQQueue mqQueue,
                                                            NtfmDlndMsgListener listener){
        DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setConnectionFactory(adapter);
        defaultMessageListenerContainer.setDestination(mqQueue);
        defaultMessageListenerContainer.setMessageListener(listener);
        defaultMessageListenerContainer.setExceptionListener(ntfmDlndMsgErrorListener);
        defaultMessageListenerContainer.setErrorHandler(ntfmDlndErrorHandle);
        defaultMessageListenerContainer.setConcurrentConsumers(1);
        defaultMessageListenerContainer.setConcurrency("1");
        return defaultMessageListenerContainer;
    }
}


消息处理

package cn.sh.cares.exec.service.impl;

import cn.sh.cares.exec.service.NtfmDlndMsgPersist;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.*;
import java.nio.charset.StandardCharsets;

/**
 * @author wangcj
 * @desc
 * @date 2021/5/10 15:11
 **/
@Slf4j
@Service
public class NtfmDlndMsgListener implements MessageListener {

    @Resource
    NtfmDlndMsgPersist msgPersist;

    @Override
    public void onMessage(Message message) {
        String str = null;
        // 1、读取报文
        try {
            if (message instanceof BytesMessage) {
                BytesMessage bm = (BytesMessage) message;
                byte[] bys = null;
                bys = new byte[(int) bm.getBodyLength()];
                bm.readBytes(bys);
                str = new String(bys, StandardCharsets.UTF_8);
            } else {
                str = ((TextMessage) message).getText();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }

        log.debug("收到下行消息 ::{}", str);

        // TODO 消息解析入库
        try {
            msgPersist.storeMsg(str);
        } catch (Exception e) {
            log.error("下行消息持久化异常", e);
        }
    }
}

断线重连测试

消息监听容器使用 DefaultMessageListenerContainer 而不是 SimpleMessageListenerContainer,
SimpleMessageListenerContainer 无法断线重连。

DefaultMessageListenerContainer 断线后默认是5秒重连一次,时间间隔可设置

单位 :毫秒
defaultMessageListenerContainer.setRecoveryInterval(6000);

搭建docker环境测试



中间件      IBMMQ 发送IBMMQ 监听IBMMQ

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!