1、配置

package cn.sh.cares.exec.config;

import cn.sh.cares.exec.constant.Constants;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Slf4j
@Configuration
public class IbmMqConfig {

    @Value("${ibm.host}")
    private String host;

    @Value("${ibm.port}")
    private Integer port;

    @Value("${ibm.channel}")
    private String channel;

    @Value("${ibm.queue_manager}")
    private String queue_manager;

    @Value("${ibm.userId}")
    private String userId;

    @Value("${ibm.password}")
    private String password;

    private String queue_name_upload = Constants.AIRPORT_ICAO_CODE + ".TO.NTFM";

    private String queue_name_download = "NTFM.TO."+Constants.AIRPORT_ICAO_CODE;


    @Bean
    MQQueueManager mqQueueManager() {
        MQQueueManager queueManager = null;
        MQEnvironment.hostname = host;
        MQEnvironment.port = port;
        MQEnvironment.channel = channel;
        MQEnvironment.userID = userId;
        MQEnvironment.password = password;

        try {
            queueManager = new MQQueueManager(queue_manager);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("创建队列管理器失败");
            System.exit(-1);
        }

        return queueManager;
    }


    @Bean
    MQQueue putQueue(MQQueueManager queueManager) {
        MQQueue putQueue = null;

        try {
            int openOptionsArg = CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE|CMQC.MQOO_INPUT_AS_Q_DEF ;
            putQueue = queueManager.accessQueue(queue_name_upload, openOptionsArg);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("连接队列失败");
            System.exit(-1);
        }

        return putQueue;
    }


    @Bean
    MQQueue getQueue(MQQueueManager queueManager) {
        MQQueue getQueue = null;

        try {
            int openOptionsArg = CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE|CMQC.MQOO_INPUT_AS_Q_DEF ;
            getQueue = queueManager.accessQueue(queue_name_download, openOptionsArg);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("连接队列失败");
            System.exit(-1);
        }

        return getQueue;
    }

}

2、消息发送

package cn.sh.cares.exec.service.impl;

import cn.sh.cares.exec.message.struct.Mqmessage;
import cn.sh.cares.exec.service.JaxbService;
import cn.sh.cares.exec.service.MessageSender;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.FileWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Date;

@Slf4j
@Service
public class MessageSenderImpl implements MessageSender {

    @Resource
    private MQQueue putQueue;

    @Resource
    private JaxbService jaxbService;

    @Value("${apoi.log.msg}")
    private String msgPath;

    @Override
    public void send(Mqmessage mqmessage) {

        log.debug("开始发送");

        String msg = jaxbService.serializeMsg(mqmessage);

        XxlJobLogger.log(StringEscapeUtils.escapeXml10(msg));
        log.debug(msg);

        writeToFile(msg);
        MQMessage myMessage = new MQMessage();
        try {
            myMessage.writeUTF(msg);
            MQPutMessageOptions pmo = new MQPutMessageOptions();
            putQueue.put(myMessage, pmo);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("消息发送失败");
        }

        XxlJobLogger.log("发送消息成功");
        log.debug("发送成功");
    }

    private void writeToFile(String msg) {

        Path path = Paths.get(msgPath, DateFormatUtils.format(new Date(),"yyyy-MM-dd"));

        try {
            if (!path.toFile().exists()) {
                if (!path.toFile().getParentFile().exists()) {
                    path.toFile().getParentFile().mkdirs();
                }
                path.toFile().createNewFile();
            }
            FileWriter printWriter = new FileWriter(path.toFile(),true);
            printWriter.append(msg).append("\n");
            printWriter.flush();
            printWriter.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3、循环读取消息

package cn.sh.cares.exec;

import cn.sh.cares.exec.service.JaxbService;
import cn.sh.cares.exec.service.impl.apoi.FplnServiceImpl;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import static com.ibm.mq.constants.CMQC.MQWI_UNLIMITED;

@SpringBootTest
class DataExchangeNtfmApplicationTests {

    @Resource
    MQQueue getQueue;

    @Test
    void getMsg() {

        try {
            MQGetMessageOptions gmo = new MQGetMessageOptions();
            gmo.waitInterval = MQWI_UNLIMITED;
            int depth = putQueue.getCurrentDepth();
            while (depth-- > 0) {
                MQMessage retrievedMessage = new MQMessage();
                putQueue.get(retrievedMessage, gmo);
                String msg = retrievedMessage.readUTF();
                System.out.println(msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}



中间件      IBMMQ 消息读取

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!