Rocketmq 配置类(RocketmqAutoConfiguration)中注册多个 RocketMQTemplate
Spring Boot 配置类中注册多个 RocketMQTemplate 的完整代码示例(每个 group 一个 bean,可通过 @Qualifier/@Resource 注入):
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
@Configuration
public class RocketmqMultiGroupConfig {
@Value("${spring.rocketmq.name-server}")
private String nameServer;
@Value("${spring.rocketmq.producer.group}")
private String groupA;
@Value("${custom.rocketmq.groupB}")
private String groupB;
@Bean("rocketMQTemplateA")
public RocketMQTemplate rocketMQTemplateA() {
DefaultMQProducer producer = new DefaultMQProducer(groupA);
producer.setNamesrvAddr(nameServer);
producer.setVipChannelEnabled(false); // 推荐关闭 VIP 通道
// 并发优化参数
producer.setSendMsgTimeout(5000); // 发送超时时间(ms)
producer.setRetryTimesWhenSendFailed(3); // 同步发送失败重试次数
producer.setRetryTimesWhenSendAsyncFailed(2); // 异步发送失败重试次数
producer.setCompressMsgBodyOverHowmuch(4096); // 超过4KB自动压缩
producer.setDefaultTopicQueueNums(8); // 默认队列数,适当增加提升并发
producer.setMaxMessageSize(1024 * 1024 * 2); // 最大消息体2MB
// 可选:异步回调线程池,适合高并发异步发送
// producer.setAsyncSenderExecutor(Executors.newFixedThreadPool(8));
RocketMQTemplate template = new RocketMQTemplate();
template.setProducer(producer);
return template;
}
@Bean("rocketMQTemplateB")
public RocketMQTemplate rocketMQTemplateB() {
DefaultMQProducer producer = new DefaultMQProducer(groupB);
producer.setNamesrvAddr(nameServer);
producer.setVipChannelEnabled(false);
RocketMQTemplate template = new RocketMQTemplate();
template.setProducer(producer);
return template;
}
}
配置文件示例(application.yml):
spring:
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: groupA
custom:
rocketmq:
groupB: groupB
业务代码注入方式:
@Service
public class MyService {
@Resource(name = "rocketMQTemplateA")
private RocketMQTemplate rocketMQTemplateA;
@Resource(name = "rocketMQTemplateB")
private RocketMQTemplate rocketMQTemplateB;
// 使用不同 group 的 template 发送消息
}
这样就可以为每个 group 注册独立的 RocketMQTemplate,灵活注入和使用。
在 Spring Boot 集成 RocketMQ(使用 rocketmq-spring-boot-starter)时,RocketMQTemplate 内部会自动管理 DefaultMQProducer 的启动和关闭,无需手动调用 producer.start()。
具体原因如下:
- Spring Boot 自动装配会在容器启动时自动调用 producer.start(),并在容器关闭时自动 shutdown,保证生命周期一致。
- 如果你手动调用 producer.start(),可能会和 Spring 的自动管理冲突,导致重复启动或关闭异常。
- 官方推荐做法是只负责 new 和 set 属性,启动和销毁交给 Spring 管理。
只有在你完全手动管理 producer 时(不交给 Spring 管理),才需要自己调用 start()。在自动装配场景下,不需要手动 start()。
息体自动压缩(setCompressMsgBodyOverHowmuch)值的设置优化标准
RocketMQ 的消息体自动压缩(setCompressMsgBodyOverHowmuch)参数,设置的是“消息体超过多少字节时自动压缩”。标准如下:
- 官方默认值是 4KB(4096 字节),即消息体超过 4KB 就会自动压缩。
- 4KB 是大多数场景下的推荐值,既能节省带宽,又不会对小消息造成额外 CPU 开销。
- 如果你的消息体普遍较大(如图片、文件、长文本),可以适当调低阈值(如 2KB),让更多消息被压缩。
- 如果你的消息体普遍很小(如几十字节),可以适当调高阈值(如 8KB),避免不必要的压缩。
实际标准建议:
- 普通业务场景,保持默认 4096 即可。
- 高并发、大消息场景,可根据带宽和 CPU 压力做压力测试后微调。
总结:
- 4096(4KB)是 RocketMQ 官方推荐和默认值,绝大多数场景都适用。
- 具体可根据你的消息体大小分布和系统性能需求调整。
(。・v・。)
喜欢这篇文章吗?欢迎分享到你的微博、QQ群,并关注我们的微博,谢谢支持。