스레드 풀은 실행자를 사용하여 생성할 수 없으며, 대신 "스레드"풀실행자를 사용하여 스레드 풀이 어떻게 작동하는지 글을 읽는 사람에게 더 명확하게 보여주고 리소스 부족의 위험을 방지하는 방식으로 처리됩니다.
참고: 실행자가 반환하는 스레드 풀 객체의 단점은 다음과 같습니다:
일반적인 답변
Thread 풀실행자의 핵심 파라미터는 빌드할 때 전달해야 하는 파라미터이며, 그 생성자는 아래와 같습니다:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
// maximumPoolSize 必须大于 0,且必须大于 corePoolSize
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
첫 번째 매개변수인 corePoolSize는 스레드 풀에 상주하는 코어 스레드 수를 나타냅니다. 0으로 설정하면 작업이 없을 때 스레드 풀이 소멸되고, 0보다 크면 작업이 없을 때에도 풀의 스레드 수가 이 값과 같도록 보장됩니다. 단, 이 값을 작게 설정하면 스레드가 자주 생성 및 소멸되고, 크게 설정하면 시스템 리소스가 낭비되므로 개발자는 실제 업무에 맞게 이 값을 조정할 필요가 있습니다.
두 번째 매개변수인 maximumPoolSize는 작업 수가 최대일 때 풀에서 생성할 수 있는 최대 스레드 수를 나타냅니다. 공식적으로 이 값은 0보다 커야 하며 corePoolSize보다 크거나 같아야 하며, 이 값은 작업이 더 많을 때만 사용되며 작업 대기열에 저장할 수 없습니다.
세 번째 매개변수인 keepAliveTime은 스레드 풀이 유휴 상태이고 이 시간을 초과하면 풀의 스레드 수가 코어풀사이즈와 같을 때까지 여분의 스레드가 파괴되며, maximumPoolSize가 코어풀사이즈와 같으면 스레드 풀이 유휴 상태일 때 어떤 스레드도 파괴하지 않습니다. 최대풀사이즈가 코어풀사이즈와 같으면 풀은 유휴 상태일 때 스레드를 파괴하지 않습니다.
네 번째 파라미터인 단위는 유지되는 시간 단위를 나타내며, keepAliveTime 파라미터와 함께 사용됩니다.
다섯 번째 매개변수: workQueue는 스레드 풀의 작업 대기열을 나타내며, 스레드 풀의 모든 스레드가 작업을 처리하고 있을 때 새 작업이 있으면 이 작업 대기열에 캐시되어 실행을 위해 대기열에 대기합니다.
여섯 번째 매개 변수: "스레드"팩토리는 스레드 생성 팩토리를 의미하며, 이 매개 변수는 일반적으로 덜 사용되며, 일반적으로 스레드 풀 생성에서 이 매개 변수를 지정하지 않으며, 기본 스레드 생성 팩토리 메서드를 사용하여 스레드를 생성하며, 소스 코드는 다음과 같습니다:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
// Executors.defaultThreadFactory() 기본 스레드에 대한 팩토리 생성
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
// 默认的线程创建工厂,需要实现 ThreadFactory 接口
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
// 스레드 생성
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false); // 보호되지 않는 스레드 생성
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY); // 스레드 우선순위는 기본값으로 설정됩니다.
return t;
}
}
스레드 이름이나 스레드 실행 우선순위를 사용자 지정할 수 있는 "스레드"팩토리 인터페이스를 구현하여 스레드 팩토리를 사용자 지정할 수도 있습니다.
일곱 번째 매개변수인 RejectedExecutionHandler는 스레드 풀에 대한 거부 정책을 지정하며, 이는 스레드 풀의 작업이 캐시 큐인 작업 큐에 저장되어 작업을 실행할 새 스레드를 생성할 수 없는 경우 사용되는 흐름 제한 보호 메커니즘입니다.
스레드 풀의 워크플로는 실행 메서드인 execute() 로 시작되며, 소스 코드는 다음과 같습니다:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 현재 작동 중인 스레드 수가 코어 스레드 수보다 적습니다.
if (workerCountOf(c) < corePoolSize) {
// 이 작업을 수행할 새 스레드를 생성합니다.
if (addWorker(command, true))
return;
c = ctl.get();
}
// 스레드 풀이 실행 중인지 확인하고 실행 중이라면 작업을 대기열에 추가합니다.
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 스레드 풀이 실행 중인지 다시 확인하여 첫 번째 검사가 통과된 후 종료되지 않도록 하세요.
// 방금 대기열에 추가된 작업이 실행 중이 아닌 경우 제거합니다.
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池的线程数为 0 时
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 작업 실행을 위한 새 스레드 만들기
}
// 코어 스레드가 사용 중이고 대기열이 꽉 차서 새 스레드를 시작하려고 해도 실패합니다!
else if (!addWorker(command, false))
// 거부된 정책 적용
reject(command);
}
스레드가 먼저 실행해야 하는 작업인 firstTask를 설정할 수 있으며, 이러한 작업이 없는 경우 null로 설정할 수 있습니다;
코어, 스레드 생성 가능 여부를 결정하는 임계값으로, 참이면 corePoolSize가 임계값으로 사용되며, 거짓이면 maximumPoolSize가 임계값으로 사용됩니다.
분석
"스레드" 풀실행자에는 몇 개의 실행 메서드가 있나요? 그 차이점은 무엇인가요?
스레드 거부 전략이란 무엇인가요?
거부 전략의 범주에는 어떤 것이 있나요?
거부 정책을 사용자 지정하려면 어떻게 해야 하나요?
"스레드" 풀실행자를 확장할 수 있나요? 어떻게 확장할 수 있나요?
지식 확장
execute() VS submit()
실행()과 제출()은 모두 스레드 풀링 작업을 수행하는 데 사용됩니다. 이 둘의 주요 차이점은 제출() 메서드는 스레드 풀링 실행에서 반환 값을 받을 수 있지만 실행()은 그렇지 않다는 점입니다.
두 가지 방법의 구체적인 사용법을 살펴보세요:
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L,
TimeUnit.SECONDS, new LinkedBlockingQueue(20));
// execute
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("Hello, execute.");
}
});
// submit
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("Hello, submit.");
return "Success";
}
});
System.out.println(future.get());
위 절차를 실행한 결과는 다음과 같습니다:
Hello, submit.
Hello, execute.
Success
위의 결과에서 submit() 메서드를 Futrue와 함께 사용하여 스레드 실행의 반환값을 받을 수 있음을 알 수 있습니다. 또 다른 차이점은 실행() 메서드는 Executor 인터페이스에 속하는 반면 제출() 메서드는 ExecutorService 인터페이스에 속한다는 점이며, 이들의 상속 관계는 다음 그림과 같습니다:
스레드 풀 거부 정책
스레드 풀의 작업 대기열이 이미 꽉 차 있는데 다른 작업이 추가되면 먼저 현재 스레드 풀의 스레드 수가 스레드 풀의 최대 값보다 크거나 같은지 확인하고, 만약 그렇다면 스레드 풀의 거부 정책을 트리거합니다.
Java에는 4가지 유형의 거부 정책이 제공됩니다:
스레드 풀이 예외를 던지고 실행을 종료하는 종료 정책인 AbortPolicy는 기본 거부 정책입니다;
호출자 실행 정책을 사용하여 현재 스레드에 작업을 실행할 권한을 부여합니다;
DiscardPolicy를 설정하여 이 작업을 무시합니다;
가장 오래된 작업을 무시하려면 DiscardOldestPolicy를 설정하세요.
예를 들어 AbortPolicy 거부 정책을 보여주기 위한 코드는 다음과 같습니다:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
new ThreadPoolExecutor.AbortPolicy()); // 添加 AbortPolicy 拒绝策略
for (int i = 0; i < 6; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
위 절차의 이행 결과:
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.lagou.interview.ThreadPoolExample$$Lambda$1/1096979270@448139f0 rejected from java.util.concurrent.ThreadPoolExecutor@7cca494b[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.lagou.interview.ThreadPoolExample.rejected(ThreadPoolExample.java:35)
at com.lagou.interview.ThreadPoolExample.main(ThreadPoolExample.java:26)
6번째 작업이 오면 스레드 풀이 AbortPolicy를 실행하고 예외를 던지는 것을 볼 수 있습니다. 큐는 최대 2개의 작업을 저장할 수 있고 최대 3개의 스레드를 생성하여 작업을 실행할 수 있으므로 6번째 작업이 오면 스레드 풀이 "너무 바쁩니다".
사용자 지정 거부 정책
거부 정책을 사용자 지정하려면 다음 코드에 표시된 것처럼 새 RejectedExecutionHandler 객체를 생성하고 해당 객체의 rejectedExecution() 메서드를 재정의하면 됩니다:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
new RejectedExecutionHandler() { // 사용자 지정 거부 정책 추가하기
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 비즈니스 처리 메서드
System.out.println("사용자 지정 거부 정책 적용");
}
});
for (int i = 0; i < 6; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
위의 코드 실행 결과는 다음과 같습니다:
사용자 지정 거부 정책 적용
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
스레드 풀이 사용자 지정 거부 정책을 적용하는 것을 볼 수 있으며, 거부된 실행에서 자체 비즈니스 처리를 위한 코드를 추가할 수 있습니다.
"스레드" 풀실행자 확장
Thread의 확장은 주로 beforeExecute() 및 afterExecute() 메서드를 재정의하여 이루어지며, 다음 코드와 같이 확장 메서드에 스레드의 실행 시간에 대한 통계와 같은 로깅 또는 통계를 추가할 수 있습니다. 와 같이 추가할 수 있습니다:
public class ThreadPoolExtend {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 스레드 풀 확장 호출
MyThreadPoolExecutor executor = new MyThreadPoolExecutor(2, 4, 10,
TimeUnit.SECONDS, new LinkedBlockingQueue());
for (int i = 0; i < 3; i++) {
executor.execute(() -> {
Thread.currentThread().getName();
});
}
}
/**
* 스레드 풀 확장
*/
static class MyThreadPoolExecutor extends ThreadPoolExecutor {
// 스레드 실행 시작 시간 절약
private final ThreadLocal<Long> localTime = new ThreadLocal<>();
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
/**
* 실행 시작
* @param t
* @param r
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
Long sTime = System.nanoTime(); // 开始时间 (单位:纳秒)
localTime.set(sTime);
System.out.println(String.format("%s | before | time=%s",
t.getName(), sTime));
super.beforeExecute(t, r);
}
/**
* 실행 완료 후
* @param r
* @param t 던져진 예외
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
Long eTime = System.nanoTime(); // 结束时间 (单位:纳秒)
Long totalTime = eTime - localTime.get(); // 총 실행 시간
System.out.println(String.format("%s | after | time=%s | %s ,
Thread.currentThread().getName(), eTime, (totalTime / 1000000.0)));
super.afterExecute(r, t);
}
}
}
위 프로그램을 실행한 결과는 아래와 같습니다:
pool-1-thread-1 | before | time=4570298843700
pool-1-thread-2 | before | time=4570298840000
pool-1-thread-1 | after | time=4570327059500 | 시간: 28.2158
pool-1-thread-2 | after | time=4570327138100 | 시간: 28.2981
pool-1-thread-1 | before | time=4570328467800
pool-1-thread-1 | after | time=4570328636800 | 시간: 0.169
요약
마지막으로 요약하자면, 스레드 풀의 사용은 Thread PoolExecutor를 통해 생성해야 스레드 풀링 규칙을 보다 명확하게 파악하고 리소스 부족의 위험을 피할 수 있습니다. 동시에 코어 스레드 수와 최대 스레드 수의 차이, 스레드 풀 작업 큐에 사용 가능한 공간이없고 스레드 풀의 스레드 수가 최대 스레드 수에 도달하면 거부 전략 인 Java 자동을 실행하는 등 ThreadPoolExecutor의 7 가지 핵심 매개 변수도 소개합니다. 거부 전략에는 4가지 종류가 있으며, 사용자는 거부 전략을 재정의하여 거부 전략을 사용자 정의할 수 있으며, 또한 rejectedExecution()을 재정의하고, beforeExecute() 및 afterExecute()를 재정의하여 Thread풀 실행자의 확장을 구현하기 위해 beforeExecute() 및 afterExecute()를 재정의할 수도 있습니다.





