작업 큐의 지연 큐와 우선순위 큐는 큐에서 파생된 것으로 큐의 모든 기능을 가지고 있지만 고유한 특성이 있으며, 지연 큐는 지연 큐의 기능을, 우선순위 큐는 우선순위 큐의 기능을 구현합니다.
이벤트 배경
실제 비즈니스 시나리오에서는 일정 기간 동안 작업 실행을 지연하거나 우선순위에 따라 작업을 실행해야 하는 경우가 흔히 발생합니다.
두 가지 작은 예만 들어보겠습니다:
- 작업이 실행에 실패하면 일정 시간 동안 지연된 후 다시 시도해야 합니다.
- 선입선출(FIFO) 방식이 아닌 일정 기간에 걸쳐 우선순위를 정해야 하는 더 중요한 작업이 있습니다.
R&D를 공부하는 학생으로서 저는 이때 뇌에 그림 감각이 나타났다고 생각합니다. 이러한 요구 사항은 실제 비즈니스 시나리오에서 매우 일반적이며 자체 솔루션도 있다고 생각하지만 이러한 프로그램에는 비즈니스 코드에서 구현된다는 공통된 특징이 있습니다.
그게 뭐가 문제인가요?
- 비즈니스 코드의 구현은 충분히 우아하지 않으며 일반적으로 특정 시나리오에만 적용할 수 있고 일반성이 부족합니다.
- 이러한 구현은 종종 중복되고 각 시나리오를 개별적으로 구현해야 하므로 코드 중복으로 이어집니다.
- 이 구현은 제어가 매우 어렵고 비즈니스 코드에 직접 내장되어 있어 문제가 발생하면 전체 비즈니스 프로세스에 영향을 미칠 수 있습니다.
그렇다면 보편적인 해결책이 있을까요?
쿼드러플 힙을 사용하는 이유
쿼드러플 힙은 가장 작은 요소 또는 가장 큰 요소를 찾는 등 특정 연산을 빠르게 수행하기 위해 데이터 집합을 관리하는 데 주로 사용되는 효율적인 데이터 구조입니다. 힙의 한 형태이지만 일반적인 바이너리 힙과 달리 쿼드러플 힙은 노드당 최대 4개의 자식이 있습니다.
대규모 데이터베이스를 관리하면서 가장 높거나 가장 낮은 값을 빠르게 찾아야 한다고 상상해 보세요. 이차 스택은 잘 정리된 서류 캐비넷과 같아서 맨 위 서랍에는 항상 필요한 최고값이 들어 있고 그 아래 각 서랍에는 약간 더 크거나 작은 값이 들어 있습니다.
쿼드러플 힙의 핵심 속성은 구조와 유지 관리 규칙입니다:
- 구조: 완전한 쿼드트리로, 하단을 제외한 모든 레벨이 노드로 채워지고 하단의 노드가 가능한 한 왼쪽에서 오른쪽으로 채워지는 것을 의미합니다.
- 힙 속성: 최소 4중 힙에서 각 노드의 값은 자식의 값보다 작거나 같고, 최대 4중 힙에서 각 노드의 값은 자식의 값보다 크거나 같습니다.
쿼드러플 힙은 우선순위 큐를 처리하거나 작업을 예약하고 특정 그래프 알고리즘을 구현할 때 특히 유용합니다. 바이너리 힙보다 높이가 낮기 때문에 특히 대량의 데이터로 작업할 때 특정 작업이 더 빠를 수 있습니다.
삼촌의 토속어.
특별한 트리로 상상할 수 있습니다. 이 트리에서 각 노드에는 최대 4개의 자식이 있습니다. 이는 노드당 최대 2개의 자식이 있는 일반적인 '이진 트리'와는 다릅니다.
레이스를 조직하고 있는데 최고의 선수를 빨리 찾아야 한다고 가정해 봅시다. 쿼드러플 힙에서 각 노드는 하위 노드보다 우수하거나 적어도 그 정도는 되어야 합니다. 이렇게 하면 트리의 루트인 최상위 노드가 항상 최고의 노드가 됩니다.
이차 힙의 특징은 최적의 요소 찾기, 새 요소 추가, 요소 제거 등 여러 가지 작업을 빠르게 수행할 수 있다는 점입니다. 이는 경주에서 누가 최고의 선수인지 빠르게 결정하거나 새로운 선수를 빠르게 경주에 참가시키고 퇴장시키는 것과 같습니다. 각 노드에는 최대 4개의 자식이 있기 때문에 트리의 높이가 상대적으로 낮기 때문에 이러한 작업을 완료하기 위해 트리를 더 빠르게 위아래로 이동할 수 있습니다.
Queue 소개
지연 대기열과 우선순위 대기열은 모두 작업이 실행되는 순서에 의존하므로 둘 다 핵심 모듈인 힙을 가지고 있습니다.
전통적인 힙은 각 노드의 위치를 결정하는 가중치 값이 있는 이진 트리로, 최소 힙과 최대 힙의 두 가지 유형이 있으며, 차이점은 최소 힙의 노드 가중치 값이 작을수록 노드가 루트 노드에 가까워지고 최대 힙의 노드 가중치 값이 클수록 루트 노드에 가까워진다는 것입니다.
- 지연 대기열은 최소 쿼드러플 힙을 사용하여 시간이 곧 만료되는 순서대로 배달 요소가 실행되도록 합니다.
- 우선순위 큐는 최소 쿼드러플 힙을 사용하여 우선순위에 따라 요소의 전송을 구현합니다.
Delaying Queue
Delaying Queue 소개
지연 큐는 지연된 큐로, 전달된 요소가 지정된 시간이 만료된 후에만 실행된다는 특징이 있습니다. 지연 큐는 큐에서 파생되었기 때문에 큐의 모든 기능, 즉 큐의 모든 인터페이스와 콜백 메서드를 상속합니다.
새로운 인터페이스.
- 지정된 시간 후에 실행되는 요소를 추가하는 역할을 하는 AddAfter.
// DelayingInterface 는 큐 메서드 인터페이스의 지연 버전입니다.
// DelayingInterface is the delayed version of the Queue method interface
type DelayingInterface interface {
Interface
// AddAfter 요소 추가 및 실행 지연하기
// Add an element, execute it after a delay
AddAfter(element any, delay time.Duration) error
}
지연 대기열에는 사용자가 데이터 처리 수명 주기에 개입하거나 개입하기 위해 정의할 수 있는 콜백 메서드도 포함되어 있습니다.
새로운 콜백 메서드.
- 지연 대기열에 요소가 추가된 후에 실행되는 OnAddAfter입니다.
// DelayingCallback 지연 큐는 큐의 콜백 인터페이스의 지연 버전입니다.
// DelayingCallback is the delayed version of the Queue callback interface
type DelayingCallback interface {
Callback
// OnAddAfter 요소 추가 후 콜백
// Callback after adding element
OnAddAfter(any, time.Duration)
}
대기열 지연 구현 원칙
지연 대기열의 전체 구조는 다음과 같이 설계되어 있습니다:
대기열 지연 사용 예제
물론 지연 대기열도 사용법이 매우 간단하며 복잡한 초기화 과정이 없습니다. 이 경우에도 큐와 동일한 주의 사항이 있습니다. 즉, Get을 사용하든 GetWithBlock을 사용하든 데이터가 처리되었음을 표시하기 위해 완료 메서드를 사용해야 하며, 그렇지 않으면 동일한 데이터를 지연 큐에 추가할 때 Delaying Queue 오류를 반환한다는 점을 기억해야 합니다. 단순 큐를 사용하는 경우 완료 메서드를 사용할 필요가 없습니다.
코드 예제
package main
import (
"fmt"
"time"
"github.com/workqueue"
)
func main() {
q := workqueue.NewDelayingQueue(nil) // create a queue
go func() {
for {
element, err := q.Get() // get element from queue
if err != nil {
fmt.Println(err)
return
}
fmt.Println("get element:", element)
q.Done(element) // mark element as done, 'Done' is required after 'Get'
}
}()
_ = q.Add("hello") // add element to queue, immediately execute
_ = q.Add("world")
_ = q.AddAfter("delay", time.Second) // add element to queue, execute after 1 seconds
time.Sleep(time.Second * 2) // wait for element to be executed
q.Stop()
}
이 방법으로 workqueue.NewDelayingQueue(nil) 사용하여 대기열을 만들고 싶지 않다면 다른 함수인 lazy workqueue.DefaultDelayingQueue() 사용할 수 있으며, 이 두 함수는 동일합니다.
참고: 지연 대기열의 인스턴스를 만들면 대기열이 인터페이스 인터페이스를 구현하는 한 사용자 지정 대기열에 바인딩할 수 있습니다.
// DelayingQueue 인스턴스 생성하기
// Create a new DelayingQueue config
func NewDelayingQueue(conf *DelayingQConfig) *DelayingQ {
conf = isDelayingQConfigValid(conf)
conf.QConfig.cb = conf.cb
return NewDelayingQueueWithCustomQueue(conf, NewQueue(&conf.QConfig))
}
대기열 프로세스에서 콜백 메서드를 사용하려는 경우 이 메서드를 사용하세요:
// 콜백 인터페이스 구현하기
type callback struct {}
func (cb *callback) OnAdd(item any) {}
func (cb *callback) OnGet(item any) {}
func (cb *callback) OnDone(item any) {}
func (cb *callback) OnAddAfter(item any, t time.Duration) {}
// 구성 개체 생성하기
conf := NewDelayingQConfig()
// 콜백 설정
conf.WithCallback(&callback{})
// 큐를 생성할 때 nil을 사용하는 대신 conf 객체를 전달하세요.
q := NewDelayingQueue(conf)
다음 코드는 위의 코드 예시와 동일합니다.
대기열 코드 분석 지연
추가 후
// AddAfter 대기열에 요소 추가 및 지연 후 처리하기
// Add an element to the queue and process it after a specified delay
func (q *DelayingQ) AddAfter(element any, delay time.Duration) error {
if q.IsClosed() {
return ErrorQueueClosed
}
// 지연 시간이 0보다 작거나 같으면 바로 큐에 추가되어 즉시 실행됩니다.
if delay <= 0 {
return q.Add(element)
}
ele := q.elepool.Get()
ele.SetData(element)
ele.SetValue(time.Now().Add(delay).UnixMilli()) // 요소의 만료 시간 설정하기
q.lock.Lock()
q.waiting.Push(ele) // 대기 큐에 추가하고 실행을 기다립니다.
q.lock.Unlock()
q.config.cb.OnAddAfter(element, delay) // 실행 콜백
return nil
}
Priority Queue
우선순위 대기열 소개
우선순위 큐는 우선순위 큐로, 전달된 요소들이 우선순위에 따라 실행되는 것이 특징입니다. 우선순위 큐는 큐에서 파생되었기 때문에 큐의 모든 기능, 즉 큐의 모든 인터페이스와 콜백 메서드를 상속합니다.
우선순위 큐에서 요소의 순서는 요소의 가중치에 따라 결정되며 가중치가 작을수록 우선순위가 높아집니다 (가중치가 0인 경우 요소는 즉시 실행됩니다 ). 우선순위 큐는 가중치에 따라 요소를 정렬하지만 정렬 프로세스 중에 대기 시간이 정해져 있습니다. 즉, 우선순위 큐는 AddWeight 메서드로 추가된 요소가 일정 시간 동안 순서대로 정렬되도록 보장합니다.
이는 우선순위가 가장 높은 요소만 실행되는 것이 아니라 우선순위 대기열에 있는 모든 요소가 실행될 기회를 갖도록 하기 위한 것입니다.
기본 정렬 창 시간: 500밀리초, 이 값은 PriorityQConfig의 WithWindow 메서드로 설정할 수 있습니다.
새로운 인터페이스.
- 지정된 우선순위 실행 요소를 추가하는 역할을 하는 AddWeight입니다.
// 우선순위 큐 메서드 인터페이스
// Priority queue interface
type PriorityInterface interface {
Interface
// AddWeight 요소 추가, 가중치 지정, 시간 경과에 따른 정렬하기
// Add an element with specified weight and sort it within a period of time
AddWeight(element any, weight int) error
}
우선순위 대기열에는 사용자가 데이터 처리 수명 주기에 개입하거나 개입하기 위해 정의할 수 있는 콜백 메서드도 포함되어 있습니다.
새로운 콜백 메서드.
- 요소의 우선순위 대기열에 추가된 후에 실행되는 OnAddWeight입니다.
// 우선순위 큐 콜백 인터페이스
// Priority queue callback interface
type PriorityCallback interface {
Callback
// OnAddWeight 요소 추가 후 콜백
// Callback after adding an element
OnAddWeight(element any, weight int)
}
우선순위 대기열 구현 원칙
우선순위 대기열의 전체 구조는 다음과 같이 설계되어 있습니다:
우선순위 대기열 사용 예제
물론 우선순위 큐는 사용하기 매우 간단하고 프로세스를 초기화하는 데 복잡한 매개 변수가 너무 많지 않으며, 우선순위 큐는 정렬 창 내부의 요소, 즉 정렬 창 외부의 요소가 순서대로 정렬되고 다른 정렬 창 내부의 요소도 순서대로 정렬되는 것을 보장합니다.
또 다른 한 가지는 큐와 동일하게 Get 또는 GetWithBlock을 사용하더라도 데이터가 처리되었음을 표시하기 위해 Done 메서드를 사용해야 하며, 그렇지 않으면 동일한 데이터를 우선순위 큐에 추가할 때 Priority Queue 오류를 반환한다는 점을 기억해야 합니다. 단순 큐를 사용하는 경우 완료 메서드를 사용할 필요가 없습니다.
코드 예제
package main
import (
"fmt"
"time"
"github.com/workqueue"
)
func main() {
conf := workqueue.NewPriorityQConfig().WithWindow(time.Second) // 정렬 기간을 1초로 설정하기
q := workqueue.NewPriorityQueue(conf) // create a queue
go func() {
for {
element, err := q.Get() // get element from queue
if err != nil {
fmt.Println(err)
return
}
fmt.Println("get element:", element)
q.Done(element) // mark element as done, 'Done' is required after 'Get'
}
}()
_ = q.Add("hello") // add element to queue, immediately execute
_ = q.Add("world")
_ = q.AddWeight("delay", 10) // add element with weight is 10 to queue, execute after 500 ms (sort window)
time.Sleep(time.Second * 2) // wait for element to be executed
q.Stop()
}
이 방법으로 workqueue.NewPriorityQueue(nil) 사용하여 대기열을 만들고 싶지 않다면 다른 함수인 lazy workqueue.DefaultPriorityQueue() 사용할 수 있으며, 이 두 함수는 동일합니다.
참고: 우선순위 대기열의 인스턴스를 만들면 대기열이 인터페이스 인터페이스를 구현하는 한 사용자 지정 대기열에 바인딩할 수 있습니다.
// 우선순위 큐 인스턴스 생성하기
// Create a new PriorityQueue config
func NewPriorityQueue(conf *PriorityQConfig) *PriorityQ {
conf = isPriorityQConfigValid(conf)
conf.QConfig.cb = conf.cb
return NewPriorityQueueWithCustomQueue(conf, NewQueue(&conf.QConfig))
}
우선순위 대기열 코드 분석
AddWeight
// AddWeight 요소 추가, 가중치 지정, 시간 경과에 따른 정렬하기
// Add an element, add it use weight and sort it in a period of time
func (q *PriorityQ) AddWeight(element any, weight int) error {
if q.IsClosed() {
return ErrorQueueClosed
}
if weight <= 0 {
return q.Add(element) // 가중치 값이 0보다 작거나 같으면 바로 큐에 추가되어 즉시 실행됩니다.
}
ele := q.elepool.Get()
ele.SetData(element)
ele.SetValue(int64(weight)) // 요소의 가중치 설정하기
q.lock.Lock()
q.waiting.Push(ele) // 대기 대기열에 큐를 추가하고 실행을 기다립니다. 일정 시간이 지나면 대기열은 가중치 값에 따라 정렬됩니다.
q.lock.Unlock()
q.config.cb.OnAddWeight(element, weight) // 실행 콜백
return nil
}
요약
지금까지 세 개의 글을 읽어주셔서 감사합니다. 이번 글에서는 지연 대기열과 우선순위 대기열의 정의, 사용법 및 구현에 대해 소개합니다. 지연 대기열과 우선순위 대기열에 대한 명확한 이해에 도움이 되셨기를 바랍니다.
지연 큐와 우선순위 큐는 모두 WorkQueue의 핵심 모듈로, 큐에서 파생되었으므로 큐의 모든 기능을 가지고 있지만 고유 한 특수 기능도 있습니다.지연 큐는 지연 큐의 기능을 구현하고 우선순위 큐는 다음과 같은 기능을 구현합니다. 우선순위 큐는 우선순위 큐의 기능을 구현합니다.
다음 포스팅은 WorkQueue 프로젝트 시리즈의 마지막 포스팅으로, WorkQueue에서 RateLimiting Queue 모듈의 구현과 사용법을 소개합니다.



