1、创建 SimpleMessageListenerContainer

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,
                                                                   StoreMessageListener storeMessageListener,
                                                                   MqMsgConvertor mqMsgConvertor) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // 从数据库查出_IN 结尾的队列名
        container.setQueueNames("test");
        container.setExposeListenerChannel(true);
        container.setPrefetchCount(100);
        // 设置为数据库队列个数
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        MessageListenerAdapter adapter = new MessageListenerAdapter(storeMessageListener);
        adapter.setMessageConverter(mqMsgConvertor);
        container.setMessageListener(adapter);
        return container;
    }

其中在监听适配器 MessageListenerAdapter 包装自定义的消息监听 storeMessageListener
适配器实现了消息监听接口 onmessage 方法,然后进行消息转换,通过反射执行我们定义的消息监听

所以要实现消息转换功能,我们定义的监听不能实现rabbit监听接口,只需定义一个普通类即可

2、自定义消息监听

MessageListenerAdapter 默认会调用我们自定义监听的类的 handleMessage方法,这个在MessageListenerAdapter 适配器是默认方法,可以设置为其他的

public class StoreMessageListener  {

    @Autowired
    private MsgFilterChain filterChain;

    public void handleMessage(MqMessage request) {
        if (null == request) {
            log.debug("消息格式错误,不做处理");
            return;
        }

        filterChain.doFilter(request,new MqMessage());
    }

}

3、消息转换类

public class MqMsgConvertor implements MessageConverter {

    Jaxb2Marshaller jaxb2Marshaller;

    public MqMsgConvertor(Jaxb2Marshaller jaxb2Marshaller) {
        this.jaxb2Marshaller = jaxb2Marshaller;
    }

    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        messageProperties.setContentType(MediaType.APPLICATION_XML_VALUE);
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
        jaxb2Marshaller.marshal(o,new StreamResult(arrayOutputStream));
        Message message = new Message(arrayOutputStream.toByteArray(),messageProperties);
        return message;
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        try {
            Object o = jaxb2Marshaller.unmarshal(new StreamSource(new ByteArrayInputStream(message.getBody())));
            return o;
        } catch (Exception e) {
            log.error("消息转换错误,消息格式不符合定义",e);
        }
        return null;
    }
}


中间件      rabbitmq

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!