首页 » java

Rocketmq @PostConstruct 和 ApplicationRunner 预热都有那些场景应用说明

   发表于:java评论 (0)   热度:16

Rocketmq @PostConstruct 和 ApplicationRunner 预热都有那些场景应用说明

一、@PostConstruct 预热场景

适用场景:

  • 只依赖当前 bean 的资源初始化或连接预热。
  • 只需在 bean 初始化后立即执行,无需等待整个 Spring Boot 应用启动完成
  • 预热逻辑简单,不依赖其他 bean 的生命周期。

典型用法:

  1. MQ 连接预热(如 RocketMQTemplate)
  2. Redis、数据库连接池初始化
  3. 本地缓存加载
  4. 单个 bean 的资源准备

代码示例:

@Component
public class RocketMQPreheat {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @PostConstruct
    public void preheat() {
        // 发送一条预热消息,建立连接
        rocketMQTemplate.convertAndSend("preheat_topic", "preheat");
    }
}

二、ApplicationRunner/CommandLineRunner 预热场景

适用场景:

  • 需要在整个 Spring Boot 应用启动完成后执行预热或初始化
  • 预热逻辑复杂,依赖多个 bean 或全局环境。
  • 需要批量初始化、数据加载、全局任务等。
  • 需要读取启动参数或环境变量。

典型用法:

  1. 批量缓存预加载(依赖多个服务/DAO)
  2. 全局数据同步、分布式任务调度
  3. 依赖外部服务(如注册中心、配置中心)初始化
  4. 复杂的多 bean 预热

代码示例 1:

@Component
public class GlobalPreheatRunner implements ApplicationRunner {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private CacheService cacheService;

    @Override
    public void run(ApplicationArguments args) {
        // MQ 预热
        rocketMQTemplate.convertAndSend("preheat_topic", "preheat");
        // 批量缓存预加载
        cacheService.loadAll();
        // 其他全局初始化逻辑
    }
}

代码示例2: @Bean 方式  @Bean public ApplicationRunner rocketMQFullPreheatRunner() { return args -> { // 预热逻辑 }; }  

 

  • 这是用 @Bean 方法注册一个 ApplicationRunner 实例,Spring 启动后会自动执行 run 方法里的代码。
  • 适合简单的、无需单独类名的预热或初始化逻辑。
  • 代码更简洁,适合只用一次的小型预热任务。
@Component
public class RocketMQFullPreheatConfig {
    // 假设多个生产者实例(比如不同分组)
    private final DefaultMQProducer producer1; // 业务生产者1
    private final DefaultMQProducer producer2; // 业务生产者2

    public RocketMQFullPreheatConfig(DefaultMQProducer producer1, DefaultMQProducer producer2) {
        this.producer1 = producer1;
        this.producer2 = producer2;
    }

    @Bean
    public ApplicationRunner rocketMQFullPreheatRunner() {
        return args -> {
            // 预热所有生产者
            preheatProducer(producer1);
            preheatProducer(producer2);
        };
    }

    // 通用预热方法:覆盖所有发送方式
    private void preheatProducer(DefaultMQProducer producer) throws Exception {
        if (!producer.isStarted()) {
            producer.start();
        }
        String preheatTopic = "ROCKETMQ_PREHEAT";
        int preheatTimes = 10;

        // 1. 预热同步发送
        System.out.println("开始预热[" + producer.getProducerGroup() + "]同步发送");
        for (int i = 0; i < preheatTimes; i++) {
            Message syncMsg = new Message(preheatTopic, "SYNC", ("Sync-" + i).getBytes());
            producer.send(syncMsg);
        }

        // 2. 预热异步发送
        System.out.println("开始预热[" + producer.getProducerGroup() + "]异步发送");
        CountDownLatch asyncLatch = new CountDownLatch(preheatTimes);
        for (int i = 0; i < preheatTimes; i++) {
            Message asyncMsg = new Message(preheatTopic, "ASYNC", ("Async-" + i).getBytes());
            producer.send(asyncMsg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    asyncLatch.countDown();
                }
                @Override
                public void onException(Throwable e) {
                    asyncLatch.countDown();
                    System.err.println("异步预热失败:" + e.getMessage());
                }
            });
        }
        asyncLatch.await(5, TimeUnit.SECONDS); // 等待异步预热完成

        // 3. 预热单向发送(若用到)
        System.out.println("开始预热[" + producer.getProducerGroup() + "]单向发送");
        for (int i = 0; i < preheatTimes; i++) {
            Message onewayMsg = new Message(preheatTopic, "ONEWAY", ("Oneway-" + i).getBytes());
            producer.sendOneway(onewayMsg);
        }

        // 4. 提前拉取业务Topic路由(可选,进一步优化)
        producer.fetchPublishMessageQueues("你的业务Topic1");
        producer.fetchPublishMessageQueues("你的业务Topic2");

        System.out.println("[" + producer.getProducerGroup() + "]预热完成");
    }
}

代码示例 3:implements ApplicationRunner 方式 @Component public class GlobalPreheatRunner implements ApplicationRunner { @Override  public void run(ApplicationArguments args) {// 预热逻辑 } }

  • 这是定义一个实现 ApplicationRunner 接口的类,并用 @Component 注册为 bean。
  • 适合复杂的、需要复用或有较多依赖的初始化逻辑。
  • 可以有更多成员变量、依赖注入、方法拆分,结构更清晰。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Component
public class GlobalPreheatRunner implements ApplicationRunner {
    
    private final DefaultMQProducer orderProducer;
    private final DefaultMQProducer logProducer;

    public GlobalPreheatRunner(
            @Qualifier("orderProducer") DefaultMQProducer orderProducer,
            @Qualifier("logProducer") DefaultMQProducer logProducer) {
        this.omsProducer = omsProducer;
        this.logProducer = logProducer;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        System.out.println("? 开始执行RocketMQ全局预热...");
        
        // 预热所有生产者实例
        preheatProducer(orderProducer);
        preheatProducer(logProducer);
        
        System.out.println("✅ RocketMQ全局预热完成");
    }

    /**
     * 通用预热方法:覆盖所有消息发送方式
     */
    private void preheatProducer(DefaultMQProducer producer) throws Exception {
        String producerGroup = producer.getProducerGroup();
        System.out.println("? 开始预热生产者组: " + producerGroup);

        // 确保生产者已启动
        if (!producer.isStarted()) {
            producer.start();
            System.out.println("  启动生产者: " + producerGroup);
        }

        String preheatTopic = "ROCKETMQ_PREHEAT_TOPIC";
        int preheatTimes = 10;

        try {
            // 1. 预热同步发送(最常用的发送方式)
            System.out.println("  ? 预热同步发送...");
            for (int i = 0; i < preheatTimes; i++) {
                Message syncMsg = new Message(preheatTopic, "SYNC_TAG", 
                    ("Sync-Preheat-Message-" + i + "-" + System.currentTimeMillis()).getBytes());
                producer.send(syncMsg);
            }

            // 2. 预热异步发送
            System.out.println("  ⚡ 预热异步发送...");
            CountDownLatch asyncLatch = new CountDownLatch(preheatTimes);
            for (int i = 0; i < preheatTimes; i++) {
                Message asyncMsg = new Message(preheatTopic, "ASYNC_TAG", 
                    ("Async-Preheat-Message-" + i).getBytes());
                producer.send(asyncMsg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        asyncLatch.countDown();
                    }
                    @Override
                    public void onException(Throwable e) {
                        System.err.println("  异步发送预热异常: " + e.getMessage());
                        asyncLatch.countDown();
                    }
                });
            }
            // 等待异步发送完成(最多等待10秒)
            boolean asyncCompleted = asyncLatch.await(10, TimeUnit.SECONDS);
            if (!asyncCompleted) {
                System.err.println("  ⚠️ 异步预热未在指定时间内完成");
            }

            // 3. 预热单向发送(适用于日志等不关心结果的场景)
            System.out.println("  ? 预热单向发送...");
            for (int i = 0; i < preheatTimes; i++) {
                Message onewayMsg = new Message(preheatTopic, "ONEWAY_TAG", 
                    ("Oneway-Preheat-Message-" + i).getBytes());
                producer.sendOneway(onewayMsg);
            }

            // 4. 预热业务Topic路由获取(避免首次业务调用时的路由发现延迟)
            System.out.println("  ?️ 预热业务Topic路由...");
            preheatBusinessTopicRoutes(producer);

            System.out.println("  ✅ 生产者组 " + producerGroup + " 预热完成");

        } catch (Exception e) {
            System.err.println("  ❌ 生产者组 " + producerGroup + " 预热失败: " + e.getMessage());
            // 这里可以根据需要决定是否抛出异常
            // 如果是非关键生产者,可以选择记录日志但不中断整个预热流程
        }
    }

    /**
     * 预热业务Topic路由信息
     */
    private void preheatBusinessTopicRoutes(DefaultMQProducer producer) {
        // 替换为实际的业务Topic名称
        String[] businessTopics = {"ORDER_TOPIC", "LOG_TOPIC", "NOTIFY_TOPIC"};
        
        for (String topic : businessTopics) {
            try {
                producer.fetchPublishMessageQueues(topic);
                System.out.println("    ✅ 已预热Topic路由: " + topic);
            } catch (Exception e) {
                System.err.println("    ⚠️ 预热Topic路由失败 " + topic + ": " + e.getMessage());
                // 路由预热失败不影响主要功能,继续执行
            }
        }
    }
}

 

三、对比总结

  • @PostConstruct:适合单 bean、简单依赖、早期预热。
  • ApplicationRunner:适合全局、复杂依赖、批量任务、需要完整上下文的预热。

实际开发中,简单预热用 @PostConstruct,复杂/全局预热用 ApplicationRunner/CommandLineRunner。

(。・v・。)
喜欢这篇文章吗?欢迎分享到你的微博、QQ群,并关注我们的微博,谢谢支持。