一、管理台
我们先介绍一下管理台
从这张图右上角可以看到用户信息,版本信息与登出。
紧接着有6个标签
Connection
连接信息
因为我们没有任何连接,所以是空的,后面连接生产者和消费者时再看
Channels
信道信息
也没有任何信息,因为信道是基于连接的
Exchanges
交换机,生产者投递消息到这里
这里可以看到默认有多个交换机,并且有四种类型,对于其类型我们后面讲解
Features里有D和I的信息
- D:
durable=true
,即持久化,就是MQ如果停掉,这些交换机都不会被清除,一直保留 - I:
internal=true
,一个特征,后面详说
Queue
MQ队列
也是为空的
Admin
可以进行用户,虚拟机,以及权限的操作,比如添加用户,添加虚拟机,以及权限配置
Overview
MQ概览,大体信息都在这里
MQ中连接,信道等合计
节点信息,内存占用,磁盘占用等等
保存路径,比如配置文件路径,数据存储的路径,日志的保存
端口号信息
最后这个很重要,MQ信息的导入导出,当我们集群的服务器想升级,或者迁移时,比如centos6升centos7。
如果慢慢调整要耗费很长时间,使用MQ信息导入导出,可以节省大量时间,只需对某些升级所需改动的配置改造即可。
导出后是json文件信息,如下,保存了比如用户,虚拟主机,包括exchange,binding,queue,该有的都有
二、极速入门
这里我们使用SpringBoot快速搭建一个生产者与消费者,并配合Rabbitmq进行消费。
对于SpringBoot如何搭建就不介绍了,主要使用依赖:
1 | <dependency> |
Provider
1 | public class Provider { |
消息生产者/提供方 代码比较简单,主要是建立连接,建立通道,然后进行消息的投递。
注意最后要关闭连接。
Consumer
1 | public class Consumer { |
Consumer的代码就比较多一点了,Rabbitmq在消息提供方不需要创建提供者,但在消费方需要创建消费者,通道设置并进行消费。
还有一点是队列的创建,因为现在MQ中队列是空的,所以在消费方创建一个队列,如果有相同名称的队列,便不会创建,没有会自动创建。
测试
这里先启动Consumer,因为先启动Provider的话,没有test01
这个队列,就无法投递消息
启动Consumer后,会一直处在监听状态,查看一下MQ管理台
出现了连接,队列,信道等等信息,等待消息的产生并消费
这时开启Provider
瞬间执行完毕,并且消费方获取5条消息并打印
控制台也有消息消费的折线图
这里有一个问题,我们没有Exchange
,那么消息是如何投递并消费的呢?
Default Exchange
在控制台可以清楚看到一个default
交换机,进去看看他的说明
意思就是,在提供方没有指明使用哪个Exchange
时,默认投递到这个Exchange
上,并且路由规则是,路由键的名称和队列名称相同,便可成功路由消费,这也是我们没有指定Exchange
缺能消费成功的原因。
在上面的代码中也可以发现,routingKey
和queueName
是相同的,都为test01
三、Exchange
在上一节快速入门中,我们使用了默认的Exchange,路由规则是路由键名称和队列名相同。
这一节,我们会详细介绍三大Exchange类型
Direct
英文翻译为直接,这个Exchange就和其名字一样,是直连的,也是RabbitMQ默认的Exchange类型,完全根据RoutingKey来路由消息。
消费者设置Exchange的Queue和Binding时要指定RoutingKey
,生产者提供消息时要指定相同的RoutingKey
,才能使消息正确被消费。
上一节使用的Default Exchange
就是Direct类型,但他比较特殊,没有Binding操作,必须RoutingKey
与QueueName
完全匹配才可以。
代码理解
消费者Consumer,这里对于信道连接的创建代码就不再重复了
1 | // 声明Exchange,QueueName,RoutingKey的设置 |
通过Channel创建了direct类型的交换机,queue和其中绑定关系,路由键为test.direct
生产者Provider
1 | // 声明Exchange,RoutingKey的设置 |
生产者比较简单,设置好Exchange名称与RoutingKey
即可,注意这里名称要与Consumer的相同,才能成功投递路由消费。
测试
开启消费者后,会发现创建了Direct
类型的Exchange,并且Binding关系就是根据test.direct
这个路由键,去传递到test_direct_queue
这个队列。
启动消息提供者
成功消费
图解
如图,清晰明确的画出了Direct交换机,Messages
根据RoutingKey
一一对应到不同的Queue中。
应用场景
在做日志收集时,想只把 Error
级别的日志发送给负责记录写入磁盘文件的 Queue。这种场景下我们可以使用指定的 RoutingKey
(例如 error
)将写入磁盘文件的 Queue 绑定到 Direct Exchange
上。
Topic
Topic类型的Exchange,其实也是根据RoutingKey路由的,它有些类似于Direct,但却不同,不同点就是它可以模糊匹配。
分别支持*
和#
通配符
*
表示匹配一个单词#
则表示匹配没有或一个或多个单词
代码理解
Consumer
1 | // 声明Exchange,QueueName,RoutingKey的设置 |
对比Direct的代码,这里只修改了设置,需要注意的是Type类型改成了topic,并且routingKey为user.#
Provider
1 | // 声明Exchange,RoutingKey的设置 |
在生产者这边,我们投递三个消息,分别是user
、user.pacee
、user.pacee.info
测试
三个全部映射成功,这里忘记修改打印的字符串了:)
图解
应用场景
假设我们的消息路由规则除了需要根据日志级别来分发之外还需要根据消息来源分发,可以将 RoutingKey 定义为 消息来源.级别
如 order.info
、user.error
等。处理所有来源为 user
的 Queue 就可以通过 user.*
绑定到 Topic Exchange 上,而处理所有日志级别为 info
的 Queue 可以通过 *.info
绑定到 Exchange上。
Fanout
Fanout类型的交换机,与RoutingKey
无关,即会将消息分发到所有和他绑定的队列上,看重Binding
关系
代码理解
Consumer
1 | // 声明Exchange,QueueName,RoutingKey的设置 |
其他代码和之前一样,这里设置type
为fanout
,路由key
为test.fanout.01
Provider
1 | // 声明Exchange,RoutingKey的设置 |
这里故意设置与Consumer不同的RoutingKey
,看看消息会不会被成功投递
测试
被成功投递,应证说法
图解
应用场景
假设我们定义了一个 Exchange 来接收日志消息,同时定义了两个 Queue 来存储消息:一个记录将被打印到控制台的日志消息;另一个记录将被写入磁盘文件的日志消息。我们希望 Exchange 接收到的每一条消息都会同时被转发到两个 Queue,这种场景下就可以使用 Fanout Exchange
来广播消息到所有绑定的 Queue。
Headers
Headers Exchange 会忽略 RoutingKey 而根据消息中的 Headers 和创建绑定关系时指定的 Arguments 来匹配决定路由到哪些 Queue。
Headers Exchange 的性能比较差,而且 Direct Exchange 完全可以代替它,所以不建议使用。这里也不过多介绍了。
四、核心概念再理解
Binding
上一个笔记中了解,绑定是将Exchange和Queue连接起来,通过路由键。
在这一节,我们实际编写了绑定的代码,如下
1 | channel.queueBind(queueName,exchangeName,routingKey); // 设置绑定关系 |
主要就是三个关键信息,来组成一个绑定关系,主要操作再消息消费方
- 队列名
- 交换机名
- 路由键
Queue
通过代码编写,我们知道了Queue也有几大属性
durable
:是否持久化true/false
exclusive
:是否该连接独占,一般都为falseautoDelete
:是否自动删除,即如果没有Exchange与其连接(Binding),将会自动删除
Message
Message比较重要,主要由Properties
和Body
组成,Properties
中常用属性有:
deliveryMode
:是否持久化,默认1为不持久化,2为持久化headers
:自定义一些属性,保存到headers
中,之后代码有体现contentType
contentEncoding
:字符集priority
:优先级correlationId
:可以用作消息的唯一ID,作用挺多的replyTo
:可以用作消息失败后返回的队列,后面再细说expiration
:消息过期时间messageId
:消息的ID- ····:还有很多,这里就不赘述了
接着我们在代码中设置一些Message属性
Provider
1 | // 设置消息的Properties 使用链式编程 |
Consumer
1 | // 7.消费 |
主要是消费时,打印出在headers
中添加的自定义属性myattr
启动消费后,成功显示信息