Preface
This document examines UseCacheLock in PowerJob.
UseCacheLock
tech/powerjob/server/core/lock/UseCacheLock.java
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseCacheLock {
    String type();
    String key();
    int concurrencyLevel();
}
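The UseCacheLock annotation defines three attributes: type (names the lock cache), key (a SpEL expression evaluated against the method arguments to select the lock), and concurrencyLevel (passed to the Guava CacheBuilder when the cache for that type is first created).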
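Because the annotation is retained at runtime, its attributes can be read reflectively from an annotated method. The following is a minimal sketch of that lookup, assuming a local copy of the annotation so that it compiles on its own. AnnotationReadSketch, DemoService, and describe are hypothetical names that do not exist in PowerJob.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical holder class for this sketch; not part of PowerJob.
class AnnotationReadSketch {

    // Local copy of the annotation shown above, so the sketch is self-contained.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface UseCacheLock {
        String type();
        String key();
        int concurrencyLevel();
    }

    // Hypothetical service with one annotated method.
    static class DemoService {
        @UseCacheLock(type = "processJobInstance", key = "#instanceId", concurrencyLevel = 1024)
        public void redispatch(Long instanceId) {
            // no-op: only the annotation matters here
        }
    }

    // Reads the three attributes back through reflection.
    static String describe() {
        try {
            Method method = DemoService.class.getMethod("redispatch", Long.class);
            UseCacheLock lock = method.getAnnotation(UseCacheLock.class);
            return lock.type() + "|" + lock.key() + "|" + lock.concurrencyLevel();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling AnnotationReadSketch.describe() returns the three attribute values joined by "|". With RetentionPolicy.SOURCE or CLASS the getAnnotation call would return null, which is why the runtime retention matters for an aspect-driven lock.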
UseCacheLockAspect
tech/powerjob/server/core/lock/UseCacheLockAspect.java
@Slf4j
@Aspect
@Component
@Order(1)
@RequiredArgsConstructor
public class UseCacheLockAspect {
    private final MonitorService monitorService;
    private final Map<String, Cache<String, ReentrantLock>> lockContainer = Maps.newConcurrentMap();
    private static final long SLOW_THRESHOLD = 100;

    @Around(value = "@annotation(useCacheLock))")
    public Object execute(ProceedingJoinPoint point, UseCacheLock useCacheLock) throws Throwable {
        Cache<String, ReentrantLock> lockCache = lockContainer.computeIfAbsent(useCacheLock.type(), ignore -> {
            int concurrencyLevel = useCacheLock.concurrencyLevel();
            log.info("[UseSegmentLockAspect] create Lock Cache for [{}] with concurrencyLevel: {}", useCacheLock.type(), concurrencyLevel);
            return CacheBuilder.newBuilder()
                    .initialCapacity(300000)
                    .maximumSize(500000)
                    .concurrencyLevel(concurrencyLevel)
                    .expireAfterWrite(30, TimeUnit.MINUTES)
                    .build();
        });
        final Method method = AOPUtils.parseMethod(point);
        Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);
        final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);
        long start = System.currentTimeMillis();
        reentrantLock.lockInterruptibly();
        try {
            long timeCost = System.currentTimeMillis() - start;
            if (timeCost > SLOW_THRESHOLD) {
                final SlowLockEvent slowLockEvent = new SlowLockEvent()
                        .setType(SlowLockEvent.Type.LOCAL)
                        .setLockType(useCacheLock.type())
                        .setLockKey(String.valueOf(key))
                        .setCallerService(method.getDeclaringClass().getSimpleName())
                        .setCallerMethod(method.getName())
                        .setCost(timeCost);
                monitorService.monitor(slowLockEvent);
                log.warn("[UseSegmentLockAspect] wait lock for method({}#{}) cost {} ms! key = '{}', args = {}, ", method.getDeclaringClass().getSimpleName(), method.getName(), timeCost,
                        key,
                        JSON.toJSONString(point.getArgs()));
            }
            return point.proceed();
        } finally {
            reentrantLock.unlock();
        }
    }
}
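The core of the aspect is one ReentrantLock per key, with the wait time measured before the guarded call runs. That can be sketched with the JDK alone. This is an illustration under stated assumptions, not PowerJob's implementation: KeyedLockSketch, executeWithLock, and runDemo are hypothetical names, a ConcurrentHashMap stands in for the Guava Cache (so locks never expire and the map is unbounded), and the slow-lock report is a plain stderr line rather than a SlowLockEvent.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical JDK-only stand-in for the per-key locking done by the aspect.
class KeyedLockSketch {

    // Assumed threshold in milliseconds, mirroring SLOW_THRESHOLD above.
    private static final long SLOW_THRESHOLD_MS = 100;

    // type -> (key -> lock). Unlike the Guava Cache, entries are never evicted.
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, ReentrantLock>> lockContainer =
            new ConcurrentHashMap<>();

    // Runs the action while holding the lock that belongs to (type, key).
    <T> T executeWithLock(String type, Object key, Callable<T> action) throws Exception {
        ReentrantLock lock = lockContainer
                .computeIfAbsent(type, ignore -> new ConcurrentHashMap<>())
                .computeIfAbsent(String.valueOf(key), ignore -> new ReentrantLock());
        long start = System.currentTimeMillis();
        lock.lockInterruptibly();
        try {
            long waited = System.currentTimeMillis() - start;
            if (waited > SLOW_THRESHOLD_MS) {
                System.err.printf("slow lock: type=%s key=%s waited=%d ms%n", type, key, waited);
            }
            return action.call();
        } finally {
            lock.unlock();
        }
    }

    // Eight threads increment an unsynchronized counter under the same key.
    static int runDemo() {
        KeyedLockSketch sketch = new KeyedLockSketch();
        int[] counter = {0};
        Thread[] workers = new Thread[8];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    try {
                        sketch.executeWithLock("processJobInstance", 42L, () -> counter[0]++);
                    } catch (Exception e) {
                        throw new IllegalStateException(e);
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }
}
```

Because every increment in runDemo runs under the lock for key 42, no update is lost. Calls with different keys would take different locks and proceed in parallel, which is the point of keying the lock instead of synchronizing the whole method.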
Example
@UseCacheLock(type = "processJobInstance", key = "#instanceId", concurrencyLevel = 1024)
public void redispatchAsync(Long instanceId, int originStatus) {
    // Reset the status to waiting-for-dispatch
    instanceInfoRepository.updateStatusAndGmtModifiedByInstanceIdAndOriginStatus(instanceId, originStatus, InstanceStatus.WAITING_DISPATCH.getV(), new Date());
}
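The key attribute supports SpEL: in the example above, #instanceId resolves to the method's instanceId argument, so calls for the same instance share one lock.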





