首页 » java

在 Spring 中使用多线程的示例代码

   发表于:java   评论 (0)   热度:4
package com.example.demo.controller;

import com.example.demo.service.OrderAsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

/**
 * REST endpoints under {@code /api/order} that delegate their work to
 * {@link OrderAsyncService}.
 */
@RestController
@RequestMapping("/api/order")
public class OrderController {

    @Autowired
    private OrderAsyncService orderAsyncService;

    /**
     * Hands the order to {@code OrderAsyncService.processOrderAsync} and replies with a
     * fixed acknowledgement. Nothing returned by the service call is inspected or waited on.
     *
     * @param orderId order identifier taken from the URL path
     * @return a constant confirmation message
     */
    @PostMapping("/process/{orderId}")
    public String processOrder(@PathVariable String orderId) {
        orderAsyncService.processOrderAsync(orderId);
        return "订单处理任务已提交";
    }

    /**
     * Runs an inventory check for every requested product and returns the results in the
     * same order as {@code productIds}.
     *
     * <p>All checks are submitted before any result is awaited, so they can overlap when the
     * service executes them asynchronously. If submission or any check fails, every future
     * obtained so far is cancelled so abandoned work does not keep occupying the executor.
     *
     * @param productIds product identifiers from the {@code productIds} request parameter
     * @return one result string per product id, in request order
     * @throws Exception whatever {@link Future#get()} raises (for example
     *         {@code ExecutionException} or {@code InterruptedException}), or any exception
     *         thrown while submitting a check
     */
    @GetMapping("/inventory/batch")
    public List<String> checkBatchInventory(@RequestParam List<String> productIds) throws Exception {
        List<Future<String>> futures = new ArrayList<>(productIds.size());
        boolean completed = false;
        try {
            // Submit everything first; waiting happens only after all checks are in flight.
            for (String productId : productIds) {
                futures.add(orderAsyncService.checkInventoryAsync(productId));
            }

            List<String> results = new ArrayList<>(futures.size());
            for (Future<String> future : futures) {
                // Blocks for this result; the other checks were already submitted above.
                results.add(future.get());
            }
            completed = true;
            return results;
        } finally {
            if (!completed) {
                // The request is failing: stop outstanding checks instead of leaking them.
                // cancel() is a no-op on futures that have already finished.
                for (Future<String> pending : futures) {
                    pending.cancel(true);
                }
            }
        }
    }
}

 

package com.example.demo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Enables asynchronous method execution and declares two thread-pool beans:
 * a Spring {@link ThreadPoolTaskExecutor} named {@code taskExecutor} and a plain JDK
 * {@link ThreadPoolExecutor} named {@code nativeThreadPool}.
 *
 * <p>Both pools are sized from the same {@code async.executor.thread.*} properties; each
 * property has a default that applies when it is not configured.
 */
@Slf4j
@Configuration
@EnableAsync  // enable asynchronous method support
public class AsyncConfig {

    @Value("${async.executor.thread.core_pool_size:5}")
    private int corePoolSize;

    @Value("${async.executor.thread.max_pool_size:10}")
    private int maxPoolSize;

    @Value("${async.executor.thread.queue_capacity:100}")
    private int queueCapacity;

    @Value("${async.executor.thread.name.prefix:async-task-}")
    private String namePrefix;

    // Overridable like the other settings; the default keeps the previously hard-coded prefix.
    @Value("${async.executor.native.thread.name.prefix:native-async-task-}")
    private String nativeNamePrefix;

    /**
     * Builds the {@code taskExecutor} bean from the configured pool settings.
     *
     * @return an initialized {@link ThreadPoolTaskExecutor}
     */
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Core pool size: the minimum number of threads the pool maintains.
        executor.setCorePoolSize(corePoolSize);
        // Max pool size: the upper bound on threads the pool may create.
        executor.setMaxPoolSize(maxPoolSize);
        // Queue capacity: length of the task queue.
        executor.setQueueCapacity(queueCapacity);
        // Thread name prefix: makes log tracing and troubleshooting easier.
        executor.setThreadNamePrefix(namePrefix);
        // Keep-alive: how long (seconds) an idle non-core thread survives.
        executor.setKeepAliveSeconds(60);
        // Rejection policy when both the pool and the queue are full: run in the caller's thread.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // On shutdown, wait for submitted tasks to finish before closing the pool...
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // ...but for at most this many seconds.
        executor.setAwaitTerminationSeconds(60);

        // NOTE(review): Spring normally initializes ThreadPoolTaskExecutor beans itself via
        // afterPropertiesSet(), so this explicit call is probably redundant — confirm before removing.
        executor.initialize();
        return executor;
    }

    /**
     * Builds the {@code nativeThreadPool} bean: a JDK {@link ThreadPoolExecutor} backed by a
     * bounded {@link ArrayBlockingQueue}, with a 60-second keep-alive.
     *
     * <p>Threads are named {@code <nativeNamePrefix><n>} where {@code n} is a per-pool
     * sequence starting at 1, so names are unique and readable in logs. Each thread logs any
     * exception that escapes it. Rejected tasks are logged at WARN and then handled by
     * {@link ThreadPoolExecutor.AbortPolicy}, which throws {@code RejectedExecutionException}.
     *
     * @return the configured executor
     */
    @Bean("nativeThreadPool")
    public ThreadPoolExecutor nativeThreadPool() {
        // Sequence for thread names; atomic because the factory can be invoked concurrently.
        AtomicInteger threadSequence = new AtomicInteger(1);
        return new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                r -> {
                    Thread t = new Thread(r, nativeNamePrefix + threadSequence.getAndIncrement());
                    t.setUncaughtExceptionHandler((thread, ex) ->
                            log.error("Thread {} threw uncaught exception", thread.getName(), ex));
                    return t;
                },
                new ThreadPoolExecutor.AbortPolicy() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // Record the rejection before AbortPolicy throws.
                        log.warn("Task rejected from thread pool {}", executor);
                        super.rejectedExecution(r, executor);
                    }
                }
        );
    }
}

 

(。・v・。)
喜欢这篇文章吗?欢迎分享到你的微博、QQ群,并关注我们的微博,谢谢支持。