1、创建 SimpleMessageListenerContainer
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,
StoreMessageListener storeMessageListener,
MqMsgConvertor mqMsgConvertor) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// 从数据库查出_IN 结尾的队列名
container.setQueueNames("test");
container.setExposeListenerChannel(true);
container.setPrefetchCount(100);
// 设置为数据库队列个数
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
MessageListenerAdapter adapter = new MessageListenerAdapter(storeMessageListener);
adapter.setMessageConverter(mqMsgConvertor);
container.setMessageListener(adapter);
return container;
}
其中在监听适配器 MessageListenerAdapter 包装自定义的消息监听 storeMessageListener
适配器实现了消息监听接口 onmessage 方法,然后进行消息转换,通过反射执行我们定义的消息监听
所以要实现消息转换功能,我们定义的监听不能实现rabbit监听接口,只需定义一个普通类即可
2、自定义消息监听
MessageListenerAdapter 默认会调用我们自定义监听的类的 handleMessage方法,这个在MessageListenerAdapter 适配器是默认方法,可以设置为其他的
public class StoreMessageListener {
@Autowired
private MsgFilterChain filterChain;
public void handleMessage(MqMessage request) {
if (null == request) {
log.debug("消息格式错误,不做处理");
return;
}
filterChain.doFilter(request,new MqMessage());
}
}
3、消息转换类
public class MqMsgConvertor implements MessageConverter {
Jaxb2Marshaller jaxb2Marshaller;
public MqMsgConvertor(Jaxb2Marshaller jaxb2Marshaller) {
this.jaxb2Marshaller = jaxb2Marshaller;
}
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
messageProperties.setContentType(MediaType.APPLICATION_XML_VALUE);
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
jaxb2Marshaller.marshal(o,new StreamResult(arrayOutputStream));
Message message = new Message(arrayOutputStream.toByteArray(),messageProperties);
return message;
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
try {
Object o = jaxb2Marshaller.unmarshal(new StreamSource(new ByteArrayInputStream(message.getBody())));
return o;
} catch (Exception e) {
log.error("消息转换错误,消息格式不符合定义",e);
}
return null;
}
}
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!