PAcee Hub

学习 笔记


  • 首页

  • 标签

  • 分类

RabbitMQ整合-SpringAMQP(一)

发表于 2019-12-27 | 分类于 rabbitmq

Spring AMQP是用于以AMQP为基础的MQ中间件的解决方案,对于底层API进行封装,使开发者在对MQ进行操作时更加易用,并进行了一些拓展与优化。

这里我们主要使用RabbitMQ与SpringAMQP整合,对于其他以AMQP规范制作的中间件也是差不多的整合方式。

SpringAMQP核心内容

  • RabbitAdmin:管控组件
  • RabbitTemplate:消息模板组件
  • SimpleMessageListenerContainer:简单消息监听容器
  • MessageListenerAdapter:消息适配器
  • MessageConverter:消息转换器

接下来,我们就需要对这些核心内容进行研究

RabbitAdmin

RabbitAdmin的主要作用就是方便的操作Exchange,Queue,Binding这些信息

使用它需要几点注意:

  • 需要将RabbitAdmin注入到上下文中,注入的前提是ConnectionFactory在上下文中,所以要先注入ConnectionFactory
  • 注入时要设置autoStartup=true,不然不会加载RabbitAdmin
    阅读全文 »

RabbitMQ高级-死信队列

发表于 2019-12-26 | 分类于 rabbitmq

死信队列介绍

死信队列,DLX,dead-letter-exchange

死信队列其实就是一个队列,只不过里面存放的都是死信,所以才叫做死信队列

死信

那么什么是死信呢,即死亡的信息,这里死亡的定义有几种

  • 被拒绝的消息(basic.reject / basic.nack),并且requeue = false
  • 消息的TTL过期
  • 队列达到最大长度

死信的处理过程

创建一个DLX,ex01,这是一个正常的Exchange。

创建一个其他的Exchange,ex02,并且其队列设置其DLX参数为刚刚创建的死信队列。

这样如果这个ex02里有死信,就自动会投递到ex01中保存,可以根据业务场景,对ex01即DLX进行消费监听处理。

阅读全文 »

RabbitMQ高级-TTL详解

发表于 2019-12-26 | 分类于 rabbitmq

什么是TTL

  • TTL意为Time To Live,即生存时间
  • RabbitMQ中适用于队列和消息
    • 消息:即消息过期时间,比如设置10秒,10秒后如果没有被消费则删除
    • 队列:队列中消息的存活时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除

TTL控制台操作

因为之前都是代码操作,还没有在15672控制台操作过,所以我们先使用控制台进行TTL测试

阅读全文 »

RabbitMQ高级-消费端ACK与重回队列

发表于 2019-12-26 | 分类于 rabbitmq

ACK与NACK

在上一节中,我们研究了消费端的限流,了解到手动发送ACK的API,对于消费端来说,不止能发ACK还能发NACK,因为总会有消息接收出问题的情况。

当发送ACK后,告知Broker成功被消费,就结束了一条消息流程。

当发送NACK,消息未被成功消费,那么有两种情况,对于requeue的设置

  • 如果为true,即重回队列,将未被成功消费的信息放到队列最末端,等等重新被消费
  • 如果为false,即不做任何操作

使用方式

NACK:void basicNack(long deliveryTag, boolean multiple, boolean requeue)

ACK:void basicAck(long deliveryTag, boolean multiple)

阅读全文 »

RabbitMQ高级-消费端限流策略

发表于 2019-12-26 | 分类于 rabbitmq

为什么要进行限流

假设一个情景:某个消费端挂掉了,因为不是很重要,所以程序员第二天才发现,这时在MQ Broker上已经囤积了可能上千上万条message,如果这时直接重启消费端,巨量消息冲击,如果单机客户端无法抵御冲击很可能导致服务器崩溃,引发线上故障。

除了这个情景,还有其他情况,生产端每分钟有几百条消息,但消费端每分钟只能处理几十条,这时生产与消费肯定是不平衡的,如果长此以往超出了最大负荷,消费端性能必定下降,导致服务器卡顿崩溃等问题。

这两个情景下都离不开限流,只要有了限流,这些问题都能迎刃而解。

RabbitMQ限流机制

对于限流,肯定是在消费端的,与生产端无关。

RabbitMQ提供了一种QOS(服务质量保证)功能:即在未开启自动确认(autoAck)功能的前提下,如果一定数量的消息未确认,即未返回ACK,消费端不进行新消息的消费

最重要一点:不能开启autoAck,即autoAck=false,否则无法做到限流

阅读全文 »

RabbitMQ高级-自定义消费者

发表于 2019-12-26 | 分类于 rabbitmq

之前的消费者

在之前的消费端,我们是如何消费的呢?

1
2
3
4
5
6
7
8
9
10
11
// 消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue, true, consumer);

// 消费
while (true){
// Delivery 是封装的类,封装了消息信息,配置信息,交换机信息,路由键等信息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
byte[] body = delivery.getBody();
System.out.println(new String(body));
}

创建一个QueueingConsumer,然后循环消费,将接收到的消息信息打印出来。

可以发现,这种方式耦合严重,并且不美观,那么有没有别的方式,更简单的进行消费呢?

就需要使用到接下来要说的自定义消费者

阅读全文 »

RabbitMQ高级-消息Confirm与Return

发表于 2019-12-26 | 分类于 rabbitmq

确认消息Confirm机制

消息确认,从前面的100%可靠性投递可以了解到,当生产者投递到Broker上时,如果开启了Confirm模式,会返回一个ACK给生产者,表示消息投递成功了。这也是消息可靠性投递的保障之一。

流程图

img

如图,消息生产者发送消息到MQ Broker上,Broker收到投递的消息并做出confirm响应。

生产者这里会配置一个Confirm Listener监听器,来监听confirm响应,因为操作是异步的,所以生产者将消息发送出去就可以做其他事情了,对于消息确认只需让内部监听器去监听即可。

阅读全文 »

RabbitMQ高级-幂等性问题

发表于 2019-12-26 | 分类于 rabbitmq

什么是幂等性

用户对于同一操作发起的一次请求或者多次请求的结果是一致的。

我们可以借鉴数据库的乐观锁机制来举个例子

  • 首先为表添加一个版本字段version
  • 在执行更新操作前呢,会先去数据库查询这个version
  • 然后执行更新语句,以version作为条件,例如:
    UPDATE T_REPS SET COUNT = COUNT -1,VERSION = VERSION + 1 WHERE VERSION = 1
  • 如果执行更新时有其他人先更新了这张表的数据,那么这个条件就不生效了,也就不会执行操作了,通过这种乐观锁的机制来保障幂等性。
    阅读全文 »

RabbitMQ高级-消息100%投递成功方案

发表于 2019-12-26 | 分类于 rabbitmq

什么是生产端的可靠性投递?

  • 保证消息成功发出
  • 保障MQ节点成功接收
  • 发送端收到MQ节点的确认应答
  • 完善的消息补偿机制

既然是百分百投递成功,那么只做前三点是肯定不够的。

有些时候,可能消息投递的时候就失败了,或者消息成功投递,在MQ节点应答时因为网络闪断,导致生产端没有接收到应答,等等一些特殊情况。

所以,我们的重点就在于第四点,消息补偿机制

互联网大厂的解决方案

阅读全文 »

RabbitMQ入门

发表于 2019-12-26 | 分类于 rabbitmq

一、管理台

1577094623243

我们先介绍一下管理台

从这张图右上角可以看到用户信息,版本信息与登出。

紧接着有6个标签

Connection

连接信息

1577094742138

因为我们没有任何连接,所以是空的,后面连接生产者和消费者时再看

Channels

信道信息

1577094807220

也没有任何信息,因为信道是基于连接的

Exchanges

交换机,生产者投递消息到这里

1577094838038

这里可以看到默认有多个交换机,并且有四种类型,对于其类型我们后面讲解

Features里有D和I的信息

  • D:durable=true,即持久化,就是MQ如果停掉,这些交换机都不会被清除,一直保留
  • I:internal=true,一个特征,后面详说

Queue

MQ队列

1577095146437

也是为空的

Admin

1577095197061

可以进行用户,虚拟机,以及权限的操作,比如添加用户,添加虚拟机,以及权限配置

Overview

MQ概览,大体信息都在这里

1577095291809

MQ中连接,信道等合计

1577095316585

节点信息,内存占用,磁盘占用等等

1577095336725

保存路径,比如配置文件路径,数据存储的路径,日志的保存

1577095363997

端口号信息

1577095376502

最后这个很重要,MQ信息的导入导出,当我们集群的服务器想升级,或者迁移时,比如centos6升centos7。

如果慢慢调整要耗费很长时间,使用MQ信息导入导出,可以节省大量时间,只需对某些升级所需改动的配置改造即可。

导出后是json文件信息,如下,保存了比如用户,虚拟主机,包括exchange,binding,queue,该有的都有

1577095492548

二、极速入门

这里我们使用SpringBoot快速搭建一个生产者与消费者,并配合Rabbitmq进行消费。

对于SpringBoot如何搭建就不介绍了,主要使用依赖:

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>

Provider

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Provider {

/**
* 需要将消息投递到MQ中
* @param args
*/
public static void main(String[] args) throws Exception{
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.56.120");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/"); // 默认创建 "/" 为虚拟主机

// 2.创建连接
Connection connection = connectionFactory.newConnection();

// 3.创建Channel通道
Channel channel = connection.createChannel();

for(int i = 0;i < 5;i ++){
String msg = "hello rabbit";
/**
* 4.投递消息 四个参数
* exchange,指定投递到哪个交换机
* routingKey,路由键
* props,之前说的Msg由props与body组成,props是描述消息的一些信息,比如优先级
* body,消息中真正存放的信息
*/
channel.basicPublish("","test01",null,msg.getBytes());
}

// 5.关闭连接
channel.close();
connection.close();

}
}

消息生产者/提供方 代码比较简单,主要是建立连接,建立通道,然后进行消息的投递。

注意最后要关闭连接。

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class Consumer {

public static void main(String[] args) throws Exception{
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.56.120");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/"); // 默认创建 "/" 为虚拟主机

// 2.创建连接
Connection connection = connectionFactory.newConnection();

// 3.创建Channel通道
Channel channel = connection.createChannel();

// 4.创建队列
String queueName = "test01";
/**
* 五个参数
* queue,队列名称
* durable,是否持久化
* exclusive,是否独占,即只能这个连接使用的队列
* autoDelete,是否自动删除,即如果该队列没有任何exchange进行绑定,会自动被删除
* arguments,其他一些参数
*/
channel.queueDeclare(queueName,true,false,false,null);


// 5.创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

// 6.设置信道的消费者
/**
* 三个参数
* queue,获取消息的队列名
* autoAck,自动返回Ack
* 当消费端消费时,MQ会发送信息给消费者
* 消费者成功获取后,会返回Ack,代表成功消费
* 如果不设置自动,需要手动返回Ack
* callback,消费者
*/
channel.basicConsume(queueName,true,queueingConsumer);

// 7.消费
while (true){
// 这里可以添加long参数,即超时时间,不设置会一直阻塞
// Delivery 是封装的类,封装了消息信息,配置信息,交换机信息,路由键等信息
Delivery delivery = queueingConsumer.nextDelivery();
byte[] body = delivery.getBody();
System.out.println(new String(body));
}

}
}

Consumer的代码就比较多一点了,Rabbitmq在消息提供方不需要创建提供者,但在消费方需要创建消费者,通道设置并进行消费。

还有一点是队列的创建,因为现在MQ中队列是空的,所以在消费方创建一个队列,如果有相同名称的队列,便不会创建,没有会自动创建。

测试

这里先启动Consumer,因为先启动Provider的话,没有test01这个队列,就无法投递消息

启动Consumer后,会一直处在监听状态,查看一下MQ管理台

1577104595038

1577104603479

1577104617424

出现了连接,队列,信道等等信息,等待消息的产生并消费

这时开启Provider

1577104668687

瞬间执行完毕,并且消费方获取5条消息并打印

1577104701750

控制台也有消息消费的折线图

这里有一个问题,我们没有Exchange,那么消息是如何投递并消费的呢?

Default Exchange

1577104765426

在控制台可以清楚看到一个default交换机,进去看看他的说明

1577104816665

意思就是,在提供方没有指明使用哪个Exchange时,默认投递到这个Exchange上,并且路由规则是,路由键的名称和队列名称相同,便可成功路由消费,这也是我们没有指定Exchange缺能消费成功的原因。

在上面的代码中也可以发现,routingKey和queueName是相同的,都为test01

三、Exchange

在上一节快速入门中,我们使用了默认的Exchange,路由规则是路由键名称和队列名相同。

这一节,我们会详细介绍三大Exchange类型

Direct

英文翻译为直接,这个Exchange就和其名字一样,是直连的,也是RabbitMQ默认的Exchange类型,完全根据RoutingKey来路由消息。

消费者设置Exchange的Queue和Binding时要指定RoutingKey,生产者提供消息时要指定相同的RoutingKey,才能使消息正确被消费。

上一节使用的Default Exchange就是Direct类型,但他比较特殊,没有Binding操作,必须RoutingKey与QueueName完全匹配才可以。

代码理解

消费者Consumer,这里对于信道连接的创建代码就不再重复了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 声明Exchange,QueueName,RoutingKey的设置
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";

// 参数:交换机名称,类型,是否持久化,是否自动删除,Internal 用不到,其他参数 null
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey); // 设置绑定关系

// 创建消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 绑定消费者
channel.basicConsume(queueName,true,consumer);
// 消费
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println(new String(delivery.getBody()));
}

通过Channel创建了direct类型的交换机,queue和其中绑定关系,路由键为test.direct

生产者Provider

1
2
3
4
5
6
// 声明Exchange,RoutingKey的设置
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct";
String msg = "hello direct Exchange";
// 投递消息
channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());

生产者比较简单,设置好Exchange名称与RoutingKey即可,注意这里名称要与Consumer的相同,才能成功投递路由消费。

测试

1577153828348

开启消费者后,会发现创建了Direct类型的Exchange,并且Binding关系就是根据test.direct这个路由键,去传递到test_direct_queue这个队列。

启动消息提供者

1577153928633

成功消费

图解

640?wx_fmt=png

如图,清晰明确的画出了Direct交换机,Messages根据RoutingKey一一对应到不同的Queue中。

应用场景

在做日志收集时,想只把 Error级别的日志发送给负责记录写入磁盘文件的 Queue。这种场景下我们可以使用指定的 RoutingKey(例如 error)将写入磁盘文件的 Queue 绑定到 Direct Exchange 上。

Topic

Topic类型的Exchange,其实也是根据RoutingKey路由的,它有些类似于Direct,但却不同,不同点就是它可以模糊匹配。

分别支持*和#通配符

  • *表示匹配一个单词
  • #则表示匹配没有或一个或多个单词

代码理解

Consumer

1
2
3
4
5
// 声明Exchange,QueueName,RoutingKey的设置
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey = "user.#";

对比Direct的代码,这里只修改了设置,需要注意的是Type类型改成了topic,并且routingKey为user.#

Provider

1
2
3
4
5
6
7
8
9
10
11
// 声明Exchange,RoutingKey的设置
String exchangeName = "test_topic_exchange";
String routingKey1 = "user";
String routingKey2 = "user.pacee";
String routingKey3 = "user.pacee.info";

// 投递消息
String msg = "hello direct Exchange";
channel.basicPublish(exchangeName,routingKey1,null,msg.getBytes());
channel.basicPublish(exchangeName,routingKey2,null,msg.getBytes());
channel.basicPublish(exchangeName,routingKey3,null,msg.getBytes());

在生产者这边,我们投递三个消息,分别是user、user.pacee、user.pacee.info

测试

1577157156024

三个全部映射成功,这里忘记修改打印的字符串了:)

图解

640?wx_fmt=png

应用场景

假设我们的消息路由规则除了需要根据日志级别来分发之外还需要根据消息来源分发,可以将 RoutingKey 定义为 消息来源.级别 如 order.info、user.error等。处理所有来源为 user 的 Queue 就可以通过 user.* 绑定到 Topic Exchange 上,而处理所有日志级别为 info 的 Queue 可以通过 *.info 绑定到 Exchange上。

Fanout

Fanout类型的交换机,与RoutingKey无关,即会将消息分发到所有和他绑定的队列上,看重Binding关系

代码理解

Consumer

1
2
3
4
5
// 声明Exchange,QueueName,RoutingKey的设置
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = "test.fanout.01";

其他代码和之前一样,这里设置type为fanout,路由key为test.fanout.01

Provider

1
2
3
4
5
6
7
// 声明Exchange,RoutingKey的设置
String exchangeName = "test_fanout_exchange";
String routingKey = "test.fanout.02"; // 故意设置的和Consumer不同的RoutingKey

// 投递消息
String msg = "hello fanout Exchange";
channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());

这里故意设置与Consumer不同的RoutingKey,看看消息会不会被成功投递

测试

1577157979895

被成功投递,应证说法

图解

640?wx_fmt=png

应用场景

假设我们定义了一个 Exchange 来接收日志消息,同时定义了两个 Queue 来存储消息:一个记录将被打印到控制台的日志消息;另一个记录将被写入磁盘文件的日志消息。我们希望 Exchange 接收到的每一条消息都会同时被转发到两个 Queue,这种场景下就可以使用 Fanout Exchange 来广播消息到所有绑定的 Queue。

Headers

Headers Exchange 会忽略 RoutingKey 而根据消息中的 Headers 和创建绑定关系时指定的 Arguments 来匹配决定路由到哪些 Queue。

Headers Exchange 的性能比较差,而且 Direct Exchange 完全可以代替它,所以不建议使用。这里也不过多介绍了。

四、核心概念再理解

Binding

上一个笔记中了解,绑定是将Exchange和Queue连接起来,通过路由键。

在这一节,我们实际编写了绑定的代码,如下

1
channel.queueBind(queueName,exchangeName,routingKey); // 设置绑定关系

主要就是三个关键信息,来组成一个绑定关系,主要操作再消息消费方

  • 队列名
  • 交换机名
  • 路由键

Queue

通过代码编写,我们知道了Queue也有几大属性

  • durable:是否持久化 true/false
  • exclusive:是否该连接独占,一般都为false
  • autoDelete:是否自动删除,即如果没有Exchange与其连接(Binding),将会自动删除

Message

Message比较重要,主要由Properties和Body组成,Properties中常用属性有:

  • deliveryMode:是否持久化,默认1为不持久化,2为持久化
  • headers:自定义一些属性,保存到headers中,之后代码有体现
  • contentType
  • contentEncoding:字符集
  • priority:优先级
  • correlationId:可以用作消息的唯一ID,作用挺多的
  • replyTo:可以用作消息失败后返回的队列,后面再细说
  • expiration:消息过期时间
  • messageId:消息的ID
  • ····:还有很多,这里就不赘述了

接着我们在代码中设置一些Message属性

Provider

1
2
3
4
5
6
7
8
9
10
11
12
// 设置消息的Properties 使用链式编程
Map<String,Object> map = new HashMap<>();
map.put("myattr","head attr");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 设置是否持久化,1:false,2:true
.contentEncoding("UTF-8") // 设置字符集
.expiration("10000") // 设置10秒的过期时间
.headers(map) // 设置headers,一些自定义的键值对
.build();

String msg = "hello rabbit message";
channel.basicPublish("","test01",properties,msg.getBytes());

Consumer

1
2
3
4
5
6
7
8
9
10
11
// 7.消费
while (true){
// 这里可以添加long参数,即超时时间,不设置会一直阻塞
// Delivery 是封装的类,封装了消息信息,配置信息,交换机信息,路由键等信息
Delivery delivery = queueingConsumer.nextDelivery();
byte[] body = delivery.getBody();
AMQP.BasicProperties properties = delivery.getProperties();
Map<String, Object> headers = properties.getHeaders();
System.out.println(new String(body));
System.out.println(headers.get("myattr"));
}

主要是消费时,打印出在headers中添加的自定义属性myattr

1577171235439

启动消费后,成功显示信息

< 1…345…8 >
PAcee

PAcee

学习 笔记

76 日志
6 分类
7 标签
© 2020 PAcee