首页 » java

Rocketmq 配置类(RocketmqAutoConfiguration)中注册多个 RocketMQTemplate

   发表于:java评论 (0)   热度:5

Spring Boot 配置类中注册多个 RocketMQTemplate 的完整代码示例(每个 group 一个 bean,可通过 @Qualifier/@Resource 注入):

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

@Configuration
public class RocketmqMultiGroupConfig {

    @Value("${spring.rocketmq.name-server}")
    private String nameServer;

    @Value("${spring.rocketmq.producer.group}")
    private String groupA;

    @Value("${custom.rocketmq.groupB}")
    private String groupB;

    @Bean("rocketMQTemplateA")
    public RocketMQTemplate rocketMQTemplateA() {
        DefaultMQProducer producer = new DefaultMQProducer(groupA);
        producer.setNamesrvAddr(nameServer);
        producer.setVipChannelEnabled(false); // 推荐关闭 VIP 通道

        
        // 并发优化参数
        producer.setSendMsgTimeout(5000);                  // 发送超时时间(ms)
        producer.setRetryTimesWhenSendFailed(3);           // 同步发送失败重试次数
        producer.setRetryTimesWhenSendAsyncFailed(2);      // 异步发送失败重试次数
        producer.setCompressMsgBodyOverHowmuch(4096);      // 超过4KB自动压缩
        producer.setDefaultTopicQueueNums(8);              // 默认队列数,适当增加提升并发
        producer.setMaxMessageSize(1024 * 1024 * 2);       // 最大消息体2MB
        // 可选:异步回调线程池,适合高并发异步发送
        // producer.setAsyncSenderExecutor(Executors.newFixedThreadPool(8));

        
        RocketMQTemplate template = new RocketMQTemplate();
        template.setProducer(producer);
        return template;
    }

    @Bean("rocketMQTemplateB")
    public RocketMQTemplate rocketMQTemplateB() {
        DefaultMQProducer producer = new DefaultMQProducer(groupB);
        producer.setNamesrvAddr(nameServer);
        producer.setVipChannelEnabled(false);
        RocketMQTemplate template = new RocketMQTemplate();
        template.setProducer(producer);
        return template;
    }
}

配置文件示例(application.yml):

spring:
  rocketmq:
    name-server: 127.0.0.1:9876
    producer:
      group: groupA
custom:
  rocketmq:
    groupB: groupB

业务代码注入方式:

@Service
public class MyService {
    @Resource(name = "rocketMQTemplateA")
    private RocketMQTemplate rocketMQTemplateA;

    @Resource(name = "rocketMQTemplateB")
    private RocketMQTemplate rocketMQTemplateB;

    // 使用不同 group 的 template 发送消息
}

这样就可以为每个 group 注册独立的 RocketMQTemplate,灵活注入和使用。

在 Spring Boot 集成 RocketMQ(使用 rocketmq-spring-boot-starter)时,RocketMQTemplate 内部会自动管理 DefaultMQProducer 的启动和关闭,无需手动调用 producer.start()。

具体原因如下:

  • Spring Boot 自动装配会在容器启动时自动调用 producer.start(),并在容器关闭时自动 shutdown,保证生命周期一致。
  • 如果你手动调用 producer.start(),可能会和 Spring 的自动管理冲突,导致重复启动或关闭异常。
  • 官方推荐做法是只负责 new 和 set 属性,启动和销毁交给 Spring 管理。

只有在你完全手动管理 producer 时(不交给 Spring 管理),才需要自己调用 start()。在自动装配场景下,不需要手动 start()。

息体自动压缩(setCompressMsgBodyOverHowmuch)值的设置优化标准

RocketMQ 的消息体自动压缩(setCompressMsgBodyOverHowmuch)参数,设置的是“消息体超过多少字节时自动压缩”。标准如下:

  • 官方默认值是 4KB(4096 字节),即消息体超过 4KB 就会自动压缩。
  • 4KB 是大多数场景下的推荐值,既能节省带宽,又不会对小消息造成额外 CPU 开销。
  • 如果你的消息体普遍较大(如图片、文件、长文本),可以适当调低阈值(如 2KB),让更多消息被压缩。
  • 如果你的消息体普遍很小(如几十字节),可以适当调高阈值(如 8KB),避免不必要的压缩。

实际标准建议:

  • 普通业务场景,保持默认 4096 即可。
  • 高并发、大消息场景,可根据带宽和 CPU 压力做压力测试后微调。

总结:

  • 4096(4KB)是 RocketMQ 官方推荐和默认值,绝大多数场景都适用。
  • 具体可根据你的消息体大小分布和系统性能需求调整。

 

(。・v・。)
喜欢这篇文章吗?欢迎分享到你的微博、QQ群,并关注我们的微博,谢谢支持。