Rocketmq @PostConstruct 和 ApplicationRunner 预热都有那些场景应用说明
Rocketmq @PostConstruct 和 ApplicationRunner 预热都有那些场景应用说明
一、@PostConstruct 预热场景
适用场景:
- 只依赖当前 bean 的资源初始化或连接预热。
- 只需在 bean 初始化后立即执行,无需等待整个 Spring Boot 应用启动完成。
- 预热逻辑简单,不依赖其他 bean 的生命周期。
典型用法:
- MQ 连接预热(如 RocketMQTemplate)
- Redis、数据库连接池初始化
- 本地缓存加载
- 单个 bean 的资源准备
代码示例:
@Component
public class RocketMQPreheat {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@PostConstruct
public void preheat() {
// 发送一条预热消息,建立连接
rocketMQTemplate.convertAndSend("preheat_topic", "preheat");
}
}
二、ApplicationRunner/CommandLineRunner 预热场景
适用场景:
- 需要在整个 Spring Boot 应用启动完成后执行预热或初始化。
- 预热逻辑复杂,依赖多个 bean 或全局环境。
- 需要批量初始化、数据加载、全局任务等。
- 需要读取启动参数或环境变量。
典型用法:
- 批量缓存预加载(依赖多个服务/DAO)
- 全局数据同步、分布式任务调度
- 依赖外部服务(如注册中心、配置中心)初始化
- 复杂的多 bean 预热
代码示例 1:
@Component
public class GlobalPreheatRunner implements ApplicationRunner {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private CacheService cacheService;
@Override
public void run(ApplicationArguments args) {
// MQ 预热
rocketMQTemplate.convertAndSend("preheat_topic", "preheat");
// 批量缓存预加载
cacheService.loadAll();
// 其他全局初始化逻辑
}
}
代码示例2: @Bean 方式 @Bean public ApplicationRunner rocketMQFullPreheatRunner() { return args -> { // 预热逻辑 }; }
- 这是用 @Bean 方法注册一个 ApplicationRunner 实例,Spring 启动后会自动执行 run 方法里的代码。
- 适合简单的、无需单独类名的预热或初始化逻辑。
- 代码更简洁,适合只用一次的小型预热任务。
@Component
public class RocketMQFullPreheatConfig {
// 假设多个生产者实例(比如不同分组)
private final DefaultMQProducer producer1; // 业务生产者1
private final DefaultMQProducer producer2; // 业务生产者2
public RocketMQFullPreheatConfig(DefaultMQProducer producer1, DefaultMQProducer producer2) {
this.producer1 = producer1;
this.producer2 = producer2;
}
@Bean
public ApplicationRunner rocketMQFullPreheatRunner() {
return args -> {
// 预热所有生产者
preheatProducer(producer1);
preheatProducer(producer2);
};
}
// 通用预热方法:覆盖所有发送方式
private void preheatProducer(DefaultMQProducer producer) throws Exception {
if (!producer.isStarted()) {
producer.start();
}
String preheatTopic = "ROCKETMQ_PREHEAT";
int preheatTimes = 10;
// 1. 预热同步发送
System.out.println("开始预热[" + producer.getProducerGroup() + "]同步发送");
for (int i = 0; i < preheatTimes; i++) {
Message syncMsg = new Message(preheatTopic, "SYNC", ("Sync-" + i).getBytes());
producer.send(syncMsg);
}
// 2. 预热异步发送
System.out.println("开始预热[" + producer.getProducerGroup() + "]异步发送");
CountDownLatch asyncLatch = new CountDownLatch(preheatTimes);
for (int i = 0; i < preheatTimes; i++) {
Message asyncMsg = new Message(preheatTopic, "ASYNC", ("Async-" + i).getBytes());
producer.send(asyncMsg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
asyncLatch.countDown();
}
@Override
public void onException(Throwable e) {
asyncLatch.countDown();
System.err.println("异步预热失败:" + e.getMessage());
}
});
}
asyncLatch.await(5, TimeUnit.SECONDS); // 等待异步预热完成
// 3. 预热单向发送(若用到)
System.out.println("开始预热[" + producer.getProducerGroup() + "]单向发送");
for (int i = 0; i < preheatTimes; i++) {
Message onewayMsg = new Message(preheatTopic, "ONEWAY", ("Oneway-" + i).getBytes());
producer.sendOneway(onewayMsg);
}
// 4. 提前拉取业务Topic路由(可选,进一步优化)
producer.fetchPublishMessageQueues("你的业务Topic1");
producer.fetchPublishMessageQueues("你的业务Topic2");
System.out.println("[" + producer.getProducerGroup() + "]预热完成");
}
}
代码示例 3:implements ApplicationRunner 方式 @Component public class GlobalPreheatRunner implements ApplicationRunner { @Override public void run(ApplicationArguments args) {// 预热逻辑 } }
- 这是定义一个实现 ApplicationRunner 接口的类,并用 @Component 注册为 bean。
- 适合复杂的、需要复用或有较多依赖的初始化逻辑。
- 可以有更多成员变量、依赖注入、方法拆分,结构更清晰。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Component
public class GlobalPreheatRunner implements ApplicationRunner {
private final DefaultMQProducer orderProducer;
private final DefaultMQProducer logProducer;
public GlobalPreheatRunner(
@Qualifier("orderProducer") DefaultMQProducer orderProducer,
@Qualifier("logProducer") DefaultMQProducer logProducer) {
this.omsProducer = omsProducer;
this.logProducer = logProducer;
}
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("? 开始执行RocketMQ全局预热...");
// 预热所有生产者实例
preheatProducer(orderProducer);
preheatProducer(logProducer);
System.out.println("✅ RocketMQ全局预热完成");
}
/**
* 通用预热方法:覆盖所有消息发送方式
*/
private void preheatProducer(DefaultMQProducer producer) throws Exception {
String producerGroup = producer.getProducerGroup();
System.out.println("? 开始预热生产者组: " + producerGroup);
// 确保生产者已启动
if (!producer.isStarted()) {
producer.start();
System.out.println(" 启动生产者: " + producerGroup);
}
String preheatTopic = "ROCKETMQ_PREHEAT_TOPIC";
int preheatTimes = 10;
try {
// 1. 预热同步发送(最常用的发送方式)
System.out.println(" ? 预热同步发送...");
for (int i = 0; i < preheatTimes; i++) {
Message syncMsg = new Message(preheatTopic, "SYNC_TAG",
("Sync-Preheat-Message-" + i + "-" + System.currentTimeMillis()).getBytes());
producer.send(syncMsg);
}
// 2. 预热异步发送
System.out.println(" ⚡ 预热异步发送...");
CountDownLatch asyncLatch = new CountDownLatch(preheatTimes);
for (int i = 0; i < preheatTimes; i++) {
Message asyncMsg = new Message(preheatTopic, "ASYNC_TAG",
("Async-Preheat-Message-" + i).getBytes());
producer.send(asyncMsg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
asyncLatch.countDown();
}
@Override
public void onException(Throwable e) {
System.err.println(" 异步发送预热异常: " + e.getMessage());
asyncLatch.countDown();
}
});
}
// 等待异步发送完成(最多等待10秒)
boolean asyncCompleted = asyncLatch.await(10, TimeUnit.SECONDS);
if (!asyncCompleted) {
System.err.println(" ⚠️ 异步预热未在指定时间内完成");
}
// 3. 预热单向发送(适用于日志等不关心结果的场景)
System.out.println(" ? 预热单向发送...");
for (int i = 0; i < preheatTimes; i++) {
Message onewayMsg = new Message(preheatTopic, "ONEWAY_TAG",
("Oneway-Preheat-Message-" + i).getBytes());
producer.sendOneway(onewayMsg);
}
// 4. 预热业务Topic路由获取(避免首次业务调用时的路由发现延迟)
System.out.println(" ?️ 预热业务Topic路由...");
preheatBusinessTopicRoutes(producer);
System.out.println(" ✅ 生产者组 " + producerGroup + " 预热完成");
} catch (Exception e) {
System.err.println(" ❌ 生产者组 " + producerGroup + " 预热失败: " + e.getMessage());
// 这里可以根据需要决定是否抛出异常
// 如果是非关键生产者,可以选择记录日志但不中断整个预热流程
}
}
/**
* 预热业务Topic路由信息
*/
private void preheatBusinessTopicRoutes(DefaultMQProducer producer) {
// 替换为实际的业务Topic名称
String[] businessTopics = {"ORDER_TOPIC", "LOG_TOPIC", "NOTIFY_TOPIC"};
for (String topic : businessTopics) {
try {
producer.fetchPublishMessageQueues(topic);
System.out.println(" ✅ 已预热Topic路由: " + topic);
} catch (Exception e) {
System.err.println(" ⚠️ 预热Topic路由失败 " + topic + ": " + e.getMessage());
// 路由预热失败不影响主要功能,继续执行
}
}
}
}
三、对比总结
- @PostConstruct:适合单 bean、简单依赖、早期预热。
- ApplicationRunner:适合全局、复杂依赖、批量任务、需要完整上下文的预热。
实际开发中,简单预热用 @PostConstruct,复杂/全局预热用 ApplicationRunner/CommandLineRunner。
(。・v・。)
喜欢这篇文章吗?欢迎分享到你的微博、QQ群,并关注我们的微博,谢谢支持。