2、RabbitMQ高级特性
一、消息可靠性投递
在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ提供了两种方式用来控制消息的投递可靠性模式。RabbitMQ整个消息投递的路径为:
生产者(producer)--->RabbitMQ Broker(RabbitMQ Server)--->交换机(exchange)--->队列(queue)--->消费者(consumer)
1.1、confirm 确认模式
消息从生产者(producer)->交换机(exchange)则会返回一个 confirmCallback。
设置ConnectionFactory的publisher-confirms="true" 开启确认模式
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="81.71.140.7"
port="${rabbitmq.port}"
username="bq123"
password="bq123"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
/>
<!--消息可靠性投递(生产端)-->
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
<rabbit:direct-exchange name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
//测试Confirm模式
@Test
public void testConfirm() {
//定义回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 相关配置信息
* @param ack exchange交换机是否成功收到了消息。true 成功,false代表失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了....");
//ack 为 true表示 消息已经到达交换机
if (ack) {
//接收成功
System.out.println("接收成功消息" + cause);
} else {
//接收失败
System.out.println("接收失败消息" + cause);
//做一些处理,让消息再次发送。
}
}
});
//进行消息发送
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message Confirm...");
//进行睡眠操作,否则会出现接收失败的情况
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
//批量发送消息,让消费者每次拉去指定的数量
@Test
public void testQos(){
for (int i = 0; i < 10; i++) {
// 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
}
}
}1.2、return 退回模式
消息从交换机(exchange)-->队列(queue)投递失败则会返回一个returnCallback。
设置ConnectionFactory的publisher-returns="true" 开启退回模式
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="81.71.140.7"
port="${rabbitmq.port}"
username="bq123"
password="bq123"
virtual-host="${rabbitmq.virtual-host}"
publisher-returns="true"
/>@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
//测试return模式
@Test
public void testReturn() {
//设置交换机处理失败消息的模式为true的时候,消息投递不了队列时,会将消息重新返回给生产者。
rabbitTemplate.setMandatory(true);
//定义回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了....");
System.out.println("message:"+message);
System.out.println("replyCode:"+replyCode);
System.out.println("replyText:"+replyText);
System.out.println("exchange:"+exchange);
System.out.println("routingKey:"+routingKey);
//处理业务逻辑。。。。。
}
});
//进行消息发送
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message return...");
//进行睡眠操作
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
//批量发送消息,让消费者每次拉去指定的数量
@Test
public void testQos(){
for (int i = 0; i < 10; i++) {
// 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
}
}
}我们将利用这两个Callback控制消息的可靠性投递。
二、Consumer Ack
Ack为Acknowledge缩写,承认(权威、地位);承认(属实);告知收悉。
Consumer Ack(消费者确认机制)
在rabbit:listener-container标签中设置acknowledge属性,设置ack方式none:自动确认,manual:手动确认
<!--定义监听器容器 acknowledge="manual":手动签收 prefetch="1":每次抓取多少条消息 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" > <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener> </rabbit:listener-container>
有三种确认方式:
2.1、自动确认:acknowledge="none"
当消息一旦被消费者(Consumer)接收到,则自动确认收到,并将相应message从RabbitMQ的消息缓存中移除。但是在实际业务处理中,很可能消息接收到了,但是业务处理出现异常,那么该消息就会丢失。
2.2、手动确认:acknowledge="manual"
设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//1、获取消息的id
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//2、获取消息
System.out.println("message:"+new String(message.getBody()));
//3、进行业务处理
System.out.println("=====进行业务处理====");
//模拟出现异常
int i = 5/0;
//4、进行消息签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//拒绝签收
//第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
channel.basicNack(deliveryTag, true, true);
}
}
}根据异常情况确认:acknowledge="auto",(这种方式一般不用)
三、消费者(消费端)限流
在<rabbit:listener-container> 中配置prefetch属性设置消费端一次拉取多少消息。
消费端的确认模式一定为手动确认。acknowledge="manual"
<!--定义监听器容器 acknowledge="manual":手动签收 prefetch="1000":每次抓取多少条消息 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1000" > <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener> </rabbit:listener-container>
四、TTL
TTL 全称 Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
<!--ttl--> <rabbit:queue name="test_queue_ttl" id="test_queue_ttl"> <!--设置queue的参数--> <rabbit:queue-arguments> <!--x-message-ttl指队列的过期时间(毫秒)--> <entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue> <!--ttl与某个交换机绑定--> <rabbit:topic-exchange name="test_exchange_ttl" > <rabbit:bindings> <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在消费时,会单独判断这一消息是否过期。
如果两者都进行了设置,以时间短的为准。
五、死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
消息成为死信的三种情况:
1. 队列消息长度到达限制。2. 消费者拒接消费消息{basicNack()/basicReject()}。并且不把消息重新放入原目标队列(requeue=false时)。3. 原队列存在消息过期设置,消息到达超时时间未被消费。
<!-- 死信队列: 1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx) 2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx) 3. 正常队列绑定死信交换机 设置两个参数: * x-dead-letter-exchange:死信交换机名称 * x-dead-letter-routing-key:发送给死信交换机的routingkey --> <!--1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)--> <rabbit:queue name="test_queue_dlx" id="test_queue_dlx"> <!--3. 正常队列绑定死信交换机--> <rabbit:queue-arguments> <!--3.1 x-dead-letter-exchange:死信交换机名称--> <entry key="x-dead-letter-exchange" value="exchange_dlx" /> <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey--> <entry key="x-dead-letter-routing-key" value="dlx.hehe" /> <!--4.1 设置队列的过期时间 ttl--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /> <!--4.2 设置队列的长度限制 max-length --> <entry key="x-max-length" value="10" value-type="java.lang.Integer" /> </rabbit:queue-arguments> </rabbit:queue> <rabbit:topic-exchange name="test_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!--2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)--> <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue> <rabbit:topic-exchange name="exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
六、延迟队列
延迟队列指消息进入队列后,可以被延迟一定时间,再进行消费。
在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
版权声明
非特殊说明,本文由Zender原创或收集发布,欢迎转载。
上一篇:1、RabbitMQ基础 下一篇:Redis面试
ZENDER

发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。