消息队列可靠性保障方案

🎯 面试题:如何保证消息队列不丢消息?

消息丢失是 MQ 最严重的事故——支付成功消息丢了意味着用户付了钱但订单没创建,库存扣减消息丢了意味着超卖。这三节课覆盖了消息从生产者到 broker 到消费者的全链路可靠性。


一、消息丢失的三种场景

消息生命周期中的三个丢消息节点:

Producer ──①丢──→ Broker ──②丢──→ Consumer
                    ↓
              ① 生产者发送失败
              ② Broker 存储失败
              ③ 消费者处理失败

二、生产者端可靠性

发送模式对比

// 方式一:发完即忘(fire-and-forget)—— 最高效,最不可靠
kafkaTemplate.send(topic, message);
// 不关心是否到达 broker,网络抖动就丢了

// 方式二:同步发送 —— 可靠,性能差
try {
    RecordMetadata metadata = kafkaTemplate.send(topic, message).get(3, TimeUnit.SECONDS);
    System.out.println("发送成功,offset=" + metadata.getOffset());
} catch (ExecutionException e) {
    // 发送失败,broker 没收到
    handleFailure(e);
}

// 方式三:异步发送 + 回调 —— 推荐
kafkaTemplate.send(topic, message, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            log.error("发送失败", e);
            // 记录失败,重试
            retrySend(topic, message);
        } else {
            log.info("发送成功 offset={}", metadata.getOffset());
        }
    }
});

Producer 端重试策略

spring:
  kafka:
    producer:
      retries: 3                        # 重试次数
      retry-backoff-ms: 1000            # 重试间隔
      acks: all                         # 所有副本确认
      properties:
        enable.idempotence: true        # 开启幂等发送

三、Broker 端可靠性

Kafka 可靠性核心参数

参数 说明
acks all 所有 ISR 副本确认后才算发送成功
min.insync.replicas 2 最小同步副本数
replication.factor 3 分区副本数
unclean.leader.election.enable false 不允许非 ISR 副本成为 leader

刷盘策略

# Kafka 配置
log.flush.interval.messages=10000   # 消息达到多少条时刷盘
log.flush.interval.ms=1000           # 最多等多久刷盘

# 生产建议:不要用同步刷盘,用异步批量刷盘
# Kafka 默认就是异步刷盘,配合 acks=all 在性能和可靠性间取得平衡

Broker 端防丢配置完整示例

spring:
  kafka:
    producer:
      acks: all                      # 最强可靠性
      retries: 3
      properties:
        enable.idempotence: true     # 开启幂等
        max.in.flight.requests.per.connection: 5
        acks: all
    consumer:
      auto-offset-reset: earliest    # 最早未消费消息开始消费
      enable-auto-commit: false       # 手动提交 offset

四、消费者端可靠性

手动提交 offset

@KafkaListener(topics = "payment-success", groupId = "order-service")
public void handlePaymentSuccess(ConsumerRecord<String, PaymentMessage> record, Acknowledgment ack) {
    PaymentMessage message = record.value();

    try {
        // 1. 业务处理
        orderService.confirmOrder(message.getOrderId());

        // 2. 业务处理成功后才提交 offset
        ack.acknowledge();
        log.info("消费成功 offset={}", record.offset());

    } catch (Exception e) {
        // 3. 业务处理失败,不提交 offset,消息会被重新消费
        log.error("消费失败 offset={}", record.offset(), e);
        // 可以在这里记录失败日志,便于排查
        throw e; // 重新抛出,让 Kafka 自动重试
    }
}

⚠️ 先提交 offset 还是先处理业务?

❌ 先提交 offset,后处理业务:
  ack.submit() → 记录 offset=100
  处理业务时崩溃
  → 重启后从 offset=101 开始消费
  → offset=100 的消息被永久跳过 ❌

✅ 先处理业务,后提交 offset(正确):
  处理业务 → 成功
  ack.submit() → 记录 offset=100
  → 最多重复消费,不会漏消息 ✅

五、事务消息(最强可靠性)

RocketMQ 事务消息

// Step 1: 发送半消息(Half Message)—— 对消费者不可见
@Transactional
public void createOrder(OrderCreateRequest request) {
    // 1. 发送半消息,事务开始
    TransactionMQProducer producer = rocketMQTemplate.getProducer();
    Message message = MessageBuilder
        .withPayload(request)
        .setHeader("orderId", request.getOrderId())
        .build();

    // 发送半消息,并注册事务执行器
    TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
        "order-topic",
        message,
        new OrderTransactionListener(orderService, request) // 事务回调
    );
}

// Step 2: 事务执行器(本地事务 + 回查)
@Service
@Slf4j
public class OrderTransactionListener implements TransactionListener {

    @Autowired
    private OrderMapper orderMapper;

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        OrderCreateRequest request = (OrderCreateRequest) arg;
        try {
            // 本地事务:创建订单(和半消息在同一个事务中)
            orderMapper.createOrder(request);
            return LocalTransactionState.COMMIT_MESSAGE; // 提交,消息对消费者可见
        } catch (Exception e) {
            log.error("本地事务失败,回滚", e);
            return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚,消息被丢弃
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // RocketMQ 自动回查:事务超时未提交,调用此方法
        String orderId = msg.getUserProperties("orderId");
        Order order = orderMapper.selectByOrderId(orderId);

        if (order != null) {
            return LocalTransactionState.COMMIT_MESSAGE; // 订单存在,提交
        } else {
            return LocalTransactionState.ROLLBACK_MESSAGE; // 订单不存在,回滚
        }
    }
}

事务消息原理

正常流程:
  Producer → 发送 Half Message → Broker 存储(不可消费)
  Producer → 执行本地事务(创建订单)
  Producer → 提交 Transaction Commit → 消息对消费者可见
  Consumer → 消费消息

异常流程(本地事务失败):
  Producer → 发送 Half Message
  Producer → 本地事务失败
  Producer → 提交 Transaction Rollback → 消息被丢弃

超时流程(RocketMQ 自动回查):
  Producer → 发送 Half Message
  Producer → 本地事务执行中...(超时)
  Broker → 调用 checkLocalTransaction(回查接口)
  → 根据业务表判断事务是否成功
  → 决定提交或回滚

六、消息幂等消费

为什么需要幂等?

MQ 重投机制导致消息重复:

Consumer 处理消息成功,发送 ACK 时网络抖动
→ Broker 没收到 ACK
→ Broker 重投消息
→ Consumer 再次处理 → 重复消费 ❌

场景:
  - 支付成功消息重复 → 重复发货
  - 库存扣减消息重复 → 库存扣多次
  - 积分发放消息重复 → 积分多发

幂等方案一:数据库唯一约束

// 消息处理记录表
@TableName("message_dedup")
public class MessageDedup {
    private String messageId;      // 消息唯一 ID
    private Integer status;        // 0-处理中 1-成功 2-失败
    private LocalDateTime createTime;
}

@Service
public class OrderService {

    @Autowired
    private MessageDedupMapper dedupMapper;

    public void handlePaymentSuccess(PaymentMessage msg) {
        String messageId = msg.getMessageId();

        // 1. 尝试插入幂等记录
        int inserted = dedupMapper.insertIfNotExists(messageId);
        if (inserted == 0) {
            log.info("消息已处理过,跳过: {}", messageId);
            return;
        }

        // 2. 执行业务
        processOrder(msg);

        // 3. 更新状态
        dedupMapper.updateStatus(messageId, 1);
    }
}

幂等方案二:Redis 去重

public void handleMessage(String messageId) {
    String key = "msg:consumed:" + messageId;

    // SETNX 原子操作
    Boolean success = redisTemplate.opsForValue()
        .setIfAbsent(key, "1", Duration.ofHours(24));

    if (!Boolean.TRUE.equals(success)) {
        log.info("消息已消费,跳过: {}", messageId);
        return;
    }

    // 业务处理...
}

七、消息顺序性

为什么顺序重要?

下单流程:
  ① 创建订单(msg.order=1)
  ② 扣减库存(msg.order=2)
  ③ 扣减积分(msg.order=3)

乱序后果:
  ② 先执行 → 库存不够了
  ① 再执行 → 订单创建失败

支付流程:
  ① 扣款成功(余额 1000 → 900)
  ② 积分发放(发 100 积分)

乱序后果:
  ② 先执行 → 用户凭空多了积分

Kafka 分区有序

// 生产者:指定相同 key 的消息发送到同一分区
kafkaTemplate.send("order-topic", orderId, orderMessage);
// 同一 orderId 的消息一定到同一分区,分区内有序

// 消费者:单分区单线程消费
@KafkaListener(topics = "order-topic", containerFactory = "kafkaListenerContainerFactory")
public void consume(ConsumerRecord<String, OrderMessage> record) {
    // 同一分区的消息顺序消费
    processOrder(record.value());
}

八、死信队列(DLQ)

什么是死信?

消息进入死信的几种情况:
1. 消费者消费失败,超过最大重试次数
2. 消息过期(TTL)
3. 消息大小超过队列限制
4. 队列满了,新消息被丢弃

RocketMQ 死信队列配置

spring:
  rocketmq:
    consumer:
      # 死信队列配置
      max-reconsume-times: 3   # 最大重试次数
// 死信队列名称自动生成:DLQ.topic-name
// 消费死信队列消息,处理异常情况
@RocketMQMessageListener(
    topic = "DLQ-order-topic",
    consumerGroup = "dlq-handler-group"
)
public class DlqHandler {
    public void handle(MessageExt msg) {
        log.error("死信消息: {}", new String(msg.getBody()));
        // 人工处理:记录日志、发告警、人工补偿
    }
}

Kafka 死信主题

spring:
  kafka:
    listener:
      retry:
        initial-interval: 1000
        max-attempts: 3
        multiplier: 2.0
      # 超过重试次数后进入死信主题
      default-re_topic-recovery: true

九、高频面试题

Q1: Kafka 如何保证不丢消息?

从三个层面配置:① 生产者端:acks=all + retries=3 + enable.idempotence=true,确保消息发送到所有 ISR 副本;② Broker 端:replication.factor=3 + min.insync.replicas=2 + unclean.leader.election.enable=false;③ 消费者端:enable.auto.commit=false + 手动提交 offset,在业务处理完成后才提交。

Q2: 先提交 offset 还是先处理业务?

正确做法是先处理业务,后提交 offset。如果先提交 offset 再处理业务,处理过程中宕机会导致消息被跳过(永久丢失)。先处理业务后提交 offset,最多出现重复消费(可接受),不会出现消息丢失。

Q3: RocketMQ 事务消息的原理是什么?

RocketMQ 采用半消息机制:发送事务消息时,先发送一条对消费者不可见的半消息;然后执行本地事务;最后根据本地事务结果提交或回滚半消息。如果本地事务超时未响应,RocketMQ 会自动调用回查接口查询事务状态。核心思想是把「发送消息」和「本地事务」绑定为原子操作,确保要么全部成功,要么全部失败。

Q4: 如何保证消息消费的幂等性?

两种主流方案:① 数据库唯一约束:创建一张消息处理记录表,以消息 ID 为主键,消费前先尝试插入,插入成功才执行业务;② Redis 去重:以消息 ID 为 key 执行 SETNX,操作成功才处理。这两种方案都能保证消息只被处理一次。

Q5: 如何处理顺序消息?

以 Kafka 为例:① 生产者端用相同业务 key 发送消息(如 orderId),保证同一订单的消息路由到同一分区;② 消费者端单线程消费单个分区;③ 如果需要跨分区有序,可以在消费端用内存队列保证顺序。需要注意的是,分区有序 ≠ 全局有序,需要根据业务场景选择。