在Spring 多线程代码
package com.example.demo.controller;
import com.example.demo.service.OrderAsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
@RestController
@RequestMapping("/api/order")
public class OrderController {
@Autowired
private OrderAsyncService orderAsyncService;
@PostMapping("/process/{orderId}")
public String processOrder(@PathVariable String orderId) {
orderAsyncService.processOrderAsync(orderId);
return "订单处理任务已提交";
}
@GetMapping("/inventory/batch")
public List<String> checkBatchInventory(@RequestParam List<String> productIds) throws Exception {
List<Future<String>> futures = new ArrayList<>();
for (String productId : productIds) {
futures.add(orderAsyncService.checkInventoryAsync(productId));
}
List<String> results = new ArrayList<>();
for (Future<String> future : futures) {
results.add(future.get()); // 这里等待,但多个检查是并行执行的
}
return results;
}
}
package com.example.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Configuration
@EnableAsync // 启用异步支持
public class AsyncConfig {
@Value("${async.executor.thread.core_pool_size:5}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size:10}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity:100}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix:async-task-}")
private String namePrefix;
@Value("native-async-task-")
private String nativeNamePrefix;
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:线程池维护的最小线程数量
executor.setCorePoolSize(corePoolSize);
// 最大线程数:线程池允许的最大线程数量
executor.setMaxPoolSize(maxPoolSize);
// 队列容量:任务队列的长度
executor.setQueueCapacity(queueCapacity);
// 线程名前缀:便于日志跟踪和问题排查
executor.setThreadNamePrefix(namePrefix);
// 线程空闲时间:非核心线程的空闲存活时间(秒)
executor.setKeepAliveSeconds(60);
// 拒绝策略:当线程池和队列都满时的处理方式
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务完成后关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 等待任务完成的最大时间
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
// New native ThreadPoolExecutor
/* @Bean("nativeThreadPool")
public ThreadPoolExecutor nativeThreadPool() {
return new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueCapacity),
r -> new Thread(r, namePrefix + r.hashCode()),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}*/
@Bean("nativeThreadPool")
public ThreadPoolExecutor nativeThreadPool() {
return new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueCapacity),
r -> {
Thread t = new Thread(r, nativeNamePrefix + r.hashCode());
t.setUncaughtExceptionHandler((thread, ex) ->
log.error("Thread {} threw uncaught exception", thread.getName(), ex));
return t;
},
new ThreadPoolExecutor.AbortPolicy() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.warn("Task rejected from thread pool {}", executor);
super.rejectedExecution(r, executor);
}
}
);
}
}
(。・v・。)
喜欢这篇文章吗?欢迎分享到你的微博、QQ群,并关注我们的微博,谢谢支持。