2、RabbitMQ高级特性

烟雨 4年前 (2021-08-03) 阅读数 509 #分布式消息队列

一、消息可靠性投递

在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ提供了两种方式用来控制消息的投递可靠性模式。RabbitMQ整个消息投递的路径为:
生产者(producer)--->RabbitMQ Broker(RabbitMQ  Server)--->交换机(exchange)--->队列(queue)--->消费者(consumer)

1.1、confirm 确认模式

消息从生产者(producer)->交换机(exchange)则会返回一个 confirmCallback。
设置ConnectionFactory的publisher-confirms="true" 开启确认模式
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="81.71.140.7"
                           port="${rabbitmq.port}"
                           username="bq123"
                           password="bq123"
                           virtual-host="${rabbitmq.virtual-host}"
                           publisher-confirms="true"
                           />

<!--消息可靠性投递(生产端)-->
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>

<rabbit:direct-exchange name="test_exchange_confirm">
    <rabbit:bindings>
        <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {

    @Autowired
    private RabbitTemplate  rabbitTemplate;

    //测试Confirm模式
    @Test
    public void testConfirm() {

         //定义回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 相关配置信息
             * @param ack   exchange交换机是否成功收到了消息。true 成功,false代表失败
             * @param cause 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm方法被执行了....");
                //ack 为  true表示 消息已经到达交换机
                if (ack) {
                    //接收成功
                    System.out.println("接收成功消息" + cause);
                } else {
                    //接收失败
                    System.out.println("接收失败消息" + cause);
                    //做一些处理,让消息再次发送。
                }
            }
        });

        //进行消息发送
        rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message Confirm...");

        //进行睡眠操作,否则会出现接收失败的情况
        try {
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

     //批量发送消息,让消费者每次拉去指定的数量
     @Test
     public void  testQos(){
         for (int i = 0; i < 10; i++) {
             // 发送消息
             rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
         }
     }
}

1.2、return 退回模式

消息从交换机(exchange)-->队列(queue)投递失败则会返回一个returnCallback。
设置ConnectionFactory的publisher-returns="true" 开启退回模式
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="81.71.140.7"
                           port="${rabbitmq.port}"
                           username="bq123"
                           password="bq123"
                           virtual-host="${rabbitmq.virtual-host}"
                           publisher-returns="true"
                           />
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {

    @Autowired
    private RabbitTemplate  rabbitTemplate;

    //测试return模式
    @Test
    public void testReturn() {
        //设置交换机处理失败消息的模式为true的时候,消息投递不了队列时,会将消息重新返回给生产者。
        rabbitTemplate.setMandatory(true);

        //定义回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * @param message   消息对象
             * @param replyCode 错误码
             * @param replyText 错误信息
             * @param exchange  交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 执行了....");

                System.out.println("message:"+message);
                System.out.println("replyCode:"+replyCode);
                System.out.println("replyText:"+replyText);
                System.out.println("exchange:"+exchange);
                System.out.println("routingKey:"+routingKey);
                
                //处理业务逻辑。。。。。
            }
        });
        
        //进行消息发送
        rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message return...");

        //进行睡眠操作
        try {
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

     //批量发送消息,让消费者每次拉去指定的数量
     @Test
     public void  testQos(){
         for (int i = 0; i < 10; i++) {
             // 发送消息
             rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
         }
     }
}
我们将利用这两个Callback控制消息的可靠性投递。

二、Consumer Ack

Ack为Acknowledge缩写,承认(权威、地位);承认(属实);告知收悉。
Consumer Ack(消费者确认机制)
在rabbit:listener-container标签中设置acknowledge属性,设置ack方式none:自动确认,manual:手动确认
<!--定义监听器容器
   acknowledge="manual":手动签收
   prefetch="1":每次抓取多少条消息
-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" >
    <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>
</rabbit:listener-container>
有三种确认方式:

2.1、自动确认:acknowledge="none"

当消息一旦被消费者(Consumer)接收到,则自动确认收到,并将相应message从RabbitMQ的消息缓存中移除。但是在实际业务处理中,很可能消息接收到了,但是业务处理出现异常,那么该消息就会丢失。

2.2、手动确认:acknowledge="manual"

设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
@Component
public class AckListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //1、获取消息的id
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
            //2、获取消息
            System.out.println("message:"+new String(message.getBody()));
            //3、进行业务处理
            System.out.println("=====进行业务处理====");
            //模拟出现异常
            int  i = 5/0;
            //4、进行消息签收
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            //拒绝签收
            //第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
            channel.basicNack(deliveryTag, true, true);

        }
    }
}
根据异常情况确认:acknowledge="auto",(这种方式一般不用)

三、消费者(消费端)限流

在<rabbit:listener-container> 中配置prefetch属性设置消费端一次拉取多少消息。
消费端的确认模式一定为手动确认。acknowledge="manual"
<!--定义监听器容器
   acknowledge="manual":手动签收
   prefetch="1000":每次抓取多少条消息
-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1000" >
    <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>
</rabbit:listener-container>

四、TTL

TTL 全称 Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
<!--ttl-->
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
    <!--设置queue的参数-->
    <rabbit:queue-arguments>
        <!--x-message-ttl指队列的过期时间(毫秒)-->
        <entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"></entry>
    </rabbit:queue-arguments>
</rabbit:queue>
<!--ttl与某个交换机绑定-->
<rabbit:topic-exchange name="test_exchange_ttl" >
    <rabbit:bindings>
        <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>
设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在消费时,会单独判断这一消息是否过期。
如果两者都进行了设置,以时间短的为准。

五、死信队列

死信队列,英文缩写:DLX  。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
消息成为死信的三种情况:
1. 队列消息长度到达限制。
2. 消费者拒接消费消息{basicNack()/basicReject()}。并且不把消息重新放入原目标队列(requeue=false时)。
3. 原队列存在消息过期设置,消息到达超时时间未被消费。
<!--
    死信队列:
       1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
       2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
       3. 正常队列绑定死信交换机
             设置两个参数:
                 * x-dead-letter-exchange:死信交换机名称
                 * x-dead-letter-routing-key:发送给死信交换机的routingkey
-->

<!--1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)-->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
   <!--3. 正常队列绑定死信交换机-->
    <rabbit:queue-arguments>
        <!--3.1 x-dead-letter-exchange:死信交换机名称-->
        <entry key="x-dead-letter-exchange" value="exchange_dlx" />
        <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
        <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
        <!--4.1 设置队列的过期时间 ttl-->
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
        <!--4.2 设置队列的长度限制 max-length -->
        <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
    <rabbit:bindings>
        <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>

<!--2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)-->
<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="exchange_dlx">
    <rabbit:bindings>
        <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>

image.png

六、延迟队列

延迟队列指消息进入队列后,可以被延迟一定时间,再进行消费。
在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

image.png

版权声明

非特殊说明,本文由Zender原创或收集发布,欢迎转载。

上一篇:1、RabbitMQ基础 下一篇:Redis面试

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

作者文章
热门