RabbitMQ
学习文档:https://niceseason.github.io/2020/04/18/springboot/#%E4%BA%8C%E3%80%81RabbitMQ
官网:https://www.rabbitmq.com/documentation.html
异步、流量控制(消峰)、解耦
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现
rocketmq开源版本功能不够完善
AMQP、rabbitMq高性能、高并发、高可用、数据处理量不错、开源、多语言
kafka性能高、但是可能会重发
下载
官网:
https://github.com/erlang/otp/releases?page=3
https://www.rabbitmq.com/install-windows.html
软件
https://github.com/erlang/otp/releases/download/OTP-23.3.4.14/otp_win64_23.3.4.14.exe
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.9.19/rabbitmq-server-3.9.19.exe
(太新的erlang也要更新)
安装目录下 E:\environment\rabbitmq\rabbitmq_server-3.9.19\sbin
port: 15672
rabbitmqctl start_app
rabbitmqctl stop
rabbitmqctl status //查看当前状态
rabbitmq-plugins enable rabbitmq_management //开启Web插件
rabbitmq-server start //启动服务
rabbitmq-server stop //停止服务
rabbitmq-server restart //重启服务
# 可视化插件
rabbitmq-plugins enable rabbitmq_management
guest/guest
# 使用命令添加用户并授权
# 添加用户
rabbitmqctl add_user admin admin
# 设置permissions
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# 设置用户角色
rabbitmqctl set_user_tags admin administrator
# 查看新添加的admin
rabbitmqctl list_users
# 查看用于的权限
rabbitmqctl list_permissions -p /
核心概念
- Message
- 消息,消息是不具名的,它由消息头和消息体组成
- 消息头,包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等
- Publisher
- 消息的生产者,也是一个向交换器发布消息的客户端应用程序
- Exchange
- 交换器,将生产者消息路由给服务器中的队列
- 类型有direct(默认),fanout, topic, 和headers,具有不同转发策略
- Queue
- 消息队列,保存消息直到发送给消费者
- Binding
- 绑定,用于消息队列和交换器之间的关联
- Connection
- 网络连接,比如一个TCP连接
- Consumer
- 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序
- Virtual Host
- 虚拟主机,表示一批交换器、消息队列和相关对象。
- vhost 是 AMQP 概念的基础,必须在连接时指定
- RabbitMQ 默认的 vhost 是 /
- Broker
- 消息队列服务器实体
- channel
- 建立一条连接,有很多通道
运行机制
消息路由
AMQP 中增加了Exchange 和 Binding 的角色, Binding 决定交换器的消息应该发送到那个队列
Exchange 类型
direct
点对点模式,消息中的路由键(routing key)如果和 Binding 中的 binding
key 一致, 交换器就将消息发到对应的队列中。fanout
广播模式,每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去
topic
将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
识别通配符: # 匹配 0 个或多个单词, *匹配一个单词
初始化
amqp:5672、5671
http,web端口:15672
clustering:25672
4369,25672:erlang发现&集群端口
1883,8883:MQTT协议端口
# for RabbitMQ 3.9, the latest series
docker run \
--ip 192.168.0.9 --net mynet \
-it --rm -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 rabbitmq:3.9-management
docker run \
-it --rm -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 rabbitmq:3.9-management
# for RabbitMQ 3.8,
# https://www.rabbitmq.com/versions.html
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8-management
docker update rabbitmq --restart=always
队列模式
https://www.rabbitmq.com/getstarted.html
这里写的太烂了
简单队列
点对点,交换机对应一个队列(一个路由)
简单
生产的很多,可能内存溢出
send
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("101.227.11.219");
factory.setUsername("root");
factory.setPassword("");
factory.setVirtualHost("/ming");
factory.setPort(5672);
try (
Connection connection = factory.newConnection();
Channel channel = connection.createChannel()
) {
channel.queueDeclare(
QUEUE_NAME, // 名称
false, // 持久化
false, // 排他队列,基于连接下,可以访问,不可重名,关闭连接,也会删除
/*
1.基于连接可见,同一连接下的不同通道可以访问同一连接下的其他排它队列
2.基于连接创建,连接下排它队列不可同名
3.即使是持久化的,连接关闭则排它队列删除
只限于一个客户端发送读取消息场景
*/
false, // 自动删除,如果没有消费者则删除
null
);
String message = "Hello World!";
// 交换机、路由、是否持久化、消息内容
channel.basicPublish(
"",
QUEUE_NAME,
null,
message.getBytes(StandardCharsets.UTF_8)
);
System.out.println(" [x] Sent '" + message + "'");
}
}
}
recv
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("101.227.11.219");
factory.setUsername("root");
factory.setPassword("");
factory.setVirtualHost("/ming");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
QUEUE_NAME,
false,
false,
false,
null
);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
轮训
启动多个消费者,做到消费一个队列中的消息
一人接收一个
确定:消费者的能力不一样
// 处理完才接受下一条
int prefetchCount = 1;
channel.basicQos(prefetchCount);
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("101.227.11.219");
factory.setUsername("root");
factory.setPassword("mingyuefusu!");
factory.setVirtualHost("/ming");
factory.setPort(5672);
return factory.newConnection();
}
recv
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(
delivery.getEnvelope().getDeliveryTag(),
false // false一条条确认消息
);
};
channel.basicConsume(
QUEUE_NAME,
true, // 手动确认收到消息
deliverCallback,
consumerTag -> { }
);
发布订阅/广播 fanout
消息到达一个交换机后,会给绑定的所有队列都发送一样的消息
多个消费者收到同一条消息
send
try (
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
) {
// 声明广播的交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message = "Hello World!";
// 只需要放交换机即可
channel.basicPublish(
EXCHANGE_NAME,
"",
null,
message.getBytes(StandardCharsets.UTF_8)
);
System.out.println(" [x] Sent '" + message);
}
recv
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 获取广播交换机中的队列
String queue = channel.queueDeclare().getQueue();
// 绑定队列和交换机
channel.queueBind(queue, EXCHANGE_NAME, "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(
queue,// 监听的还是队列
true, // 自动回复
deliverCallback,
consumerTag -> { }
);
路由模式 direct
会员才能看到会员专属的信息
缺点:路由key太多难以管理
send
try (
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String[] argv = {"info", "error", "warn", "debug"};
for (String severity : argv) {
String message = "my " + severity;
// 交换机、路由、是否持久化、消息内容
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
}
recv
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
String[] argv = {"info", "error", "warn", "debug"};
for (String severity : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
主题模式
*唯一单词,#0或多个
发送详细,接受通配
匹配不到默认丢弃
send
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(argv);
String message = getMessage(argv);
// 交换机 路由 持久化 内容
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
recv
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 为什么能获取队列名?如果这个交换机有多个队列呢? 但是这里好像是先有的接收
String queueName = channel.queueDeclare().getQueue();
String[] argv = {"com.#"};
for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
RPC
客户端和服务端同步
官网:https://www.rabbitmq.com/tutorials/tutorial-six-java.html
client请求携带reply_to(队列名)和correlation_id发送至队列
server消费队列后,使用reply_to将消息发送至队列,client根据correlation_id取出
confirm
生产者的消息不能知道消息是否到达了队列,可以使用事务
channel.txSelect();
channel.txRollback();
- 普通confirm(同步)
每发送一条消息,调用waitForConfirms()方法,等待服务器端confirm,实际上是一种串行confirm
- 批量confirm(同步)
每发送一条消息后,waitForConfirmOrDie(),等待服务器端confirm
- 异步confirm
提供回调方法,服务端confirm一条或多条消息后,client端会回调这个方法
channel.confirmSelect();
channel.waitForConfirms(); // 普通confirm
waitForConfirmOrDie(); // 批量模式,只要有一条没确认就抛异常
send
public class Send {
private static final String QUEUE_NAME = "async";
public static void main(String[] temp) throws Exception {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
try {
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
// 开启确认模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if( multiple ) {
System.out.println("multiple" + deliveryTag);
// 删除 deliveryTag 项标识id ?
confirmSet.headSet(deliveryTag + 1L).clear();
} else {
System.out.println("single" + deliveryTag);
confirmSet.headSet(deliveryTag).clear();
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if(multiple) {
System.out.println("failed-multiple->" +deliveryTag);
confirmSet.headSet(deliveryTag +1L).clear();
} else {
System.out.println("failed-single->" +deliveryTag);
confirmSet.remove(deliveryTag);
}
}
});
while(true) {
String message = "hello world";
Long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8"));
confirmSet.add(seqNo);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if(null != channel && channel.isOpen()) {
channel.close();
}
if(null != channel && connection.isOpen()) {
connection.close();
}
}
}
//..
}
recv
public static void main(String[] argv) throws Exception {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
QUEUE_NAME,
false,
false,
false,
null
);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(
QUEUE_NAME,
true, // 自动回复
deliverCallback,
consumerTag -> { }
);
}
SpringAMQP
依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
配置
开启使用
@EnableRabbit
配合文件
spring:
# 1 broker接受回调,队列错误回调,消费端接受回调
rabbitmq:
host: 101.200.169.229
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
# 抵达队列,异步方式回调
template:
mandatory: true
listener:
simple:
acknowledge-mode: manual
# 2
rabbitmq:
host: 101.227.11.219
port: 5672
username: root
password:
virtual-host: /ming
server:
port: 8081
@Autowired
AmqpAdmin amqpAdmin;
序列化方式
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyRabbitConfig {
// 序列化方式,传输Object
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
简单API
@SpringBootTest
@Slf4j
public class Test {
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private RabbitTemplate rabbitTemplate;
// 交换机、队列、绑定
@org.junit.jupiter.api.Test
public void test() {
// exchange: durable autoDelete
DirectExchange hello = new DirectExchange("hello-exchange", true, false);
amqpAdmin.declareExchange(hello);
log.info("{} finish", "hello");
// queue: durable exclusive autoDelete
Queue queue = new Queue("hello-queue", true, false, false);
amqpAdmin.declareQueue(queue);
// binding
/* String destination, 目的地
Binding.DestinationType destinationType, 类型: 交换机、队列
String exchange, 交换机
String routingKey, 路由键
@Nullable Map<String, Object> arguments
*/
Binding binding = new Binding("hello-queue", Binding.DestinationType.QUEUE, "hello-exchange", "hello.java", null);
amqpAdmin.declareBinding(binding);
// send Msg
String msg = "hello world!!!";
rabbitTemplate.convertAndSend("hello-exchange", "hello.java", msg);;
}
}
发送
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
orderEntity.setModifyTime(new Date());
// 反馈可以统一放到configuration中
/*
public MyRabbitConfig(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
initTemplate();
}
*/
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
if(ack) {
System.out.println("发送成功");
} else {
System.out.println("发送失败");
}
});
rabbitTemplate.setReturnCallback((msg, respCode, respText, exchange, routeingKey) -> {
System.out.println("发送失败" +msg );
});
rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", orderEntity);
Init资源
有listener才会创建
@Configuration
@EnableRabbit // 需要监听的时候要添加
public class RabbitMQConfig {
@Bean
public Exchange orderEventExchange() {
/**
* String name,
* boolean durable,
* boolean autoDelete,
* Map<String, Object> arguments
*/
return new TopicExchange("order-event-exchange", true, false);
}
@Bean
public Queue orderDelayQueue() {
HashMap<String, Object> paramMap = new HashMap<>();
paramMap.put("x-dead-letter-exchange", "order-event-exchange");
paramMap.put("x-dead-letter-routing-key", "order.release.order");
// 20s
paramMap.put("x-message-ttl", TimeUnit.SECONDS.toMillis(20L));
return new Queue("order.delay.queue", true, false, false, paramMap);
}
/**
* 普通队列
*
* @return
*/
@Bean
public Queue orderReleaseQueue() {
return new Queue("order.release.order.queue", true, false, false);
}
/**
* 创建订单的binding,order-event-exchange 和 order.delay.queue 通过路由 order.create.order绑定
* @return
*/
@Bean
public Binding orderCreateBinding() {
/**
* String destination, 目的地(队列名或者交换机名字)
* DestinationType destinationType, 目的地类型(Queue、Exhcange)
* String exchange,
* String routingKey,
* Map<String, Object> arguments
* */
return new Binding(
"order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null
);
}
}
样例2
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue rabbitmqDemoDirectQueue() {
/**
* 1、name: 队列名称
* 2、durable: 是否持久化
* 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
* 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
* */
return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
}
@Bean
public DirectExchange rabbitmqDemoDirectExchange() {
//Direct交换机
return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
}
@Bean
public Binding bindDirect() {
//链式写法,绑定交换机和队列,并设置匹配键
return BindingBuilder
//绑定队列
.bind(rabbitmqDemoDirectQueue())
//到交换机
.to(rabbitmqDemoDirectExchange())
//并设置匹配键
.with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
}
}
监听接收
如果已经创建好了队列,将不会覆盖,要删除队列
@Component
@RabbitListener(queues = {"order.release.order.queue"})
// 使用queuesToDeclare属性,如果不存在则会创建队列
// @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public class OrderCloseListener {
@RabbitHandler
public void getCloseOrderEntity(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
System.out.println("接受到关闭订单" +orderEntity);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
发送模式
from:https://developer.aliyun.com/article/769883#slide-13
直连、主题通过 routerKey 绑定交换机和队列
广播会发送到绑定了交换机的所有队列
header exchange 通过头部的信息和绑定的头部信息进行对比,相当于将路由键放在了头部
直连
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue rabbitmqDemoDirectQueue() {
/**
* 1、name: 队列名称
* 2、durable: 是否持久化
* 3、exclusive: 排他队列,基于连接下,可以访问,不可重名,关闭连接就会删除
* 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
* */
return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
}
@Bean
public DirectExchange rabbitmqDemoDirectExchange() {
// Direct交换机
return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
}
@Bean
public Binding bindDirect() {
// 链式写法,绑定交换机和队列,并设置匹配键
return BindingBuilder
// 绑定队列
.bind(rabbitmqDemoDirectQueue())
// 到交换机
.to(rabbitmqDemoDirectExchange())
// 并设置匹配键
.with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
}
}
发送
rabbitTemplate.convertAndSend(
RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE,
RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING,
dataHashMap
);
接收
@RabbitListener(
queuesToDeclare =
@Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC)
)
广播模式
通过将两个队列绑定到相同的交换机上,启动项目时就创建交换机和队列
不需要路由键
声明
@Component
public class DirectRabbitConfig implements BeanPostProcessor {
@Resource
private RabbitAdmin rabbitAdmin;
@Bean
public Queue fanoutExchangeQueueA() {
// 队列A
return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A, true, false, false);
}
@Bean
public Queue fanoutExchangeQueueB() {
// 队列B
return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B, true, false, false);
}
@Bean
public FanoutExchange rabbitmqDemoFanoutExchange() {
// 创建 FanoutExchange 类型交换机
return new FanoutExchange(
RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME,
true,
false
);
}
@Bean
public Binding bindFanoutA() {
//队列A绑定到FanoutExchange交换机
return BindingBuilder
.bind(fanoutExchangeQueueA())
.to(rabbitmqDemoFanoutExchange());
}
@Bean
public Binding bindFanoutB() {
//队列B绑定到FanoutExchange交换机
return BindingBuilder.bind(fanoutExchangeQueueB()).to(rabbitmqDemoFanoutExchange());
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
//启动项目即创建交换机和队列
rabbitAdmin.declareExchange(rabbitmqDemoFanoutExchange());
rabbitAdmin.declareQueue(fanoutExchangeQueueB());
rabbitAdmin.declareQueue(fanoutExchangeQueueA());
return null;
}
}
发送
@Resource
private RabbitTemplate rabbitTemplate;
// 发布消息,就可以不指定路由
@Override
public String sendMsgByFanoutExchange(String msg) throws Exception {
Map<String, Object> message = getMessage(msg);
try {
rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, "", message);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
接受
@RabbitListener(
queuesToDeclare =
@Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A)
)
@RabbitHandler
主题模式
通过”*” 、 “#”通配符,路由到对应的队列
*
符号:有且只匹配一个词
#
符号:匹配一个或多个词
一个带路由匹配规则的消息到来,交换机通过路由规则,发送到所有对应的队列
可以实现直连direct、广播fanout模式的,感觉主要是看路由键
header exchange
不是通过路由键,而是通过请求头中的键值
队列绑定时,能规定头部信息,x-match中的值为any/all,对消息的头部信息 和 队列绑定的头部信息进行匹配
@Component
public class RabbitConfig implements BeanPostProcessor {
@Bean
public Queue headersQueueA() {
return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A, true, false, false);
}
@Bean
public Binding bindHeadersA() {
Map<String, Object> map = new HashMap<>();
map.put("key_one", "java");
map.put("key_two", "rabbit");
// 全匹配
return BindingBuilder.bind(headersQueueA())
.to(rabbitmqDemoHeadersExchange())
.whereAll(map).match();
// 部分匹配
return BindingBuilder.bind(headersQueueB())
.to(rabbitmqDemoHeadersExchange())
.whereAny(map).match();
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
rabbitAdmin.declareExchange(rabbitmqDemoHeadersExchange());
rabbitAdmin.declareQueue(headersQueueA());
rabbitAdmin.declareQueue(headersQueueB());
return null;
}
}
发送
MessageProperties messageProperties = new MessageProperties();
// 消息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
// 添加消息
messageProperties.getHeaders().putAll(headerDataMap);
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend(
RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME,
null,
message
);
接受
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B))
public void process(Message message) throws Exception {
MessageProperties messageProperties = message.getMessageProperties();
String contentType = messageProperties.getContentType();
System.out.println("队列[" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B + "]收到消息:" + new String(message.getBody(), contentType));
}
邮箱发送
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
配置
server:
port: 8083
spring:
mail:
host: smtp.qq.com
port: 465
username: 284908631@qq.com
password: #
#test-connection: 是否测试链接
protocol: smtp
default-encoding: UTF-8
properties.mail.smtp.ssl.enable: true
# properties:
# mail.smtp.socketFactory.fallback : true
# mail.smtp.starttls.enable: true
rabbitmq:
host:
username: root
password:
virtual-host: /ming
@Bean
public Queue queue() {
return new Queue("com.ming.mail");
}
模板
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>欢迎</title>
</head>
<body>
欢迎<span style="color:orangered" th:text="${name}"></span>加入
</body>
</html>
接收
@Component
public class MailRecv {
private static Logger logger = LoggerFactory.getLogger(MailRecv.class);
@Autowired
private JavaMailSender javaMailSender;
@Autowired
private MailProperties mailProperties;
@Autowired
private TemplateEngine templateEngine;
@RabbitListener(queues = "com.ming.mail")
public void handle(MyUser myUser) {
MimeMessage msg = javaMailSender.createMimeMessage();
MimeMessageHelper helper = new MimeMessageHelper(msg);
try{
helper.setFrom(mailProperties.getUsername());
helper.setTo(myUser.getEmail());
helper.setSubject("入职欢迎");
helper.setSentDate(new Date());
Context context = new Context();
context.setVariable("name", myUser.getName());
// 模板生成文本
String text = templateEngine.process("mail", context);
helper.setText(text, true);
javaMailSender.send(msg);
} catch (MessagingException e) {
e.printStackTrace();
logger.error("email error");
}
}
}
发送
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMail() {
MyUser myUser = new MyUser();
myUser.setAge(20L);
myUser.setName("mingyue");
myUser.setEmail("284908631@qq.com");
rabbitTemplate.convertAndSend("com.ming.mail", myUser);
}
可靠性传递【发送】
# 已经过时
spring.rabbitmq.publisher-confirms=ture
# 高版本,发送端信息到达brock
sprin.rabbitmq.publisher-confirm-type=correlated
# 配置config,以下实现有
# 失败回调,发送端信息抵达队列
spring.rabbitmq.publisher-returns=true
# 抵达队列,异步方式回调
spring.rabbitmq.template.mandatory=true
publisher-confirm-type: correlated
publisher-returns: true
# 抵达队列,异步方式回调
template:
mandatory: true
生产端可靠性投递
成功发出
成功接收
生产者能收到应答
1.消息落库,进行标记,mq回调改变状态为已发送,定时任务将未成功发送的重新发送
2.消费者发送消息到队列,告诉回调服务已经消费完成,生产者延迟再次投递,回调服务进行确认,如果没有id相同的消费完成记录,则发起RPC重新发送
实现
spring:
rabbitmq:
host: 101.227.11.219
username: #
password: #
virtual-host: /ming
# 确认回调
publisher-confirm-type: correlated
# 失败回调
publisher-returns: true
config
设置 rabbitTemplate 成功和失败回调
@Configuration
public class RabbitMQConfig {
public final Logger logger = LoggerFactory.getLogger(RabbitMQConfig.class);
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Autowired
private MailLogService mailLogService;
// 也可以使用@PostConstruct,对rabbitTemplate进行改造
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
// broker确认接受到信息,回调
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
String msgId = data.getId();
if(ack) {
logger.info("{}->发送成功", msgId);
MailLog mailLog = new MailLog();
mailLog.setStatus(1);
mailLog.setMsgId(msgId);
mailLogService.updateById(mailLog);
} else {
logger.error("{}->发送失败", msgId);
}
});
/**
* 队列接受消息失败,回调
* msg:消息主题
* respCode:响应码
* respText:响应描述
* exchange:交换机
* routingKey: 路由器
*/
rabbitTemplate.setReturnCallback((msg, respCode, respText, exchange, routeingKey) -> {
logger.error("{}->发送到queue失败", msg.getBody());
});
return rabbitTemplate;
}
@Bean
public Queue queue() {
return new Queue(EmailConstants.MAIL_QUEUE_NAME);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(EmailConstants.MAIL_EXCHANGE_NAME);
}
@Bean
public Binding binding() {
return BindingBuilder
.bind(queue())
.to(topicExchange())
.with(EmailConstants.MAIL_ROUTING_KEY);
}
}
发送
准备 UUID 作为 msgID ,在 converAndSend中 new CorrelationData(msgId) 发送给队列
@Test
public void sendMail() {
System.out.println("???");
String msgID = UUID.randomUUID().toString();
MailLog mailLog = new MailLog();
mailLog.setMsgId(msgID);
mailLog.setEid(0);
mailLog.setStatus(0);
mailLog.setRouteKey(EmailConstants.MAIL_ROUTING_KEY);
mailLog.setExchange(EmailConstants.MAIL_EXCHANGE_NAME);
mailLog.setCount(0);
mailLog.setTryTime(LocalDateTime.now().plusMinutes(EmailConstants.MSG_TIMEOUT));
mailLog.setCreateTime(LocalDateTime.now());
mailLog.setUpdateTime(LocalDateTime.now());
mailLogService.save(mailLog);
MyUser myUser = new MyUser();
myUser.setAge(20L);
myUser.setName("mingyue");
myUser.setEmail("284908631@qq.com");
rabbitTemplate.convertAndSend(
EmailConstants.MAIL_EXCHANGE_NAME,
EmailConstants.MAIL_ROUTING_KEY,
myUser,
new CorrelationData(msgID) // 消息的唯一id
);
}
定时
数据库中查找状态为 未发送0且到了重试时间的,如果其尝试次数小于3则重新发送给mq
@Scheduled(cron = "0/10 * * * * ?")
public void mail() {
List<MailLog> list = mailLogService.list(
new QueryWrapper<MailLog>()
.eq("status", 0)
.lt("tryTime", LocalDateTime.now())
);
System.out.println("????");
list.forEach(System.out::println);
System.out.println("????");
list.forEach(mailLog -> {
if (3 <= mailLog.getCount()) {
MailLog mailLog1 = new MailLog();
mailLog1.setStatus(2);
mailLog1.setMsgId(mailLog.getMsgId());
mailLogService.updateById(
mailLog1
);
}
MailLog mailLog2 = new MailLog();
mailLog2.setTryTime(LocalDateTime.now().plusMinutes(EmailConstants.MSG_TIMEOUT));
mailLog2.setUpdateTime(LocalDateTime.now());
mailLog2.setMsgId(mailLog.getMsgId());
mailLog2.setCount(mailLog.getCount() + 1);
mailLogService.updateById(mailLog2);
System.out.println("eid" + mailLog.getEid());
MyUser myUser = new MyUser();
myUser.setAge(20L);
myUser.setName("mingyue");
myUser.setEmail("284908631@qq.com");
rabbitTemplate.convertAndSend(
EmailConstants.MAIL_EXCHANGE_NAME,
EmailConstants.MAIL_ROUTING_KEY,
myUser,
new CorrelationData(mailLog.getMsgId())
);
});
}
可靠性传递【接受】
队列发送信息到接收端,如果接收端宕机,channel也还是自动回复,消息就全部没有了
# 收到确认收获
spring.rabbitmq.listener.simple.acknowledge-mode=manual
tag自增
@Component
@RabbitListener(queues = EmailConstants.MAIL_QUEUE_NAME)
public void handle(MyUser myUser,
org.springframework.messaging.Message message,
Channel channel
){
// getTag 1
MessageHeaders headers = message.getHeaders();
long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
// or 2
long tag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(tag, false); // 只确认单条,非批量,不回队列
// 异常
catch (Exception e) {
channel.basicNack(tag, false, true); //,非批量, 重回队列
// channel.basicReject(tag, requeue);
e.printStackTrace();
}
}
消息可靠性
消息丢失
执行 convertAndSend 时网络断开
- 进行try catch,可能网络失败,重试发送
- 做好日志就,每个消息状态是否被服务器收到都记录
- 定期重发,扫描数据库
消息到达Broker,Broder没有写入磁盘
- 确认回调,修改服务器状态
自动ack下,消费者收到消息,没来得及处理就宕机
- 开启手动ACK,消费成功才移除,异常就noAck并重新入队
消息重复
消息处理完成,回复时宕机,导致重复处理
消息消费失败,再次发送
- 使用幂等性接口
message.getMessageProperties().getRedelivered()
是否重复投递- 防重表
消息积压
- 消费者宕机积压
- 消费者消费能力不足
- 发送者流量太大
- 上线更多消费者
- 上线队列消费服务,将数据存入数据库再处理
消息幂等性
定时任务刚好把刚刚放入数据库的记录(但是重发时间3分钟,一般不会出现)
正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列(RabbitMQ是发送一个ACK确认消息),就从消息队列中删除
因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道已经消费过该消息了,再次将该消息分发给其他的消费者
接受message,获取消息tag和msgId,判断redis是否存在msgId,开启手动确认
# 收到确认收获
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RabbitListener(queues = EmailConstants.MAIL_QUEUE_NAME)
public void handle(MyUser myUser,
org.springframework.messaging.Message message,
Channel channel
){
// or
MyUser myUser1 = (MyUser)message.getPayload();
MessageHeaders headers = message.getHeaders();
long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
String msgId = (String) headers.get("spring_returned_message_correlation");
// redis中重复
HashOperations hashOperations = redisTemplate.opsForHash();
if(hashOperations.entries("email_log").containsKey(msgId)) {
channel.basicAck(tag, false); // 只确认单条
System.out.println("消息已经被消费");
return;
}
// 确认消费
hashOperations.put("email_log", msgId, "ok");
channel.basicAck(tag, false); // 只确认单条
// 异常
catch (Exception e) {
channel.basicNack(tag, false, true); // 重回队列
e.printStackTrace();
}
}
延时队列
场景:未付款的订单,超过十佳后,自动取消订单释放占有物品
方案:schedule定时任务轮询数据库
缺点:消耗系统内存,数据库压力,时间误差(必须要30分钟才能解锁,可能要60分钟才能扫描到)
解决:rabbitmq消息ttl和死信Exchange结合
消息的TTL
消息的存活时间,rabbitmq对队列和消息分别设置TTL
- 对队列设置,就是没有消费者连着的保留时间,超过保留时间,消息就死了,成为死信
- 对队列和消息都设置,取小的,消息如果被路由到不同的队列中,这个消息死亡的时间可能不一样,可以通过设置消息的expiration字段或者x-message-ttl属性设置时间
Dead Letter Exchanges
一个消息满足如下条件 ,会进入死信路由,一个路由可以对应很对队列
- 消息被Consumer拒收,reject方法参数requeue为false,不会再次放到队列中
- 消息的TTL到了(所以不设置监听)
- 队列长度满了,排在前面的消息被丢弃或者扔到死信路由上
dead letter exchange就是普通的exchange,只是在某一个设置dead letter exchange中有消息过期了,就自动触发消息的转发,发送到dead letter exchange中
我们既然可以控制消息一段时间后变成死信,又可以变成死信的消息被路由到某一个指定的交换机,结合二者就变成延时队列
实现
推荐给消息队列设置过期时间,消息过期机制惰性检查,必须要第一个过期了才能拿后面的
设置消息队列,消息死了交给指定的死信队列
设置消息过期时间