1、配置
package cn.sh.cares.exec.config;
import cn.sh.cares.exec.constant.Constants;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class IbmMqConfig {
@Value("${ibm.host}")
private String host;
@Value("${ibm.port}")
private Integer port;
@Value("${ibm.channel}")
private String channel;
@Value("${ibm.queue_manager}")
private String queue_manager;
@Value("${ibm.userId}")
private String userId;
@Value("${ibm.password}")
private String password;
private String queue_name_upload = Constants.AIRPORT_ICAO_CODE + ".TO.NTFM";
private String queue_name_download = "NTFM.TO."+Constants.AIRPORT_ICAO_CODE;
@Bean
MQQueueManager mqQueueManager() {
MQQueueManager queueManager = null;
MQEnvironment.hostname = host;
MQEnvironment.port = port;
MQEnvironment.channel = channel;
MQEnvironment.userID = userId;
MQEnvironment.password = password;
try {
queueManager = new MQQueueManager(queue_manager);
} catch (Exception e) {
e.printStackTrace();
log.error("创建队列管理器失败");
System.exit(-1);
}
return queueManager;
}
@Bean
MQQueue putQueue(MQQueueManager queueManager) {
MQQueue putQueue = null;
try {
int openOptionsArg = CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE|CMQC.MQOO_INPUT_AS_Q_DEF ;
putQueue = queueManager.accessQueue(queue_name_upload, openOptionsArg);
} catch (Exception e) {
e.printStackTrace();
log.error("连接队列失败");
System.exit(-1);
}
return putQueue;
}
@Bean
MQQueue getQueue(MQQueueManager queueManager) {
MQQueue getQueue = null;
try {
int openOptionsArg = CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE|CMQC.MQOO_INPUT_AS_Q_DEF ;
getQueue = queueManager.accessQueue(queue_name_download, openOptionsArg);
} catch (Exception e) {
e.printStackTrace();
log.error("连接队列失败");
System.exit(-1);
}
return getQueue;
}
}
2、消息发送
package cn.sh.cares.exec.service.impl;
import cn.sh.cares.exec.message.struct.Mqmessage;
import cn.sh.cares.exec.service.JaxbService;
import cn.sh.cares.exec.service.MessageSender;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.FileWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Date;
@Slf4j
@Service
public class MessageSenderImpl implements MessageSender {
@Resource
private MQQueue putQueue;
@Resource
private JaxbService jaxbService;
@Value("${apoi.log.msg}")
private String msgPath;
@Override
public void send(Mqmessage mqmessage) {
log.debug("开始发送");
String msg = jaxbService.serializeMsg(mqmessage);
XxlJobLogger.log(StringEscapeUtils.escapeXml10(msg));
log.debug(msg);
writeToFile(msg);
MQMessage myMessage = new MQMessage();
try {
myMessage.writeUTF(msg);
MQPutMessageOptions pmo = new MQPutMessageOptions();
putQueue.put(myMessage, pmo);
} catch (Exception e) {
e.printStackTrace();
log.error("消息发送失败");
}
XxlJobLogger.log("发送消息成功");
log.debug("发送成功");
}
private void writeToFile(String msg) {
Path path = Paths.get(msgPath, DateFormatUtils.format(new Date(),"yyyy-MM-dd"));
try {
if (!path.toFile().exists()) {
if (!path.toFile().getParentFile().exists()) {
path.toFile().getParentFile().mkdirs();
}
path.toFile().createNewFile();
}
FileWriter printWriter = new FileWriter(path.toFile(),true);
printWriter.append(msg).append("\n");
printWriter.flush();
printWriter.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
3、循环读取消息
package cn.sh.cares.exec;
import cn.sh.cares.exec.service.JaxbService;
import cn.sh.cares.exec.service.impl.apoi.FplnServiceImpl;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import static com.ibm.mq.constants.CMQC.MQWI_UNLIMITED;
@SpringBootTest
class DataExchangeNtfmApplicationTests {
@Resource
MQQueue getQueue;
@Test
void getMsg() {
try {
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.waitInterval = MQWI_UNLIMITED;
int depth = putQueue.getCurrentDepth();
while (depth-- > 0) {
MQMessage retrievedMessage = new MQMessage();
putQueue.get(retrievedMessage, gmo);
String msg = retrievedMessage.readUTF();
System.out.println(msg);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!