主从复制
如果一个broker有master和slave时,就需要将master上的消息复制到slave上,复制的方式有两种
- 「同步复制」:master和slave均写成功,才返回客户端成功。maste挂了以后可以保证数据不丢失,但是同步复制会增加数据写入延迟,降低吞吐量
- 「异步复制」:master写成功,返回客户端成功。拥有较低的延迟和较高的吞吐量,但是当master出现故障后,有可能造成数据丢失
消费端消息重试
顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ版会自动不断地进行消息重试(每次间隔时间为1秒),这时,应用会出现消息消费被阻塞的情况。所以一定要做好监控,避免阻塞现象的发生
「顺序消息消费失败后不会消费下一条消息而是不断重试这条消息,应该是考虑到如果跨过这条消息消费后面的消息会对业务逻辑产生影响」
「顺序消息暂时仅支持集群消费模式,不支持广播消费模式」
无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
「无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息」
「消费时候后,重试的配置方式有如下三种」
- 返回Action.ReconsumeLater(推荐)
- 返回Null
- 抛出异常
- public class MessageListenerImpl implements MessageListener {
- @Override
- public Action consume(Message message, ConsumeContext context) {
- //消息处理逻辑抛出异常,消息将重试。
- doConsumeMessage(message);
- //方式1:返回Action.ReconsumeLater,消息将重试。
- return Action.ReconsumeLater;
- //方式2:返回null,消息将重试。
- return null;
- //方式3:直接抛出异常,消息将重试。
- throw new RuntimeException("Consumer Message exception");
- }
- }
「消费失败后,无需重试的配置方式」
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回Action.CommitMessage,此后这条消息将不会再重试。
- public class MessageListenerImpl implements MessageListener {
- @Override
- public Action consume(Message message, ConsumeContext context) {
- try {
- doConsumeMessage(message);
- } catch (Throwable e) {
- //捕获消费逻辑中的所有异常,并返回Action.CommitMessage;
- return Action.CommitMessage;
- }
- //消息处理正常,直接返回Action.CommitMessage;
- return Action.CommitMessage;
- }
- }
「消息重试次数」
「RocketMQ默认允许每条消息最多重试16次,每次消费失败发送一条延时消息到重试队列,同一条消息失败一次将延时等级提高一次,然后再放到重试队列。重试16次后如果还没有消费成功,则将消息放到死信队列中。」
「注意:重试队列和死信队列都是按照Consumer Group划分的」
重试队列topic名字:%RETRY% + consumerGroup
死信队列topic名字:%DLQ% + consumerGroup
「为什么重试队列和死信队列要按照Consumer Group来进行划分?」
「因为在RocketMQ的时候使用一定要保持订阅关系一致。即一个Consumer Group订阅的topic和tag要完全一致,不然可能会导致消费逻辑混乱,消息丢失」
如下任意一种情况都表现为订阅关系不一致
- 相同ConsumerGroup下的Consumer实例订阅了不同的Topic。
- 相同ConsumerGroup下的Consumer实例订阅了相同的Topic,但订阅的Tag不一致。