掌握RocketMQ消费者高级方案,真正理解Offset存储、消费端重试、幂等策略

前言

RocketMQ 是一款强大的消息中间件,具备高可用性、高性能和可伸缩性等特点。

Offset存储机制

Offset是消费者用来标识消息消费详情的一个重要概念。在RocketMQ中,每个消费者都会维护一个消费详情Offset,表示消费到了哪一条消息。Offset存储可以分为两种方式:本地存储和远程存储。本地存储通常是保存在消费者端的本地文件中,远程存储通常是保存在 RocketMQ 的 Broker 服务器上。
Offset 存储的目的是确保即使消费者重启,也可以从上次的消费细节来看继续消费。
RocketMQ 通过OffsetStore来管理消费详情 Offset。通常,我们可以使用LocalFileOffsetStore来本地存储 Offset。以下是一个使用 RocketMQ Java 客户端的示例:

import org.apache.rocketmq.client.consumer.DefaultMQConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.concurrent.TimeUnit;

public class OffsetStorageExample {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQConsumer consumer = new DefaultMQConsumer("consumer_group");
        consumer.setNamesrvAddr("rocketmq-nameserver-ip:9876");
        // 设置消费进度存储方式为本地文件
        consumer.setOffsetStore(new LocalFileOffsetStore());
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
            for (MessageExt msg : msgs) {
                // 处理消息逻辑
                System.out.printf("Receive New Messages: %s %n", msg);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        System.out.println("Consumer Started");

        TimeUnit.SECONDS.sleep(3600);
        consumer.shutdown();
    }
}

消费端重试

消息中间件通常会保证消息的可靠投递,但消费端在处理消息时可能会遇到异常,例如网络问题、业务异常等。为了保证消息不会丢失,RocketMQ 允许消费者开启消息重试机制。当消费者处理消息失败时,RocketMQ 将消息重新投递给消费者,直到达到最大重试次数统计。

consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
    for (MessageExt msg : msgs) {
        try {
            // 处理消息逻辑
            int result = processMessage(msg);
            if (result == 1) {
                // 消息处理成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } else {
                // 消息处理失败,RocketMQ 会自动进行重试
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        } catch (Exception e) {
            // 异常处理,RocketMQ 会自动进行重试
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

幂等策略

幂等是指对同一操作的多次执行具有相同的结果,不会产生额外的后果。在消息处理中,幂等性非常重要,因为重试机制可能导致同一消息被多次处理消费者需要实现幂等策略来确保处理消息的结果是一致的。

// 假设消息中有一个唯一的标识字段 messageKey
public int processMessage(MessageExt msg) {
    String messageKey = msg.getKeys();
    // 检查消息是否已经处理过,如果已处理,返回 1 表示成功
    if (isMessageProcessed(messageKey)) {
        return 1;
    }
    // 处理消息的业务逻辑
    // 处理成功后,标记消息为已处理
    markMessageAsProcessed(messageKey);
    return 1;
}
// 判断消息是否已经处理过
private boolean isMessageProcessed(String messageKey) {
    // 根据业务逻辑检查消息是否已经处理过
    // 如果已处理过,返回 true;否则,返回 false
}
// 标记消息为已处理
private void markMessageAsProcessed(String messageKey) {
    // 根据业务逻辑标记消息为已处理
}

Spring Boot 集成 RocketMQ:

要在 Spring Boot 项目中集成 RocketMQ,您需要添加相关的依赖,配置 RocketMQ 生产者和消费者,并编写消息处理逻辑。以下是一个简单的 Spring Boot 集成 RocketMQ 的示例:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>最新版本</version>
</dependency>

配置生产者:
在application.properties或application.yml中配置RocketMQ制作者的相关信息:

rocketmq.producer.group=your_producer_group
rocketmq.name-server=rocketmq-nameserver-ip:9876
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class MessageProducerService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    public void sendMessage(String topic, String content) {
        Message<String> message = MessageBuilder.withPayload(content).build();
        rocketMQTemplate.send(topic, message);
    }
}

消费者配置:

rocketmq.consumer.group=your_consumer_group
rocketmq.name-server=rocketmq-nameserver-ip:9876
rocketmq.consumer.topics=TopicTest
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "TopicTest", consumerGroup = "your_consumer_group")
public class MessageConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        // 处理消息的业务逻辑
        System.out.println("Received message: " + message);
    }
}
掌握RocketMQ消费者高级方案,真正理解Offset存储、消费端重试、幂等策略

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

滚动到顶部