RabbitMQ

学习文档:https://niceseason.github.io/2020/04/18/springboot/#%E4%BA%8C%E3%80%81RabbitMQ

官网:https://www.rabbitmq.com/documentation.html

异步、流量控制(消峰)、解耦

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现

rocketmq开源版本功能不够完善

AMQP、rabbitMq高性能、高并发、高可用、数据处理量不错、开源、多语言

kafka性能高、但是可能会重发

img

下载

官网:

https://github.com/erlang/otp/releases?page=3

https://www.rabbitmq.com/install-windows.html

软件

https://github.com/erlang/otp/releases/download/OTP-23.3.4.14/otp_win64_23.3.4.14.exe

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.9.19/rabbitmq-server-3.9.19.exe

(太新的erlang也要更新)

安装目录下 E:\environment\rabbitmq\rabbitmq_server-3.9.19\sbin

port: 15672
rabbitmqctl start_app
rabbitmqctl stop

rabbitmqctl status    //查看当前状态
rabbitmq-plugins enable rabbitmq_management    //开启Web插件
rabbitmq-server start    //启动服务
rabbitmq-server stop    //停止服务
rabbitmq-server restart    //重启服务

# 可视化插件
rabbitmq-plugins enable rabbitmq_management

guest/guest

# 使用命令添加用户并授权
# 添加用户
rabbitmqctl add_user admin admin
# 设置permissions
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# 设置用户角色
rabbitmqctl set_user_tags admin administrator
# 查看新添加的admin
rabbitmqctl list_users
# 查看用于的权限
rabbitmqctl list_permissions -p /

核心概念

image-20211121233424403

  • Message
    • 消息,消息是不具名的,它由消息头和消息体组成
    • 消息头,包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等
  • Publisher
    • 消息的生产者,也是一个向交换器发布消息的客户端应用程序
  • Exchange
    • 交换器,将生产者消息路由给服务器中的队列
    • 类型有direct(默认),fanout, topic, 和headers,具有不同转发策略
  • Queue
    • 消息队列,保存消息直到发送给消费者
  • Binding
    • 绑定,用于消息队列和交换器之间的关联
  • Connection
    • 网络连接,比如一个TCP连接
  • Consumer
    • 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序
  • Virtual Host
    • 虚拟主机,表示一批交换器、消息队列和相关对象。
    • vhost 是 AMQP 概念的基础,必须在连接时指定
    • RabbitMQ 默认的 vhost 是 /
  • Broker
    • 消息队列服务器实体
  • channel
    • 建立一条连接,有很多通道

运行机制

消息路由

AMQP 中增加了Exchange 和 Binding 的角色, Binding 决定交换器的消息应该发送到那个队列

Exchange 类型

  1. direct

    点对点模式,消息中的路由键(routing key)如果和 Binding 中的 binding
    key 一致, 交换器就将消息发到对应的队列中。

  2. fanout

    广播模式,每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去

  3. topic

    将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
    识别通配符: # 匹配 0 个或多个单词, *匹配一个单词

初始化

amqp:5672、5671

http,web端口:15672

clustering:25672

4369,25672:erlang发现&集群端口

1883,8883:MQTT协议端口

# for RabbitMQ 3.9, the latest series
docker run \
--ip 192.168.0.9 --net mynet \
-it --rm -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 rabbitmq:3.9-management
docker run \
-it --rm -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 rabbitmq:3.9-management

# for RabbitMQ 3.8,
# https://www.rabbitmq.com/versions.html
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8-management

docker update rabbitmq  --restart=always

队列模式

https://www.rabbitmq.com/getstarted.html

这里写的太烂了

简单队列

点对点,交换机对应一个队列(一个路由)

简单

生产的很多,可能内存溢出

send

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("101.227.11.219");
        factory.setUsername("root");
        factory.setPassword("");
        factory.setVirtualHost("/ming");
        factory.setPort(5672);

        try (
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()
        ) {
            channel.queueDeclare(
                    QUEUE_NAME, // 名称
                    false, // 持久化
                    false, // 排他队列,基于连接下,可以访问,不可重名,关闭连接,也会删除
                    /*
                         1.基于连接可见,同一连接下的不同通道可以访问同一连接下的其他排它队列
                         2.基于连接创建,连接下排它队列不可同名
                         3.即使是持久化的,连接关闭则排它队列删除
                         只限于一个客户端发送读取消息场景
                     */
                    false, // 自动删除,如果没有消费者则删除
                    null
            );
            String message = "Hello World!";
            // 交换机、路由、是否持久化、消息内容
            channel.basicPublish(
                    "",
                    QUEUE_NAME,
                    null,
                    message.getBytes(StandardCharsets.UTF_8)
            );
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

recv

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("101.227.11.219");
        factory.setUsername("root");
        factory.setPassword("");
        factory.setVirtualHost("/ming");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(
                QUEUE_NAME,
                false,
                false,
                false,
                null
        );

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

轮训

启动多个消费者,做到消费一个队列中的消息

一人接收一个

确定:消费者的能力不一样

// 处理完才接受下一条
int prefetchCount = 1;
channel.basicQos(prefetchCount);
public static Connection getConnection() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("101.227.11.219");
    factory.setUsername("root");
    factory.setPassword("mingyuefusu!");
    factory.setVirtualHost("/ming");
    factory.setPort(5672);
    return factory.newConnection();
}

recv

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
    channel.basicAck(
        delivery.getEnvelope().getDeliveryTag(),
        false // false一条条确认消息
    );
};
channel.basicConsume(
    QUEUE_NAME,
    true, // 手动确认收到消息
    deliverCallback,
    consumerTag -> { }
);

发布订阅/广播 fanout

消息到达一个交换机后,会给绑定的所有队列都发送一样的消息

多个消费者收到同一条消息

img

send

try (
    Connection connection = RabbitMqUtils.getConnection();
    Channel channel = connection.createChannel();
) {
    // 声明广播的交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    String message = "Hello World!";
    // 只需要放交换机即可
    channel.basicPublish(
        EXCHANGE_NAME,
        "",
        null,
        message.getBytes(StandardCharsets.UTF_8)
    );
    System.out.println(" [x] Sent '" + message);
}

recv

// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 获取广播交换机中的队列
String queue = channel.queueDeclare().getQueue();
// 绑定队列和交换机
channel.queueBind(queue, EXCHANGE_NAME, "");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(
    queue,// 监听的还是队列
    true, // 自动回复
    deliverCallback,
    consumerTag -> { }
);

路由模式 direct

会员才能看到会员专属的信息

缺点:路由key太多难以管理

send

try (
    Connection connection = RabbitMqUtils.getConnection();
    Channel channel = connection.createChannel();
) {
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String[] argv = {"info", "error", "warn", "debug"};

    for (String severity : argv) {
        String message = "my " + severity;
        // 交换机、路由、是否持久化、消息内容
        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    }

}

recv

channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();

String[] argv = {"info", "error", "warn", "debug"};

for (String severity : argv) {
    channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" +
                       delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

主题模式

*唯一单词,#0或多个

发送详细,接受通配

匹配不到默认丢弃

send

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

String routingKey = getRouting(argv);
String message = getMessage(argv);
// 交换机 路由 持久化 内容
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

recv

channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 为什么能获取队列名?如果这个交换机有多个队列呢? 但是这里好像是先有的接收
String queueName = channel.queueDeclare().getQueue();
String[] argv = {"com.#"};
for (String bindingKey : argv) {
    channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" +
                       delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});

RPC

img

客户端和服务端同步

官网:https://www.rabbitmq.com/tutorials/tutorial-six-java.html

client请求携带reply_to(队列名)和correlation_id发送至队列

server消费队列后,使用reply_to将消息发送至队列,client根据correlation_id取出

confirm

生产者的消息不能知道消息是否到达了队列,可以使用事务

channel.txSelect();
channel.txRollback();
  • 普通confirm(同步)

每发送一条消息,调用waitForConfirms()方法,等待服务器端confirm,实际上是一种串行confirm

  • 批量confirm(同步)

每发送一条消息后,waitForConfirmOrDie(),等待服务器端confirm

  • 异步confirm

提供回调方法,服务端confirm一条或多条消息后,client端会回调这个方法

channel.confirmSelect();
channel.waitForConfirms(); // 普通confirm
waitForConfirmOrDie(); // 批量模式,只要有一条没确认就抛异常

send

public class Send {

    private static final String QUEUE_NAME = "async";

    public static void main(String[] temp) throws Exception {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        try {
            final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
            // 开启确认模式
            channel.confirmSelect();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    if( multiple ) {
                        System.out.println("multiple" + deliveryTag);
                        // 删除 deliveryTag 项标识id ?
                        confirmSet.headSet(deliveryTag + 1L).clear();
                    } else {
                        System.out.println("single" + deliveryTag);
                        confirmSet.headSet(deliveryTag).clear();
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if(multiple) {
                        System.out.println("failed-multiple->" +deliveryTag);
                        confirmSet.headSet(deliveryTag +1L).clear();
                    }  else {
                        System.out.println("failed-single->" +deliveryTag);
                        confirmSet.remove(deliveryTag);
                    }
                }
            });
            while(true) {
                String message = "hello world";
                Long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8"));
                confirmSet.add(seqNo);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(null != channel && channel.isOpen()) {
                channel.close();
            }
            if(null != channel && connection.isOpen()) {
                connection.close();
            }
        }
    }
    //..
}

recv

public static void main(String[] argv) throws Exception {

    Connection connection = RabbitMqUtils.getConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(
        QUEUE_NAME,
        false,
        false,
        false,
        null
    );
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(
        QUEUE_NAME,
        true, // 自动回复
        deliverCallback,
        consumerTag -> { }
    );
}

SpringAMQP

依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>2.1.7.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.1.7.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <version>2.1.7.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.1.7.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

配置

开启使用

@EnableRabbit

配合文件

spring:
# 1 broker接受回调,队列错误回调,消费端接受回调
 rabbitmq:
    host: 101.200.169.229
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    # 抵达队列,异步方式回调
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual
# 2
  rabbitmq:
    host: 101.227.11.219
    port: 5672
    username: root
    password: 
    virtual-host: /ming

server:
  port: 8081
@Autowired
AmqpAdmin amqpAdmin;

序列化方式

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyRabbitConfig {
    // 序列化方式,传输Object
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

简单API

@SpringBootTest
@Slf4j
public class Test {
    @Autowired
    private AmqpAdmin amqpAdmin;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 交换机、队列、绑定
    @org.junit.jupiter.api.Test
    public void test() {
        // exchange: durable autoDelete
        DirectExchange hello = new DirectExchange("hello-exchange", true, false);
        amqpAdmin.declareExchange(hello);
        log.info("{} finish", "hello");
        // queue: durable  exclusive autoDelete
        Queue queue = new Queue("hello-queue", true, false, false);
        amqpAdmin.declareQueue(queue);
        // binding
        /*   String destination, 目的地
             Binding.DestinationType destinationType, 类型: 交换机、队列
             String exchange,  交换机
             String routingKey,  路由键
             @Nullable Map<String, Object> arguments
             */
        Binding binding = new Binding("hello-queue", Binding.DestinationType.QUEUE, "hello-exchange", "hello.java", null);
        amqpAdmin.declareBinding(binding);

        // send Msg
        String msg = "hello world!!!";
        rabbitTemplate.convertAndSend("hello-exchange", "hello.java", msg);;
    }
}

发送

OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
orderEntity.setModifyTime(new Date());
// 反馈可以统一放到configuration中
/*
    public MyRabbitConfig(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        initTemplate();
    }
*/
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
    if(ack) {
        System.out.println("发送成功");
    } else {
        System.out.println("发送失败");
    }
});
rabbitTemplate.setReturnCallback((msg, respCode, respText, exchange, routeingKey) -> {
    System.out.println("发送失败" +msg );
});
rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", orderEntity);

Init资源

有listener才会创建

@Configuration
@EnableRabbit // 需要监听的时候要添加
public class RabbitMQConfig {
    @Bean
    public Exchange orderEventExchange() {
        /**
         *   String name,
         *   boolean durable,
         *   boolean autoDelete,
         *   Map<String, Object> arguments
         */
        return new TopicExchange("order-event-exchange", true, false);
    }

    @Bean
    public Queue orderDelayQueue() {
        HashMap<String, Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange", "order-event-exchange");
        paramMap.put("x-dead-letter-routing-key", "order.release.order");
        //  20s
        paramMap.put("x-message-ttl", TimeUnit.SECONDS.toMillis(20L));
        return new Queue("order.delay.queue", true, false, false, paramMap);
    }

    /**
     * 普通队列
     *
     * @return
     */
    @Bean
    public Queue orderReleaseQueue() {
        return new Queue("order.release.order.queue", true, false, false);
    }

    /**
     * 创建订单的binding,order-event-exchange 和 order.delay.queue 通过路由 order.create.order绑定
     * @return
     */
    @Bean
    public Binding orderCreateBinding() {
        /**
         * String destination, 目的地(队列名或者交换机名字)
         * DestinationType destinationType, 目的地类型(Queue、Exhcange)
         * String exchange,
         * String routingKey,
         * Map<String, Object> arguments
         * */
        return new Binding(
                "order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create.order",
                null
        );
    }
}

样例2

@Configuration
public class DirectRabbitConfig {
    @Bean
    public Queue rabbitmqDemoDirectQueue() {
        /**
         * 1、name:    队列名称
         * 2、durable: 是否持久化
         * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
         * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
         * */
        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
    }

    @Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
        //Direct交换机
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
    }

    @Bean
    public Binding bindDirect() {
        //链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitmqDemoDirectQueue())
                //到交换机
                .to(rabbitmqDemoDirectExchange())
                //并设置匹配键
                .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
    }
}

监听接收

如果已经创建好了队列,将不会覆盖,要删除队列

@Component
@RabbitListener(queues = {"order.release.order.queue"})
// 使用queuesToDeclare属性,如果不存在则会创建队列
// @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public class OrderCloseListener {

    @RabbitHandler
    public void getCloseOrderEntity(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
        System.out.println("接受到关闭订单" +orderEntity);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

发送模式

from:https://developer.aliyun.com/article/769883#slide-13

直连、主题通过 routerKey 绑定交换机和队列

广播会发送到绑定了交换机的所有队列

header exchange 通过头部的信息和绑定的头部信息进行对比,相当于将路由键放在了头部

直连
@Configuration
public class DirectRabbitConfig {
    @Bean
    public Queue rabbitmqDemoDirectQueue() {
        /**
         * 1、name:    队列名称
         * 2、durable: 是否持久化
         * 3、exclusive: 排他队列,基于连接下,可以访问,不可重名,关闭连接就会删除
         * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
         * */
        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
    }

    @Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
        // Direct交换机
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
    }

    @Bean
    public Binding bindDirect() {
        // 链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
            // 绑定队列
            .bind(rabbitmqDemoDirectQueue())
            // 到交换机
            .to(rabbitmqDemoDirectExchange())
            // 并设置匹配键
            .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
    }
}

发送

rabbitTemplate.convertAndSend(
    RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, 
    RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, 
    dataHashMap
);

接收

@RabbitListener(
    queuesToDeclare = 
        @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC)
)
广播模式

通过将两个队列绑定到相同的交换机上,启动项目时就创建交换机和队列

不需要路由键

声明

@Component
public class DirectRabbitConfig implements BeanPostProcessor {
    @Resource
    private RabbitAdmin rabbitAdmin;

    @Bean
    public Queue fanoutExchangeQueueA() {
        // 队列A
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A, true, false, false);
    }

    @Bean
    public Queue fanoutExchangeQueueB() {
        // 队列B
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B, true, false, false);
    }

    @Bean
    public FanoutExchange rabbitmqDemoFanoutExchange() {
        // 创建 FanoutExchange 类型交换机
        return new FanoutExchange(
            RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, 
            true, 
            false
        );
    }

    @Bean
    public Binding bindFanoutA() {
        //队列A绑定到FanoutExchange交换机
        return BindingBuilder
            .bind(fanoutExchangeQueueA())
            .to(rabbitmqDemoFanoutExchange());
    }

    @Bean
    public Binding bindFanoutB() {
        //队列B绑定到FanoutExchange交换机
        return BindingBuilder.bind(fanoutExchangeQueueB()).to(rabbitmqDemoFanoutExchange());
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        //启动项目即创建交换机和队列
        rabbitAdmin.declareExchange(rabbitmqDemoFanoutExchange());
        rabbitAdmin.declareQueue(fanoutExchangeQueueB());
        rabbitAdmin.declareQueue(fanoutExchangeQueueA());
        return null;
    }
}

发送

@Resource
private RabbitTemplate rabbitTemplate;
// 发布消息,就可以不指定路由
@Override
public String sendMsgByFanoutExchange(String msg) throws Exception {
    Map<String, Object> message = getMessage(msg);
    try {
        rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, "", message);
        return "ok";
    } catch (Exception e) {
        e.printStackTrace();
        return "error";
    }
}

接受

@RabbitListener(
    queuesToDeclare = 
        @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A)
)
@RabbitHandler
主题模式

通过”*” 、 “#”通配符,路由到对应的队列

* 符号:有且只匹配一个词

# 符号:匹配一个或多个词

一个带路由匹配规则的消息到来,交换机通过路由规则,发送到所有对应的队列

可以实现直连direct、广播fanout模式的,感觉主要是看路由键

header exchange

不是通过路由键,而是通过请求头中的键值

队列绑定时,能规定头部信息,x-match中的值为any/all,对消息的头部信息 和 队列绑定的头部信息进行匹配

@Component
public class RabbitConfig implements BeanPostProcessor {
    @Bean
    public Queue headersQueueA() {
        return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A, true, false, false);
    }

    @Bean
    public Binding bindHeadersA() {
        Map<String, Object> map = new HashMap<>();
        map.put("key_one", "java");
        map.put("key_two", "rabbit");
        // 全匹配
        return BindingBuilder.bind(headersQueueA())
                .to(rabbitmqDemoHeadersExchange())
                .whereAll(map).match();
        // 部分匹配
        return BindingBuilder.bind(headersQueueB())
                .to(rabbitmqDemoHeadersExchange())
                .whereAny(map).match();
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        rabbitAdmin.declareExchange(rabbitmqDemoHeadersExchange());
        rabbitAdmin.declareQueue(headersQueueA());
        rabbitAdmin.declareQueue(headersQueueB());
        return null;
    }
}

发送

MessageProperties messageProperties = new MessageProperties();
// 消息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
// 添加消息
messageProperties.getHeaders().putAll(headerDataMap);
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend(
    RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, 
    null,
    message
);

接受

@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B))
public void process(Message message) throws Exception {
    MessageProperties messageProperties = message.getMessageProperties();
    String contentType = messageProperties.getContentType();
    System.out.println("队列[" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B + "]收到消息:" + new String(message.getBody(), contentType));
}

邮箱发送

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-mail</artifactId>
    <version>2.1.7.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-thymeleaf</artifactId>
    <version>2.1.7.RELEASE</version>
</dependency>

配置

server:
  port: 8083
spring:
  mail:
    host: smtp.qq.com
    port: 465
    username: 284908631@qq.com
    password:  #
    #test-connection: 是否测试链接
    protocol: smtp
    default-encoding: UTF-8
    properties.mail.smtp.ssl.enable: true
#    properties:
#      mail.smtp.socketFactory.fallback : true
#      mail.smtp.starttls.enable: true
  rabbitmq:
    host: 
    username: root
    password: 
    virtual-host: /ming
@Bean
public Queue queue() {
    return new Queue("com.ming.mail");
}

模板

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>欢迎</title>
</head>
<body>
    欢迎<span style="color:orangered" th:text="${name}"></span>加入
</body>
</html>

接收

@Component
public class MailRecv {
    private static Logger logger = LoggerFactory.getLogger(MailRecv.class);
    @Autowired
    private JavaMailSender javaMailSender;
    @Autowired
    private MailProperties mailProperties;
    @Autowired
    private TemplateEngine templateEngine;

    @RabbitListener(queues = "com.ming.mail")
    public void handle(MyUser myUser) {
        MimeMessage msg = javaMailSender.createMimeMessage();
        MimeMessageHelper helper = new MimeMessageHelper(msg);
        try{
            helper.setFrom(mailProperties.getUsername());
            helper.setTo(myUser.getEmail());
            helper.setSubject("入职欢迎");
            helper.setSentDate(new Date());
            Context context = new Context();
            context.setVariable("name", myUser.getName());
            // 模板生成文本
            String text = templateEngine.process("mail", context);
            helper.setText(text, true);
            javaMailSender.send(msg);
        } catch (MessagingException e) {
            e.printStackTrace();
            logger.error("email error");
        }
    }
}

发送

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void sendMail() {
    MyUser myUser = new MyUser();
    myUser.setAge(20L);
    myUser.setName("mingyue");
    myUser.setEmail("284908631@qq.com");
    rabbitTemplate.convertAndSend("com.ming.mail", myUser);
}

可靠性传递【发送】

image-20211122132641224

# 已经过时
spring.rabbitmq.publisher-confirms=ture
# 高版本,发送端信息到达brock
sprin.rabbitmq.publisher-confirm-type=correlated
# 配置config,以下实现有
# 失败回调,发送端信息抵达队列
spring.rabbitmq.publisher-returns=true
# 抵达队列,异步方式回调
spring.rabbitmq.template.mandatory=true
    publisher-confirm-type: correlated
    publisher-returns: true
    # 抵达队列,异步方式回调
    template:
      mandatory: true

生产端可靠性投递

成功发出

成功接收

生产者能收到应答

1.消息落库,进行标记,mq回调改变状态为已发送,定时任务将未成功发送的重新发送

image-20210812141717920

2.消费者发送消息到队列,告诉回调服务已经消费完成,生产者延迟再次投递,回调服务进行确认,如果没有id相同的消费完成记录,则发起RPC重新发送

image-20210812142227372

实现

spring:
  rabbitmq:
    host: 101.227.11.219
    username: #
    password: #
    virtual-host: /ming
    # 确认回调
    publisher-confirm-type: correlated
    # 失败回调
    publisher-returns: true

config

设置 rabbitTemplate 成功和失败回调

@Configuration
public class RabbitMQConfig {
    public final Logger logger = LoggerFactory.getLogger(RabbitMQConfig.class);
    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;
    @Autowired
    private MailLogService mailLogService;

    // 也可以使用@PostConstruct,对rabbitTemplate进行改造
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        // broker确认接受到信息,回调
        rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
            String msgId = data.getId();
            if(ack) {
                logger.info("{}->发送成功", msgId);
                MailLog mailLog = new MailLog();
                mailLog.setStatus(1);
                mailLog.setMsgId(msgId);
                mailLogService.updateById(mailLog);
            } else {
                logger.error("{}->发送失败", msgId);
            }
        });
        /**
         * 队列接受消息失败,回调
         * msg:消息主题
         * respCode:响应码
         * respText:响应描述
         * exchange:交换机
         * routingKey: 路由器
         */
        rabbitTemplate.setReturnCallback((msg, respCode, respText, exchange, routeingKey) -> {
            logger.error("{}->发送到queue失败", msg.getBody());
        });
        return rabbitTemplate;
    }

    @Bean
    public Queue queue() {
        return new Queue(EmailConstants.MAIL_QUEUE_NAME);
    }
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(EmailConstants.MAIL_EXCHANGE_NAME);
    }
    @Bean
    public Binding binding() {
        return BindingBuilder
                .bind(queue())
                .to(topicExchange())
                .with(EmailConstants.MAIL_ROUTING_KEY);
    }
}

发送

准备 UUID 作为 msgID ,在 converAndSend中 new CorrelationData(msgId) 发送给队列

@Test
public void sendMail() {
    System.out.println("???");
    String msgID = UUID.randomUUID().toString();
    MailLog mailLog = new MailLog();
    mailLog.setMsgId(msgID);
    mailLog.setEid(0);
    mailLog.setStatus(0);
    mailLog.setRouteKey(EmailConstants.MAIL_ROUTING_KEY);
    mailLog.setExchange(EmailConstants.MAIL_EXCHANGE_NAME);
    mailLog.setCount(0);
    mailLog.setTryTime(LocalDateTime.now().plusMinutes(EmailConstants.MSG_TIMEOUT));
    mailLog.setCreateTime(LocalDateTime.now());
    mailLog.setUpdateTime(LocalDateTime.now());
    mailLogService.save(mailLog);

    MyUser myUser = new MyUser();
    myUser.setAge(20L);
    myUser.setName("mingyue");
    myUser.setEmail("284908631@qq.com");
    rabbitTemplate.convertAndSend(
        EmailConstants.MAIL_EXCHANGE_NAME,
        EmailConstants.MAIL_ROUTING_KEY,
        myUser,
        new CorrelationData(msgID) // 消息的唯一id
    );
}

定时

数据库中查找状态为 未发送0且到了重试时间的,如果其尝试次数小于3则重新发送给mq

@Scheduled(cron = "0/10 * * * * ?")
public void mail() {
    List<MailLog> list = mailLogService.list(
        new QueryWrapper<MailLog>()
        .eq("status", 0)
        .lt("tryTime", LocalDateTime.now())
    );
    System.out.println("????");
    list.forEach(System.out::println);
    System.out.println("????");
    list.forEach(mailLog -> {
        if (3 <= mailLog.getCount()) {
            MailLog mailLog1 = new MailLog();
            mailLog1.setStatus(2);
            mailLog1.setMsgId(mailLog.getMsgId());
            mailLogService.updateById(
                mailLog1
            );
        }
        MailLog mailLog2 = new MailLog();
        mailLog2.setTryTime(LocalDateTime.now().plusMinutes(EmailConstants.MSG_TIMEOUT));
        mailLog2.setUpdateTime(LocalDateTime.now());

        mailLog2.setMsgId(mailLog.getMsgId());
        mailLog2.setCount(mailLog.getCount() + 1);
        mailLogService.updateById(mailLog2);
        System.out.println("eid" + mailLog.getEid());

        MyUser myUser = new MyUser();
        myUser.setAge(20L);
        myUser.setName("mingyue");
        myUser.setEmail("284908631@qq.com");
        rabbitTemplate.convertAndSend(
            EmailConstants.MAIL_EXCHANGE_NAME,
            EmailConstants.MAIL_ROUTING_KEY,
            myUser,
            new CorrelationData(mailLog.getMsgId())
        );
    });
}

可靠性传递【接受】

队列发送信息到接收端,如果接收端宕机,channel也还是自动回复,消息就全部没有了

# 收到确认收获
spring.rabbitmq.listener.simple.acknowledge-mode=manual

tag自增

@Component
@RabbitListener(queues = EmailConstants.MAIL_QUEUE_NAME)
public void handle(MyUser myUser,
                   org.springframework.messaging.Message message,
                   Channel channel
                  ){
    // getTag 1
    MessageHeaders headers = message.getHeaders();
    long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
    // or 2
    long tag = message.getMessageProperties().getDeliveryTag();

    channel.basicAck(tag, false); // 只确认单条,非批量,不回队列

    // 异常
    catch (Exception e) {
        channel.basicNack(tag, false, true); //,非批量, 重回队列
        // channel.basicReject(tag, requeue);
        e.printStackTrace();
    }
}

消息可靠性

消息丢失

执行 convertAndSend 时网络断开

  • 进行try catch,可能网络失败,重试发送
  • 做好日志就,每个消息状态是否被服务器收到都记录
  • 定期重发,扫描数据库

image-20211125233014837

消息到达Broker,Broder没有写入磁盘

  • 确认回调,修改服务器状态

自动ack下,消费者收到消息,没来得及处理就宕机

  • 开启手动ACK,消费成功才移除,异常就noAck并重新入队

消息重复

消息处理完成,回复时宕机,导致重复处理

消息消费失败,再次发送

  • 使用幂等性接口
  • message.getMessageProperties().getRedelivered()是否重复投递
  • 防重表

消息积压

  • 消费者宕机积压
  • 消费者消费能力不足
  • 发送者流量太大
    • 上线更多消费者
    • 上线队列消费服务,将数据存入数据库再处理

消息幂等性

定时任务刚好把刚刚放入数据库的记录(但是重发时间3分钟,一般不会出现)

正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列(RabbitMQ是发送一个ACK确认消息),就从消息队列中删除

因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道已经消费过该消息了,再次将该消息分发给其他的消费者

接受message,获取消息tag和msgId,判断redis是否存在msgId,开启手动确认

# 收到确认收获
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RabbitListener(queues = EmailConstants.MAIL_QUEUE_NAME)
public void handle(MyUser myUser,
                   org.springframework.messaging.Message message,
                   Channel channel
                  ){
    // or
    MyUser myUser1 = (MyUser)message.getPayload();
    MessageHeaders headers = message.getHeaders();
    long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
    String msgId = (String) headers.get("spring_returned_message_correlation");

    // redis中重复
    HashOperations hashOperations = redisTemplate.opsForHash();
    if(hashOperations.entries("email_log").containsKey(msgId)) {
        channel.basicAck(tag, false); // 只确认单条
        System.out.println("消息已经被消费");
        return;
    }

    // 确认消费
    hashOperations.put("email_log", msgId, "ok");
    channel.basicAck(tag, false); // 只确认单条


    // 异常
    catch (Exception e) {
        channel.basicNack(tag, false, true); // 重回队列
        e.printStackTrace();
    }
}

延时队列

场景:未付款的订单,超过十佳后,自动取消订单释放占有物品

方案:schedule定时任务轮询数据库

缺点:消耗系统内存,数据库压力,时间误差(必须要30分钟才能解锁,可能要60分钟才能扫描到)

解决:rabbitmq消息ttl和死信Exchange结合

消息的TTL

消息的存活时间,rabbitmq对队列和消息分别设置TTL

  • 对队列设置,就是没有消费者连着的保留时间,超过保留时间,消息就死了,成为死信
  • 对队列和消息都设置,取小的,消息如果被路由到不同的队列中,这个消息死亡的时间可能不一样,可以通过设置消息的expiration字段或者x-message-ttl属性设置时间

Dead Letter Exchanges

一个消息满足如下条件 ,会进入死信路由,一个路由可以对应很对队列

  • 消息被Consumer拒收,reject方法参数requeue为false,不会再次放到队列中
  • 消息的TTL到了(所以不设置监听)
  • 队列长度满了,排在前面的消息被丢弃或者扔到死信路由上

dead letter exchange就是普通的exchange,只是在某一个设置dead letter exchange中有消息过期了,就自动触发消息的转发,发送到dead letter exchange中

我们既然可以控制消息一段时间后变成死信,又可以变成死信的消息被路由到某一个指定的交换机,结合二者就变成延时队列

实现

推荐给消息队列设置过期时间,消息过期机制惰性检查,必须要第一个过期了才能拿后面的

设置消息队列,消息死了交给指定的死信队列

image-20211125143503428

设置消息过期时间

image-20211125150840842

使用

image-20211125152438248