Spring AMQP是用于以AMQP为基础的MQ中间件的解决方案,对于底层API进行封装,使开发者在对MQ进行操作时更加易用,并进行了一些拓展与优化。
这里我们主要使用RabbitMQ与SpringAMQP整合,对于其他以AMQP规范制作的中间件也是差不多的整合方式。
SpringAMQP核心内容
RabbitAdmin
:管控组件RabbitTemplate
:消息模板组件SimpleMessageListenerContainer
:简单消息监听容器MessageListenerAdapter
:消息适配器MessageConverter
:消息转换器
接下来,我们就需要对这些核心内容进行研究
RabbitAdmin
RabbitAdmin
的主要作用就是方便的操作Exchange,Queue,Binding这些信息
使用它需要几点注意:
- 需要将
RabbitAdmin
注入到上下文中,注入的前提是ConnectionFactory
在上下文中,所以要先注入ConnectionFactory
- 注入时要设置
autoStartup=true
,不然不会加载RabbitAdmin
快速入门
添加依赖
需要使用spring的amqp依赖,这里为了方便,使用的springboot的工程
1 | <dependency> |
注入RabbitAdmin
1 |
|
这里,我直接写在启动器类了,注入RabbitAdmin
前提是注入ConnectionFactory
,这里使用的spring.amqp
下的而不是之前使用的rabbit
下的ConnectionFactory
需要注意的是 rabbitAdmin.setAutoStartup(true);
要设置为true,不然不会被加载此类
测试
接着我们在Test类进行测试,简单的创建一个交换机,试试是否注入成功
1 | .class) (SpringRunner |
这里使用SpringBoot的Test类,然后运行,查看控制台
成功创建,说明RabbitAdmin
注入成功
RabbiAdmin基本操作
针对Exchange
1 |
|
具有声明以及删除的操作
针对Queue
1 |
|
这里我们获取了Queue中的信息,主要获取到了消费者数量,信息数量,队列名,如下图:
针对Binding
1 |
|
主要就是绑定和解绑
注解声明
在刚开始学习RabbitMQ的时候,使用最基础的API
进行创建Exchange,Queue或Binding
1 | channel.exchangeDeclare(exchange,"topic",true); |
通过上面的学习,我们使用RabbitAdmin
进行创建
1 | rabbitAdmin.declareExchange(DirectExchange); |
那么有没有更简单的创建方式呢?
就是使用@Bean
注解进行声明,放入到容器中,然后配合RabbitAdmin
会自动进行加载并统一创建
1 |
|
这里我创建一个RabbitBean
类,并添加@Component
注解,使之注入到上下文容器中。
然后再到Test测试类,随便执行一条RabbitAdmin
的语句,如下:
1 |
|
这时,回到控制台,会发现已经创建成功,并绑定了关联关系
注意:必须使用rabbitAdmin
执行某个方法,才能成功创建
为什么要这样呢?我们进入源码探究
源码分析
首先进入到RabbitAdmin
类中
可以发现之前那些创建Exchange,Queue的方法,都是先调用RabbitTemplate
,由RabbitTemplate
创建Channel
再返回RabbitAdmin
的ChannelCallback()
回调函数
比如如下的回调函数:
最终调用的方法和我们一开始单独使用rabbitmq时一样
接着我们看看RabbitAdmin
是如何自动创建被@Bean
声明的交换机队列的
1 | public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware, |
首先在类的实现接口中,就能发现端倪,InitializingBean
,初始化Bean,按理说要自动创建肯定是初始化的时候,那么和这个接口肯定息息相关
1 | public interface InitializingBean { |
这个类中只有一个方法,就是初始化之后执行的方法,再回到RabbitAdmin
中看看这个方法
1 |
|
这里主要是添加一个connection
监听器,并当监听器连接时,调用initialize()
方法,然后我们再进入这个方法看看
1 | public void initialize() { |
通过这段源码的查看,就清晰明了了,会将applicationcontext
中的所有Exchange,Queue,Binding类型的对象进行取出,再统一通过channel
进行创建。
这里又回到我们刚刚的问题了,为何必须使用RabbitAdmin
执行某一个方法,才能创建?
- 首先通过源码可以知道,想要创建,必须由监听器监听到
connection
连接,才会走初始化方法 - 那么可以确定就是如果不使用RabbitAdmin执行某个方法,那么它不会进行
connection
连接,所以这里我大胆猜测,RabbitAdmin
是懒加载的,只有使用到它,它才会进行连接 - 所以我们必须要执行某个方法,让他进行连接,才能进行声明式
Bean
的创建
RabbitTemplate
RabbitTemplate
是一个非常好用的消息模板组件,它是非常重要的消息发送类,有大量不同的send API
,可以非常灵活的进行消息发送。
他还提供了可靠性投递,回调监听ConfirmCallBack
,返回确认ReturnCallBack
等方法,来实现Confirm
与Return
注入到容器中
RabbitTemplate和RabbitAdmin一样,都需要注入到上下文里
1 |
|
这里可以进行confirm已经return方法的编写,如下
这里我就不做添加了。
发送消息
首先可以看到RabbitTemplate有重载了大量的发送消息方法,进行灵活多样的操作
接下来我们通过RabbitTemplate进行发消息
1 | .class) (SpringRunner |
这里我们使用了几种常用发送方式,但其实它还有更多更灵活的发送方式
convertAndSend(String exchange, String routingKey, final Object object)
:简单的发送方式,exchange,routingkey进行对应,最后消息的类型可以是多种多样的,底层进行转换,字符串或者Message
类封装convertAndSend(String exchange, String routingKey, final Object message,final MessagePostProcessor messagePostProcessor)
:添加一个后置处理器,根据需求处理send(final String exchange, final String routingKey, final Message message)
:send
方法的不同是必须是Message
类型的参数,而convertAndSend
是Object
发送完可以看到有4条消息,点进去详细查看
这里可以看到,第二第三条消息都添加了后置处理的header
头,是因为在第二条消息发送的时候,那个MessagePostProcessor
方法中,就已经在Message
对象添加header
头了,所以在第三条消息发送时,并不是他也进行了后置方法,只是Message
对象还是用的原来的,导致也出现postProcess Msg
(如果图片看不清,可以右键在新建标签页打开查看)