blog

파이썬 아키텍처 패턴: 1.

고급 및 저급 TDD는 서비스 계층을 도입하여 작동 중인 애플리케이션에서 필요한 추가 오케스트레이션 책임 중 일부를 포착합니다. 서비스 계층은 리포지토리에서 가져와야 하는 것, 리...

Oct 17, 2025 · 69 min. read
シェア

고급 및 저급 TDD

프로토콜:CC BY-NC-SA 4.0

서비스 계층은 작업 중인 애플리케이션에서 필요한 추가 오케스트레이션 책임 중 일부를 포착하기 위해 도입되었습니다. 서비스 계층은 리포지토리에서 무엇을 가져와야 하는지, 어떤 사전 및 현재 상태 유효성 검사를 수행해야 하는지, 최종적으로 무엇을 저장해야 하는지 등 각 사용 사례와 워크플로우를 명확하게 정의하는 데 도움이 됩니다.

그러나 현재 많은 단위 테스트는 모델에 직접 작용하는 훨씬 낮은 수준에서 작동합니다. 이 장에서는 이러한 테스트를 서비스 계층 수준으로 끌어올리는 데 따르는 장단점을 좀 더 일반적인 테스트 지침과 함께 설명합니다.

테스트 피라미드는 어떻게 생겼나요?

이 이니셔티브를 서비스 계층과 자체 서비스 계층 테스트 사용으로 전환하는 것이 테스트 피라미드에 어떤 영향을 미치는지 살펴봅시다:

$ grep -c test_ test_*.py
tests/unit/test_allocate.py:4
tests/unit/test_batches.py:8
tests/unit/test_services.py:3
tests/integration/test_orm.py:6
tests/integration/test_repository.py:2
tests/e2e/test_api.py:2

나쁘지 않네요! 15개의 단위 테스트, 8개의 통합 테스트, 단 2개의 엔드투엔드 테스트가 있습니다. 이것은 이미 건강한 테스트 피라미드입니다.

도메인 계층 테스트를 서비스 계층으로 옮겨야 하나요?

한 단계 더 나아가면 어떻게 되는지 살펴봅시다. 서비스 계층에 대해 테스트할 수 있는 소프트웨어는 더 이상 실제로 도메인 모델에 대해 테스트할 필요가 없습니다. 대신 1장의 모든 도메인 수준 테스트를 서비스 계층 용어로 다시 작성할 수 있습니다:

# domain-layer test:
def test_prefers_current_stock_batches_to_shipments():
 in_stock_batch = Batch("in-stock-batch", "RETRO-CLOCK", 100, eta=None)
 shipment_batch = Batch("shipment-batch", "RETRO-CLOCK", 100, eta=tomorrow)
 line = OrderLine("oref", "RETRO-CLOCK", 10)
 allocate(line, [in_stock_batch, shipment_batch])
 assert in_stock_batch.available_quantity == 90
 assert shipment_batch.available_quantity == 100
# service-layer test:
def test_prefers_warehouse_batches_to_shipments():
 in_stock_batch = Batch("in-stock-batch", "RETRO-CLOCK", 100, eta=None)
 shipment_batch = Batch("shipment-batch", "RETRO-CLOCK", 100, eta=tomorrow)
 repo = FakeRepository([in_stock_batch, shipment_batch])
 session = FakeSession()
 line = OrderLine('oref', "RETRO-CLOCK", 10)
 services.allocate(line, repo, session)
 assert in_stock_batch.available_quantity == 90
 assert shipment_batch.available_quantity == 100

왜 그럴까요?

테스트는 두려움 없이 시스템을 변경하는 데 도움이 되어야 하지만, 도메인 모델에 대해 너무 많은 테스트를 작성하는 팀을 흔히 볼 수 있습니다. 이로 인해 코드 기반을 변경할 때 수십 개 또는 수백 개의 단위 테스트를 업데이트해야 하는 문제가 발생할 수 있습니다.

자동화된 테스트의 목적에 대해 잠시 생각해보면 이 말이 이해가 됩니다. 테스트를 사용하여 시스템이 작동하는 동안 시스템의 속성이 변경되지 않도록 강제합니다. 테스트를 사용하여 API가 계속 200을 반환하는지, 데이터베이스 세션이 계속 제출되는지, 주문이 계속 할당되고 있는지 확인합니다.

이러한 동작 중 하나가 실수로 변경되면 테스트가 실패합니다. 반면에 코드의 디자인이 변경되면 해당 코드에 직접적으로 의존하는 모든 테스트도 실패합니다.

이 책을 자세히 살펴보면 서비스 계층이 시스템을 여러 가지 방식으로 구동할 수 있는 API를 어떻게 형성하는지 알 수 있습니다. 이 API에 대해 테스트하면 도메인 모델을 리팩토링할 때 변경해야 하는 코드의 양이 줄어듭니다. 서비스 계층에 대해서만 테스트하도록 제한하면 모델 객체의 "비공개" 메서드나 프로퍼티와 직접 상호 작용하는 테스트가 없으므로 더 자유롭게 리팩터링할 수 있습니다.

STH에 주목하세요.

테스트에 입력되는 각 코드 줄은 시스템을 특정 모양으로 고정하는 접착제 덩어리와 같습니다. 낮은 수준의 테스트가 많을수록 변경하기가 더 어려워집니다.

어떤 테스트를 작성할지 결정할 때

"그럼 모든 단위 테스트를 다시 작성해야 하나요?"라고 자문하실 수도 있습니다. 도메인 모델에 대해 테스트를 작성하는 것이 잘못된 것인가요?"라고 질문할 수 있습니다. 이러한 질문에 답하려면 커플링과 디자인 피드백 간의 장단점을 이해하는 것이 중요합니다.

그림 5-1: 테스트 스펙트럼
[ditaa, apwp_0501]
| Low feedback High feedback |
| Low barrier to change High barrier to change |
| High system coverage Focused coverage |
| |
| <--------- ----------> |
| |
| API Tests Service-Layer Tests Domain Tests |

익스트림 프로그래밍은 "코드에 귀 기울이기"를 촉구합니다. 테스트를 작성하다 보면 코드가 사용하기 어렵거나 코드가 이상한 점을 발견할 수 있습니다. 이는 디자인을 리팩터링하고 다시 생각하게 하는 계기가 됩니다.

그러나 이러한 피드백은 대상 코드와 긴밀하게 작업할 때만 얻을 수 있습니다. HTTP API에 대한 테스트는 훨씬 더 높은 수준의 추상화에 있는 객체의 세분화된 디자인에는 도움이 되지 않습니다.

반면에 전체 애플리케이션을 다시 작성할 수 있으며 URL이나 요청 형식을 변경하지 않는 한 HTTP 테스트는 계속 통과합니다. 따라서 데이터베이스 스키마 변경과 같은 대규모 변경이 코드를 손상시키지 않는다는 확신을 줄 수 있습니다.

다른 한편으로, 1장에서 작성한 테스트는 원하는 객체를 완전히 이해하는 데 도움이 됩니다. 이 테스트는 도메인 언어를 준수하는 합리적인 디자인으로 설계를 안내합니다. 테스트를 도메인 언어로 읽으면 코드가 문제에 대한 해결책에 대한 직관과 일치하는 것처럼 느껴집니다.

테스트는 도메인 언어로 작성되므로 모델의 살아있는 문서 역할을 합니다. 새로운 팀원은 이러한 테스트를 읽고 시스템의 작동 방식과 핵심 개념이 어떻게 상호 연관되어 있는지 빠르게 이해할 수 있습니다.

새로운 동작은 종종 이 수준에서 테스트를 작성하여 코드가 어떻게 보일지 '스케치'합니다. 그러나 코드의 디자인을 개선하려고 할 때 이러한 테스트는 특정 구현과 밀접하게 연결되어 있으므로 교체하거나 제거해야 합니다.

업스케일 및 다운스케일

대부분의 경우 새로운 기능을 추가하거나 버그를 수정할 때 도메인 모델을 광범위하게 변경할 필요는 없습니다. 이러한 경우에는 결합이 적고 커버리지가 높기 때문에 서비스에 대한 테스트를 작성하는 것이 좋습니다.

예를 들어 add_stock 함수나 cancel_order 함수를 작성할 때 서비스 계층에 대해 테스트를 작성하면 더 빠르고 적은 커플링으로 작업할 수 있습니다.

새 프로젝트를 시작하거나 특히 까다로운 문제가 발생하면 도메인 모델에 대한 테스트를 다시 작성하여 더 나은 피드백과 실행 가능한 의도 문서를 확보할 수 있습니다.

비유하자면 기어를 변속하는 것과 같습니다. 여행을 시작할 때 자전거는 관성을 극복하기 위해 저단 기어를 사용해야 합니다. 일단 달리기 시작하면 더 높은 기어로 변속하여 더 빠르고 효율적으로 이동할 수 있지만, 갑자기 가파른 언덕에 부딪히거나 위험 요소로 인해 속도를 줄여야 하는 경우 다시 가속할 수 있을 때까지 다시 낮은 기어로 내려가게 됩니다.

도메인에서 서비스 계층 테스트를 완전히 분리하기

테스트 데이터는 도메인 개체를 사용하여 설정되고 서비스 계층 함수가 호출되므로 서비스 계층 테스트에서는 여전히 도메인에 대한 직접적인 종속성이 존재합니다.

서비스 계층을 도메인에서 완전히 분리하려면 기본 유형으로 작동하도록 API를 다시 작성해야 합니다.

의 서비스 계층은 현재 OrderLine 필드 객체를 허용합니다:

def allocate(line: OrderLine, repo: AbstractRepository, session) -> str:

모든 인수가 기본 유형이라면 어떤 모습일까요?

def allocate(
 orderid: str, sku: str, qty: int, repo: AbstractRepository, session
) -> str:

테스트도 이러한 용어로 다시 작성했습니다:

def test_returns_allocation():
 batch = model.Batch("batch1", "COMPLICATED-LAMP", 100, eta=None)
 repo = FakeRepository([batch])
 result = services.allocate("o1", "COMPLICATED-LAMP", 10, repo, FakeSession())
 assert result == "batch1"

그러나 Batch 개체는 여전히 수동으로 인스턴스화되므로 테스트는 여전히 도메인에 따라 달라집니다. 따라서 언젠가 Batch 모델의 작동 방식을 대규모로 리팩터링하기로 결정하면 많은 테스트를 변경해야 합니다.

완화: 고정 함수에서 모든 도메인 종속성을 유지합니다.

최소한 테스트에서 도우미 함수나 픽스처로 추상화할 수 있습니다. 다음은 FakeRepository에 팩토리 함수를 추가하여 이를 수행하는 한 가지 방법입니다:

설비에 대한 공장 기능 가능 tests/unit/test_services.py

class FakeRepository(set):
 @staticmethod
 def for_batch(ref, sku, qty, eta=None):
 return FakeRepository([
 model.Batch(ref, sku, qty, eta),
 ])
 ...
def test_returns_allocation():
 repo = FakeRepository.for_batch("batch1", "COMPLICATED-LAMP", 100, eta=None)
 result = services.allocate("o1", "COMPLICATED-LAMP", 10, repo, FakeSession())
 assert result == "batch1"

적어도 이렇게 하면 도메인에 대한 모든 테스트의 종속성을 한 곳에 모을 수 있습니다.

누락된 서비스 추가

그러나 더 나아갈 수도 있습니다. 인벤토리를 추가하는 서비스가 있는 경우 이를 사용하여 서비스 계층의 공식 사용 사례에 표현된 대로 서비스 계층 테스트를 수행하여 도메인에 대한 모든 종속성을 제거할 수 있습니다:

새 add_batch 서비스 테스트 tests/unit/test_services.py

def test_add_batch():
 repo, session = FakeRepository([]), FakeSession()
 services.add_batch("b1", "CRUNCHY-ARMCHAIR", 100, None, repo, session)
 assert repo.get("b1") is not None
 assert session.committed
STH에 주목하세요.

일반적으로 서비스 계층 테스트에서 도메인 계층 작업을 직접 수행해야 하는 경우 서비스 계층이 불완전하다는 것을 나타낼 수 있습니다.

그리고 구현은 단 두 줄의 코드만으로 가능합니다:

def add_batch(
 ref: str, sku: str, qty: int, eta: Optional[date],
 repo: AbstractRepository, session,
):
 repo.add(model.Batch(ref, sku, qty, eta))
 session.commit()
def allocate(
 orderid: str, sku: str, qty: int, repo: AbstractRepository, session
) -> str:
 ...
다음 사항에 유의하십시오.

테스트에서 종속성을 제거하는 데 도움이 된다고 해서 새 서비스를 작성해야 할까요? 아마 아닐 겁니다. 하지만 이 경우 add_batch 서비스는 언젠가는 반드시 필요할 것입니다.

이제 모델에 의존하지 않고 원어만 사용하여 서비스 자체의 관점에서 순수하게 서비스 계층 테스트를 다시 작성할 수 있습니다:

def test_allocate_returns_allocation():
 repo, session = FakeRepository([]), FakeSession()
 services.add_batch("batch1", "COMPLICATED-LAMP", 100, None, repo, session)
 result = services.allocate("o1", "COMPLICATED-LAMP", 10, repo, session)
 assert result == "batch1"
def test_allocate_errors_for_invalid_sku():
 repo, session = FakeRepository([]), FakeSession()
 services.add_batch("b1", "AREALSKU", 100, None, repo, session)
 with pytest.raises(services.InvalidSku, match="Invalid sku NONEXISTENTSKU"):
 services.allocate("o1", "NONEXISTENTSKU", 10, repo, FakeSession())

이곳은 정말 좋은 곳입니다. 서비스 계층 테스트는 서비스 계층 자체에만 의존하므로 필요에 따라 모델을 완전히 자유롭게 리팩터링할 수 있습니다.

엔드투엔드 테스트 개선 사항 제공

add_batch를 추가하여 모델에서 서비스 계층 테스트를 분리하는 데 도움이 된 것처럼, 배치 추가를 위해 API 엔드포인트를 추가하면 보기 흉한 add_stock 장치가 필요하지 않으며, 엔드투엔드 테스트를 통해 하드코딩된 SQL 쿼리와 데이터베이스에 대한 직접적인 종속성을 없앨 수 있습니다.

서비스 함수 덕분에 엔드포인트를 쉽게 추가할 수 있으며, 약간의 JSON 처리와 함수 호출만 있으면 됩니다:

@app.route("/add_batch", methods=['POST'])
def add_batch():
 session = get_session()
 repo = repository.SqlAlchemyRepository(session)
 eta = request.json['eta']
 if eta is not None:
 eta = datetime.fromisoformat(eta).date()
 services.add_batch(
 request.json['ref'], request.json['sku'], request.json['qty'], eta,
 repo, session
 )
 return 'OK', 201
다음 사항에 유의하십시오.

추가_배치/배치*로 POST 요청을 보낼 생각이라면 꼭 그렇게 하세요! 플라스크는 얇은 어댑터이기 때문에 쉽습니다. 다음 사이드바를 참조하세요.

또한 conftest.py에 하드코딩된 SQL 쿼리가 일부 API 호출로 대체되어 API 테스트에 API 이외의 종속성이 없다는 점도 좋습니다:

이제 API 테스트에서 자체 배치를 추가할 수 있습니다 tests/e2e/test_api.py.

def post_to_add_batch(ref, sku, qty, eta):
 url = config.get_api_url()
 r = requests.post(
 f'{url}/add_batch',
 json={'ref': ref, 'sku': sku, 'qty': qty, 'eta': eta}
 )
 assert r.status_code == 201
@pytest.mark.usefixtures('postgres_db')
@pytest.mark.usefixtures('restart_api')
def test_happy_path_returns_201_and_allocated_batch():
 sku, othersku = random_sku(), random_sku('other')
 earlybatch = random_batchref(1)
 laterbatch = random_batchref(2)
 otherbatch = random_batchref(3)
 post_to_add_batch(laterbatch, sku, 100, '2011-01-02')
 post_to_add_batch(earlybatch, sku, 100, '2011-01-01')
 post_to_add_batch(otherbatch, othersku, 100, None)
 data = {'orderid': random_orderid(), 'sku': sku, 'qty': 3}
 url = config.get_api_url()
 r = requests.post(f'{url}/allocate', json=data)
 assert r.status_code == 201
 assert r.json()['batchref'] == earlybatch

요약

서비스 계층이 마련되면 테스트 커버리지의 대부분을 단위 테스트로 이동하고 건강한 테스트 피라미드를 구축할 수 있습니다.

몇 가지 도움이 될 것입니다:

  • 서비스 계층을 도메인 객체가 아닌 프리미티브로 표현하세요.

  • 이상적인 세계에서는 리포지토리나 데이터베이스를 통해 상태를 수정하는 대신 서비스 계층에 대해 완전히 테스트할 수 있는 모든 서비스를 보유할 수 있습니다. 이는 엔드투엔드 테스트에서도 마찬가지입니다.

다음 장으로 이동하세요!

¹ 더 높은 수준의 테스트 작성에 대한 타당한 우려는 더 복잡한 사용 사례의 경우 포트폴리오가 폭발적으로 늘어날 수 있다는 것입니다. 이러한 경우 다양한 협업 도메인 객체로 내려가는 하위 수준의 단위 테스트가 유용할 수 있습니다. 그러나 8장 및 "선택 사항: 가짜 메시지 버스를 사용하여 이벤트 핸들러 개별 테스트하기"도 참조하세요.

: 작업 모델 단위

원본: CC BY-NC-SA 4.0

번역자: CC BY-NC-SA 4.0

프로토콜:

이 장에서는 리포지토리와 서비스 계층 패턴을 하나로 묶는 퍼즐의 마지막 조각인 스키마에 대해 설명합니다.

리포지토리 패턴이 영구 저장소 개념의 추상화라면, 작업 단위 패턴은 개념의 추상화입니다. 이는 궁극적으로 데이터 계층에서 서비스 계층을 완전히 분리할 수 있게 해줍니다.

그림 6-1은 API가 데이터베이스 계층과 직접 상호작용하여 세션을 시작하고, 리포지토리 계층과 상호작용하여 SQLAlchemyRepository 초기화하고, 서비스 계층과 상호작용하여 할당을 요청하는 등 현재 인프라 계층 간에 많은 양의 통신이 이루어지고 있음을 보여줍니다.

STH에 주목하세요.

이 챕터의 코드는 GitHub의 chapter_06_uow 브랜치에 있습니다:

git clone https://.com/cosmicpython/.git
cd code
git checkout chapter_06_uow
# or to code along, checkout Chapter 4:
git checkout chapter_04_service_layer

그림 6-1: UoW 없음: API가 레이어 3과 직접 상호 작용함

그림 6-2는 목표 상태를 보여줍니다. 이제 Flask API는 작업 단위를 초기화하고 서비스를 호출하는 두 가지 작업만 수행합니다. 서비스는 UoW와 함께 작동하지만, 서비스 함수 자체나 Flask는 이제 데이터베이스와 직접 상호 작용할 필요가 없습니다.

는 파이썬 구문의 멋진 부분인 컨텍스트 관리자를 사용합니다.

그림 6-2: UoW 사용: 이제 UoW가 데이터베이스 상태 관리

작업 단위는 리포지토리와 함께 작동합니다.

작업 단위가 실제로 작동하는 모습을 살펴봅시다. 서비스 계층이 완료되면 어떤 모습일까요?

def allocate(
 orderid: str, sku: str, qty: int,
 uow: unit_of_work.AbstractUnitOfWork,
) -> str:
 line = OrderLine(orderid, sku, qty)
 with uow: #(1)
 batches = uow.batches.list() #(2)
 ...
 batchref = model.allocate(line, batches)
 uow.commit() #(3)

컨텍스트 관리자로 UoW를 시작합니다.

uow.batches는 배치 리포지토리이므로 UoW는 영구 저장소에 대한 액세스를 제공합니다.

(iii)

작업이 끝나면 UoW를 사용하여 작업을 커밋하거나 롤백합니다.

UoW는 퍼시스턴트 저장소에 대한 단일 진입점 역할을 하며 로드된 개체와 최신 상태를 추적합니다. ¹

이를 통해 세 가지 유용한 정보를 얻을 수 있습니다:

  • 데이터베이스의 안정적인 스냅샷으로 작업 도중에 사용되는 개체가 변경되지 않도록 합니다.

  • 모든 변경 사항을 한 번에 유지하므로 문제가 발생해도 일관성 없는 상태가 되지 않습니다.

  • 쌍의 지속성 문제는 간단한 API와 리포지토리를 가져올 수 있는 편리한 장소를 제공합니다.

통합 테스트를 통한 UoW 추진

UOW 통합 테스트입니다:

def test_uow_can_retrieve_a_batch_and_allocate_to_it(session_factory):
 session = session_factory()
 insert_batch(session, "batch1", "HIPSTER-WORKBENCH", 100, None)
 session.commit()
 uow = unit_of_work.SqlAlchemyUnitOfWork(session_factory) #(1)
 with uow:
 batch = uow.batches.get(reference="batch1") #(2)
 line = model.OrderLine("o1", "HIPSTER-WORKBENCH", 10)
 batch.allocate(line)
 uow.commit() #(3)
 batchref = get_allocated_batch_ref(session, "o1", "HIPSTER-WORKBENCH")
 assert batchref == "batch1"

사용자 지정 세션 팩토리를 사용하여 UoW를 초기화하고 with 블록에서 사용할 uow 객체를 가져옵니다.

UoW는 uow.batches를 통해 배치 리포지토리에 대한 액세스를 제공합니다.

(iii)

완료되면 commit()을 호출합니다.

궁금하신 분들을 위해 삽입_배치 및 get_allocated_batch_ref 절차는 다음과 같습니다:

def insert_batch(session, ref, sku, qty, eta):
 session.execute(
 'INSERT INTO batches (reference, sku, _purchased_quantity, eta)'
 ' VALUES (:ref, :sku, :qty, :eta)',
 dict(ref=ref, sku=sku, qty=qty, eta=eta)
 )
def get_allocated_batch_ref(session, orderid, sku):
 [[orderlineid]] = session.execute(
 'SELECT id FROM order_lines WHERE orderid=:orderid AND sku=:sku',
 dict(orderid=orderid, sku=sku)
 )
 [[batchref]] = session.execute(
 'SELECT b.reference FROM allocations JOIN batches AS b ON batch_id = b.id'
 ' WHERE orderline_id=:orderlineid',
 dict(orderlineid=orderlineid)
 )
 return batchref

작업 단위 및 해당 컨텍스트 관리자

UoW가 구현해야 하는 인터페이스는 테스트에 암시적으로 정의되어 있습니다. 추상 베이스 클래스를 사용하여 명시적으로 정의할 수 있습니다:

lass AbstractUnitOfWork(abc.ABC):
 batches: repository.AbstractRepository #(1)
 def __exit__(self, *args): #(2)
 self.rollback() #(4)
 @abc.abstractmethod
 def commit(self): #(3)
 raise NotImplementedError
 @abc.abstractmethod
 def rollback(self): #(4)
 raise NotImplementedError

UoW는 배치 리포지토리에 대한 액세스를 제공하는 .batches라는 속성을 제공합니다.

컨텍스트 관리자를 처음 보신다면, __enter__와 __exit__는 with 블록에 들어갈 때와 with 블록을 나갈 때 실행되는 두 가지 마법 메서드입니다. 이는 설정 및 해체 단계입니다.

(iii)

준비가 되면 이 메서드가 호출되어 작업을 명시적으로 커밋합니다.

커밋되지 않거나 오류를 발생시켜 컨텍스트 관리자를 종료하면 롤백이 수행됩니다.

실제 작업 단위는 SQLAlchemy 세션을 사용합니다.

구체적인 구현의 주요 요소는 데이터베이스 세션입니다:

DEFAULT_SESSION_FACTORY = sessionmaker( #(1)
 bind=create_engine(
 config.get_postgres_uri(),
 )
)
class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
 def __init__(self, session_factory=DEFAULT_SESSION_FACTORY):
 self.session_factory = session_factory #(1)
 def __enter__(self):
 self.session = self.session_factory() # type: Session #(2)
 self.batches = repository.SqlAlchemyRepository(self.session) #(2)
 return super().__enter__()
 def __exit__(self, *args):
 super().__exit__(*args)
 self.session.close() #(3)
 def commit(self): #(4)
 self.session.commit()
 def rollback(self): #(4)
 self.session.rollback()

이 모듈은 Postgres에 연결할 기본 세션 팩토리를 정의하지만, 통합 테스트에서 재정의가 가능하므로 SQLite를 대신 사용할 수 있습니다.

enter__ 메서드는 데이터베이스 세션을 시작하고 해당 세션을 사용할 수 있는 실제 리포지토리를 인스턴스화하는 작업을 담당합니다.

(iii)

종료 시 세션을 닫습니다.

마지막으로 사용된 데이터베이스 세션에 대한 구체적인 커밋() 및 롤백() 메서드를 제공합니다.

테스트용 더미 작업 단위

이것이 서비스 계층 테스트에서 가짜 UoW가 사용되는 방식입니다:

class FakeUnitOfWork(unit_of_work.AbstractUnitOfWork):
 def __init__(self):
 self.batches = FakeRepository([]) #(1)
 self.committed = False #(2)
 def commit(self):
 self.committed = True #(2)
 def rollback(self):
 pass
def test_add_batch():
 uow = FakeUnitOfWork() #(3)
 services.add_batch("b1", "CRUNCHY-ARMCHAIR", 100, None, uow) #(3)
 assert uow.batches.get("b1") is not None
 assert uow.committed
def test_allocate_returns_allocation():
 uow = FakeUnitOfWork() #(3)
 services.add_batch("batch1", "COMPLICATED-LAMP", 100, None, uow) #(3)
 result = services.allocate("o1", "COMPLICATED-LAMP", 10, uow) #(3)
 assert result == "batch1"

FakeUnitOfWork와 FakeRepository는 실제 UnitofWork와 Repository 클래스와 마찬가지로 긴밀하게 결합되어 있습니다. 이 객체들이 공동 작업자라는 것을 인식하기 때문에 괜찮습니다.

FakeSession의 fake commit() 함수와 유사하다는 점에 주목하세요. 하지만 이제 타사 코드가 아닌 자체 작성된 코드를 모델링한다는 점에서 크게 개선된 기능입니다. 어떤 사람들은 "소유하지 않은 것을 모방하지 말라"고 말합니다.

(iii)

리포지토리와 세션을 전달하는 대신 UoW를 인스턴스화하여 테스트에서 서비스 계층에 전달할 수 있습니다. 훨씬 더 간단합니다.

서비스 계층에서 UoW 사용

새 서비스 계층은 다음과 같습니다:

def add_batch(
 ref: str, sku: str, qty: int, eta: Optional[date],
 uow: unit_of_work.AbstractUnitOfWork, #(1)
):
 with uow:
 uow.batches.add(model.Batch(ref, sku, qty, eta))
 uow.commit()
def allocate(
 orderid: str, sku: str, qty: int,
 uow: unit_of_work.AbstractUnitOfWork, #(1)
) -> str:
 line = OrderLine(orderid, sku, qty)
 with uow:
 batches = uow.batches.list()
 if not is_valid_sku(line.sku, batches):
 raise InvalidSku(f"Invalid sku {line.sku}")
 batchref = model.allocate(line, batches)
 uow.commit()
 return batchref

의 서비스 계층에는 이제 다시 한 번 UoW에 대한 종속성이 하나만 있습니다.

커밋/롤백 동작을 명시적으로 테스트하기

커밋/롤백 동작이 제대로 작동하는지 확인하기 위해 몇 가지 테스트를 작성했습니다:

def test_rolls_back_uncommitted_work_by_default(session_factory):
 uow = unit_of_work.SqlAlchemyUnitOfWork(session_factory)
 with uow:
 insert_batch(uow.session, 'batch1', 'MEDIUM-PLINTH', 100, None)
 new_session = session_factory()
 rows = list(new_session.execute('SELECT * FROM "batches"'))
 assert rows == []
def test_rolls_back_on_error(session_factory):
 class MyException(Exception):
 pass
 uow = unit_of_work.SqlAlchemyUnitOfWork(session_factory)
 with pytest.raises(MyException):
 with uow:
 insert_batch(uow.session, 'batch1', 'LARGE-FORK', 100, None)
 raise MyException()
 new_session = session_factory()
 rows = list(new_session.execute('SELECT * FROM "batches"'))
 assert rows == []
STH에 주목하세요.

여기에는 표시되어 있지 않지만 트랜잭션과 같은 좀 더 "모호한" 데이터베이스 동작을 "실제" 데이터베이스, 즉 동일한 엔진에 대해 테스트해 볼 가치가 있습니다. 현재는 Postgres 대신 SQLite가 사용되지만 7장에서는 일부 테스트가 실제 데이터베이스를 사용하는 것으로 전환됩니다. UoW 클래스는 이를 쉽게 만들어줍니다!

명시적 커밋과 암시적 커밋

이제 UoW 모델을 구현하는 다양한 방법에 대해 간략하게 살펴보겠습니다.

기본적으로 커밋하고 예외가 발견될 때만 롤백하는 약간 다른 버전의 UoW를 상상해 볼 수 있습니다:

class AbstractUnitOfWork(abc.ABC):
 def __enter__(self):
 return self
 def __exit__(self, exn_type, exn_value, traceback):
 if exn_type is None:
 self.commit() #(1)
 else:
 self.rollback() #(2)

일반적인 상황에서 암시적 커밋이 있어야 하나요?

그리고 예외적인 경우에만 롤백할 수 있나요?

이렇게 하면 코드 한 줄을 저장하고 클라이언트 코드에서 명시적 커밋을 제거할 수 있습니다:

def add_batch(ref: str, sku: str, qty: int, eta: Optional[date], uow):
 with uow:
 uow.batches.add(model.Batch(ref, sku, qty, eta))
 # uow.commit()

이것은 판단의 문제이지만 명시적인 커밋이 필요한 경우가 많으므로 상태를 새로 고칠 시기를 선택해야 합니다.

추가 코드 줄이 사용되지만 기본적으로 소프트웨어는 안전합니다. 기본 동작은 다음과 같습니다. 결과적으로 시스템 변경을 초래하는 코드 경로는 전체 성공과 명시적 커밋 하나뿐이므로 코드를 더 쉽게 추론할 수 있습니다. 다른 코드 경로, 예외, UoW 범위의 조기 종료는 모두 안전한 상태가 됩니다.

다시 말하지만, 롤백은 사용자가 커밋을 했거나 변경을 취소한 커밋으로 되돌아가기 때문에 이해하기 쉽기 때문에 기본적으로 롤백을 선호합니다. 거칠지만 간단합니다.

예: UoW를 사용하여 여러 작업을 단일 원자 단위로 그룹화하기

다음은 UoW 패턴의 사용을 보여주는 몇 가지 예시입니다. 코드 블록이 언제 결합되는지에 대한 간단한 추론으로 이어지는 방법을 확인할 수 있습니다.

예 1: 재배포

할당을 취소한 다음 주문을 다시 할당할 수 있다고 가정해 보겠습니다:

def reallocate(
 line: OrderLine,
 uow: AbstractUnitOfWork,
) -> str:
 with uow:
 batch = uow.batches.get(sku=line.sku)
 if batch is None:
 raise InvalidSku(f'Invalid sku {line.sku}')
 batch.deallocate(line) #(1)
 allocate(line) #(2)
 uow.commit()

deallocate()가 실패하면 당연히 allocate()를 호출하고 싶지 않을 것입니다.

allocate()가 실패하면 실제로 deallocate()를 커밋하고 싶지 않을 수 있습니다.

예 2: 배치 크기 변경하기

컨테이너 문이 열려 소파의 절반이 인도양으로 떨어졌다는 배송업체의 전화가 왔습니다. 이런!

def change_batch_quantity(
 batchref: str, new_qty: int,
 uow: AbstractUnitOfWork,
):
 with uow:
 batch = uow.batches.get(reference=batchref)
 batch.change_purchased_quantity(new_qty)
 while batch.available_quantity < 0:
 line = batch.deallocate_one() #(1)
 uow.commit()

여기서 원하는 수의 행 할당을 취소해야 할 수도 있습니다. 어느 단계에서든 오류가 발생하면 변경 사항을 커밋하지 않을 수 있습니다.

통합 테스트 구성

이제 기본적으로 데이터베이스를 가리키는 세 가지 테스트 세트가 있습니다. 이 중 하나라도 삭제해야 할까요?

└── 테스트
 컨테스트.py
 “e2e
 │   └── 테스트_api.py
 통합
 │   ├── 테스트_orm.py
 │   ├── 테스트_repository.py
 │   └── test_uow.py
 파이 테스트.ini
 └── 단위
 ├── test_allocate.py
 ├── test_batches.py
 └── test_services.py

이 테스트가 장기적으로 가치를 더할 수 없다고 생각되면 언제든지 삭제해야 합니다. 이 테스트는 주로 SQLAlchemy를 배우는 데 도움이 되는 도구이므로 장기적으로는 필요하지 않으며, 특히 주요 기능이 이미 포함되어 있는 경우에는 더욱 그렇습니다. 마지막 테스트는 유지할 수도 있지만 가능한 가장 높은 수준의 추상화 수준에서만 유지해야 한다는 주장은 확실히 알 수 있습니다.

STH에 주목하세요.

이것은 5장에서 배운 또 다른 교훈의 한 예입니다. 더 나은 추상화가 구축되면 의도한 추상화에 대해 테스트를 실행하여 기본 세부 사항을 자유롭게 변경할 수 있습니다. * *# 요약

단일 작업 패턴이 유용하고 컨텍스트 관리자가 코드를 원자적으로 수행하려는 블록으로 시각적으로 그룹화할 수 있는 훌륭한 Python 방법이라는 것을 확신하셨기를 바랍니다.

사실 이 패턴은 매우 유용해서 SQLAlchemy에서는 이미 세션 개체 형태로 UoW를 사용하고 있습니다. SQLAlchemy의 세션 개체는 애플리케이션이 데이터베이스에서 데이터를 로드하는 방식입니다.

데이터베이스에서 새 엔티티가 로드될 때마다 세션이 엔티티를 변경하기 시작하고 세션이 종료될 때 모든 변경 사항이 함께 유지됩니다. SQLAlchemy가 이미 원하는 패턴을 구현하고 있다면 왜 굳이 SQLAlchemy 세션을 추상화할 필요가 있을까요?

표 6-1에서는 몇 가지 장단점에 대해 설명합니다.

모듈 작동 모드: 장단점

원자 연산이라는 개념이 잘 추상화되어 있으며 컨텍스트 관리자를 사용하면 어떤 코드 블록이 원자 단위로 그룹화되어 있는지 쉽게 시각화할 수 있습니다.ORM에는 원자성에 대한 완벽하게 적합한 추상화가 이미 있을 것입니다. 그리고 SQLAlchemy에는 컨텍스트 관리자도 있습니다. 세션을 전달하는 것만으로도 큰 도움이 될 수 있습니다.
트랜잭션 시작과 종료 시점을 명시적으로 제어하는 애플리케이션은 기본적으로 안전한 방식으로 실패합니다. 작업이 부분적으로 커밋되는 것에 대해 걱정할 필요가 없습니다.쉽게 보이게 만드는 것은 쉽지만 롤백, 멀티스레딩 및 중첩 트랜잭션과 같은 것에 대해 신중하게 생각해야 합니다. 어쩌면 장고나 플라스크-SQLAlchemy가 제공하는 기능에 충실하는 것이 여러분의 삶을 더 쉽게 만들 수도 있습니다.
클라이언트 코드가 액세스할 수 있도록 모든 리포지토리를 함께 보관할 수 있는 좋은 장소입니다.
이후 장에서 살펴보겠지만 원자성은 트랜잭션에만 적용되는 것이 아니라 이벤트와 메시지 버스에도 도움이 될 수 있습니다.

첫째, 세션 API는 매우 풍부하여 도메인에서 필요하지 않거나 원하지 않는 작업을 지원합니다. 유닛오브워크는 세션을 시작, 커밋 또는 폐기할 수 있는 기본 핵심으로 세션을 단순화합니다.

또는 UnitOfWork를 사용하여 액세스하는 리포지토리 개체를 사용하세요. 이는 일반 SQLAlchemySession으로는 달성할 수 없는 훌륭한 개발자 사용성 트릭입니다.

마지막으로, 다시 종속성 반전 원칙에 따라 서비스 계층은 얇은 추상화에 의존하고 시스템 주변에 구체적인 구현을 첨부합니다. 이는 SQLAlchemy의 자체 제안과 잘 맞습니다:

세션의 라이프사이클을 분리하여 외부화하세요. 보다 실질적인 애플리케이션에 권장되는 가장 포괄적인 접근 방식은 세션, 트랜잭션 및 예외 관리의 세부 사항을 해당 작업을 수행하는 절차의 세부 사항과 가능한 한 분리하는 것입니다.

--SQLALchemy "세션 기본 사항" 문서

¹ 목표를 달성하기 위해 함께 작업하는 객체를 설명할 때 "공동 작업자"라는 용어를 들어보셨을 것입니다. 작업 단위와 리포지토리는 객체 모델링의 의미에서 협업자의 훌륭한 예입니다. 책임 중심 디자인에서는 각자의 역할에 따라 협업하는 객체들의 클러스터를 "객체 이웃"이라고 하는데, 전문가들의 의견으로는 매우 귀엽다고 합니다.

장: 집계 및 일관성 경계

원본: 7: 집계 및 일관성 경계

번역자: GitHub

프로토콜: CC BY-NC-SA 4.0

이 장에서는 도메인 모델을 다시 살펴보고, 불변성과 제약 조건에 대해 논의하고, 도메인 객체가 개념적으로나 영구 저장소에서 자체 내부 일관성을 유지하는 방법을 살펴보고자 합니다. 개념에 대해 논의하고 이를 명시적으로 만드는 것이 유지보수성을 손상시키지 않고 고성능 소프트웨어를 구축하는 데 어떻게 도움이 되는지 보여드릴 것입니다.

그림 7-1은 여러 배치를 래핑하기 위해 Product라는 새로운 모델 객체를 도입하고, 기존 allocate() 도메인 서비스를 Product의 메서드로 사용할 수 있도록 한다는 목표를 보여줍니다.

그림 7-1.

왜 그럴까요? 그 이유를 알아보세요.

STH에 주목하세요.

이 장의 코드는 SELECT FOR UPDATE appendix_csvs 브랜치에 있습니다:

git clone https://.com/cosmicpython/.git
cd code
git checkout appendix_csvs
# or to code along, checkout the previous chapter:
git checkout chapter_06_uow

스프레드시트에서 모든 것을 실행하는 것은 어떨까요?

도메인 모델링의 중요성은 무엇인가요? 도메인 모델링이 해결하려는 근본적인 문제는 무엇인가요?

스프레드시트에서 모든 것을 실행할 수 없나요? 많은 사용자가 비즈니스 사용자는 스프레드시트가 간단하고 친숙하면서도 매우 강력하기 때문에 스프레드시트를 사용합니다.

실제로 많은 비즈니스 프로세스가 스프레드시트를 이메일로 수동으로 주고받는 방식으로 운영되고 있습니다. 이러한 'CSV over SMTP' 아키텍처는 초기 복잡성은 낮지만 로직을 적용하고 일관성을 유지하기가 어렵기 때문에 확장성이 떨어지는 경향이 있습니다.

특정 필드를 볼 수 있는 권한은 누구에게 있나요? 업데이트 권한은 누구에게 있나요? 350개의 의자 또는 천만 개의 테이블을 주문하려고 하면 어떻게 되나요? 직원이 마이너스 급여를 받을 수 있나요?

이것이 시스템의 제약 조건입니다. 작성된 도메인 로직의 대부분은 시스템의 불변성을 보존하기 위해 이러한 제약 조건을 적용하기 위해 존재합니다. 는 연산이 완료될 때마다 참이어야 하는 것들입니다.

불변성, 제약 조건 및 일관성

두 용어는 어느 정도 서로 바꿔 사용할 수 있지만, 제한된 모델이 들어갈 수 있는 가능한 상태에 대한 규칙은 항상 참인 조건으로 더 정확하게 정의됩니다.

호텔 예약 시스템을 작성하는 경우 이중 예약을 허용하지 않는 제약 조건이 있을 수 있습니다. 이는 한 객실에 같은 날 밤에 두 개 이상의 예약이 있을 수 없다는 불변성을 지원합니다.

물론 일시적인 규칙이 필요할 때도 있습니다. VIP 예약으로 인해 객실을 재배치해야 할 수도 있습니다. 메모리에서 예약을 이동할 때 예약이 이중 예약될 수 있지만 도메인 모델은 완료 시 불변성이 만족되는 일관된 상태가 되도록 해야 합니다. 모든 게스트를 수용할 수 있는 방법을 찾을 수 없는 경우 오류가 발생하고 완료 작업이 거부되어야 합니다.

비즈니스 요구 사항에서 몇 가지 구체적인 예를 살펴보겠습니다:

주문 라인은 한 번에 하나의 배치에만 할당할 수 있습니다.

--작업

이것은 불변성을 부과하는 비즈니스 규칙입니다. 불변성은 주문 라인이 0개 또는 하나의 배치에 할당되어야 하지만 절대 두 개 이상 할당되어서는 안 된다는 것입니다. 보장해야 하는 코드는 실수로 두 개의 다른 배치에 대해 동일한 행에서 Batch.allocate()를 호출하지 않도록 해야 하며, 현재 이를 명시적으로 방지하는 방법은 없습니다.

불변성, 동시성 및 잠금

다른 비즈니스 규칙을 다시 한 번 살펴보겠습니다:

사용 가능한 수량이 주문 라인 수보다 적으면 배치에 할당할 수 없습니다.

--작업

여기서 제약 조건은 배치의 사용 가능한 수량보다 더 많은 재고를 할당할 수 없다는 것이므로 동일한 실제 매트에 두 명의 고객을 할당하여 재고를 초과 판매해서는 안 됩니다. 시스템 상태가 업데이트될 때마다 코드는 불변성을 깨지 않도록, 즉 사용 가능한 수량이 0보다 크거나 같아야 한다는 것을 보장해야 합니다.

단일 스레드, 단일 사용자 애플리케이션에서 이 불변성을 유지하는 것은 비교적 쉽습니다. 한 번에 한 줄씩 인벤토리를 할당하고 사용 가능한 인벤토리가 없는 경우 오류를 발생시킬 수 있습니다.

이것은 도입 개념이 도입되면서 더욱 어려워졌습니다. 갑자기 여러 주문 라인에 재고를 동시에 할당하는 것이 가능해졌습니다. 배치 자체에 대한 변경 사항을 처리하는 동시에 주문 라인을 할당하는 것도 가능할 수 있습니다.

일반적으로 이 문제는 데이터베이스 테이블에 적용하여 해결합니다. 이렇게 하면 동일한 행이나 테이블에서 두 개의 작업이 동시에 수행되지 않습니다.

애플리케이션 확장을 고려하기 시작하면서 사용 가능한 모든 배치에 대해 행을 할당하는 모델이 확장되지 않을 수 있다는 것을 깨달았습니다. 시간당 수만 건의 주문이 처리되고 수십만 개의 주문 행이 처리되는 경우, 각 주문 행에 대해 전체 배치 테이블을 잠그는 것은 불가능하며, 최소한 교착 상태나 성능 문제가 발생할 것입니다.

집계란 무엇인가요?

주문 라인을 할당할 때마다 전체 데이터베이스를 잠글 수 없다면 어떻게 해야 할까요? 시스템의 불변성을 보호하면서도 최대한의 동시성을 허용하는 것이 바람직합니다. 불변성을 유지한다는 것은 필연적으로 동시 쓰기를 방지한다는 의미이며, 여러 사용자가 동시에 DEADLY-SPOON을 할당할 수 있다면 과잉 할당의 위험이 있습니다.

반면에 DEADLY-SPOON과 FLIMSY-DESK를 동시에 할당할 수 있습니다. 두 제품에는 공통 불변수가 없으므로 동시에 할당하는 것이 안전합니다. 서로 일치할 필요는 없습니다.

패턴은 이러한 긴장을 해소하는 데 도움이 되는 DDD 커뮤니티의 디자인 패턴입니다. 다른 도메인 객체를 포함하는 도메인 객체일 뿐이며, 전체 컬렉션을 하나의 단위로 취급할 수 있습니다.

집계 내에서 객체를 수정하는 유일한 방법은 전체 객체를 로드하고 집계 자체에서 메서드를 호출하는 것입니다.

모델이 복잡해짐에 따라 엔티티와 값 개체가 서로를 참조하여 그래프가 얽혀 누가 무엇을 수정할 수 있는지 추적하기 어렵게 됩니다. 특히 모델에서 일부 개체를 관련 개체를 수정할 수 있는 유일한 진입점으로 지정하는 것이 좋을 때가 있습니다. 일부 개체를 다른 개체의 일관성을 책임지도록 지정하면 시스템이 개념적으로 더 간단하고 이해하기 쉬워집니다.

예를 들어 쇼핑 사이트를 구축하는 경우 장바구니는 하나의 단위로 생각할 수 있는 항목 집합이므로 좋은 애그리게이터가 될 수 있습니다. 중요한 것은 데이터스토어에서 전체 장바구니를 단일 블록으로 로드하는 것이 바람직하다는 것입니다. 두 개의 요청이 동시에 장바구니를 수정하는 것은 바람직하지 않으며, 그렇지 않으면 이상한 동시성 오류가 발생할 위험이 있습니다. 대신, 장바구니의 각 변경 사항은 단일 데이터베이스 트랜잭션에서 실행하는 것이 좋습니다.

여러 고객의 장바구니를 동시에 변경해야 하는 사용 사례는 없으므로 단일 거래에서 여러 장바구니를 수정하는 것은 바람직하지 않습니다. 각 장바구니는 자체 불변성을 유지하는 단일 장바구니입니다.

집계는 관련 개체의 모음으로, 데이터 변경의 단위로 생각하면 됩니다.

--에릭 에반스, 도메인 중심 디자인 블루북

에릭 에반스에 따르면 집계에는 항목에 대한 액세스를 캡슐화하는 루트 엔티티가 있습니다. 각 항목에는 고유한 ID가 있지만 나머지 시스템에서는 항상 카트를 분할할 수 없는 전체로 간주합니다.

STH에 주목하세요.

*_leading_underscores* 사용하여 메서드나 함수를 "비공개"로 표시하는 것처럼, 집계는 모델의 "공개" 클래스이고 다른 엔티티와 값 개체는 "비공개"라고 생각할 수 있습니다.

집계 선택

내 시스템에 어떤 집계를 사용해야 하나요? 선택은 다소 임의적이지만 중요합니다. 집계는 모든 작업이 일관된 상태로 끝나도록 보장하는 경계가 됩니다. 이는 소프트웨어에 대한 추론을 돕고 이상한 경합 문제를 방지하는 데 도움이 됩니다. 적은 수의 객체를 중심으로 경계를 그리는 것이 바람직하며(성능 향상을 위해 작을수록 좋습니다), 이러한 객체는 서로 일관성이 있어야 하고 경계에 좋은 이름을 붙여야 합니다.

배후에서 작동하는 개체는 배치입니다. 배치 그룹은 어떻게 참조해야 하나요? 시스템의 모든 배치는 어떻게 일관성이 있는 개별적인 섬으로 구분해야 할까요?

배송을 경계로 사용합니다. 각 배송에는 동시에 배송되는 여러 배치의 창고가 포함되어 있습니다. 또는 창고를 경계로 사용할 수 있습니다. 각 창고에는 여러 배치가 포함되어 있으며 모든 재고를 동시에 계산하는 것이 합리적일 수 있습니다.

그러나 이 두 가지 개념은 모두 만족할 수 없습니다. 같은 창고 또는 같은 배치에 있더라도 DEADLY-SPOONS와 FLIMSY-DESK를 모두 할당할 수 있어야 합니다. 이러한 개념은 세분화 수준이 적절하지 않습니다.

주문 라인을 할당할 때는 주문 라인과 동일한 SKU를 가진 배치만 관심 대상입니다. GlobalSkuStock과 유사한 개념인 특정 SKU에 대한 모든 배치의 집합을 사용할 수 있습니다.

하지만 다루기 어려운 이름이라 SkuStock, Stock, ProductStock 등으로 논의한 끝에 1장에서 도메인 언어를 살펴볼 때 처음 접하는 개념인 만큼 그냥 Product라고 부르기로 결정했습니다.

주문 라인을 할당하려는 경우, 그림 7-2를 사용하여 월드의 모든 배치 객체를 조회하고 이를 allocate() 필드 서비스에 전달하는 대신 다음과 같이 계획합니다.

그림 7-2.: 도메인 서비스를 사용한 모든 배치 배포
[plantuml, apwp_0702, config=plantuml.cfg]
@startuml
hide empty members
package "Service Layer" as services {
 class "allocate()" as allocate {
 }
 hide allocate circle
 hide allocate members
}
package "Domain Model" as domain_model {
 class Batch {
 }
 class "allocate()" as allocate_domain_service {
 }
 hide allocate_domain_service circle
 hide allocate_domain_service members
}
package repositories {
 class BatchRepository {
 list()
 }
}
allocate -> BatchRepository: list all batches
allocate --> allocate_domain_service: allocate(orderline, batches)
@enduml

... 는 그림 7-3의 세계로 전환되며, 여기에는 모든 배치를 담당하고 .allocate() 메서드를 호출할 수 있는 특정 SKU에 대한 새 Product 개체가 있습니다.

그림 7-3. 이후: 배치에 따라 제품을 할당해야 합니다.
[plantuml, apwp_0703, config=plantuml.cfg]
@startuml
hide empty members
package "Service Layer" as services {
 class "allocate()" as allocate {
 }
}
hide allocate circle
hide allocate members
package "Domain Model" as domain_model {
 class Product {
 allocate()
 }
 class Batch {
 }
}
package repositories {
 class ProductRepository {
 get()
 }
}
allocate -> ProductRepository: get me the product for this sku
allocate --> Product: product.allocate(orderline)
Product o- Batch: has
@enduml

코드 양식이 어떻게 생겼는지 살펴봅시다:

class Product:
 def __init__(self, sku: str, batches: List[Batch]):
 self.sku = sku #(1)
 self.batches = batches #(2)
 def allocate(self, line: OrderLine) -> str: #(3)
 try:
 batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
 batch.allocate(line)
 return batch.reference
 except StopIteration:
 raise OutOfStock(f"Out of stock for sku {line.sku}")

제품의 기본 식별자는 SKU입니다.

SKU의 제품 클래스에는 해당 SKU의 배치 세트에 대한 참조가 있습니다.

(iii)

마지막으로, 할당() 필드 서비스를 제품 집계 메서드로 이동할 수 있습니다.

금액에 대한 분류기

이 제품은 제품 모델에서 기대하는 것과 다를 수 있습니다. 가격도 없고, 설명도 없고, 크기도 없습니다. 배포 서비스는 이런 것들은 신경 쓰지 않습니다. 한 애플리케이션에서 제품의 개념이 다른 애플리케이션에서 제품의 개념과 매우 다를 수 있다는 것이 경계가 있는 컨텍스트의 힘입니다. 자세한 내용은 아래 사이드바를 참조하세요.

집계 = 리포지토리

특정 엔티티를 집계로 정의한 후에는 외부에서 액세스할 수 있는 유일한 엔티티라는 규칙을 적용해야 합니다. 즉, 허용되는 유일한 리포지토리는 집계를 반환하는 리포지토리여야 합니다.

금액에 대한 분류기

집계만 반환하는 리포지토리는 집계 적용이 도메인 모델에 들어갈 수 있는 유일한 방법인 주요 장소입니다. 이를 위반하지 않도록 주의하세요!

의 경우 배치저장소에서 제품저장소로 전환됩니다:

class AbstractUnitOfWork(abc.ABC):
 products: repository.AbstractProductRepository
...
class AbstractProductRepository(abc.ABC):
 @abc.abstractmethod
 def add(self, product):
 ...
 @abc.abstractmethod
 def get(self, sku) -> model.Product:
 ...

올바른 배치가 자동으로 로드되고 '제품' 개체에 연결되도록 ORM 계층을 약간 조정해야 합니다. 좋은 점은 리포지토리 모델을 사용하면 이에 대해 걱정할 필요가 없다는 것입니다. "FakeRepository"를 사용하고 새 모델을 서비스 계층에 전달하면 "제품"에 대한 주요 진입점으로 어떻게 보이는지 확인할 수 있습니다:

def add_batch(
 ref: str, sku: str, qty: int, eta: Optional[date],
 uow: unit_of_work.AbstractUnitOfWork
):
 with uow:
 product = uow.products.get(sku=sku)
 if product is None:
 product = model.Product(sku, batches=[])
 uow.products.add(product)
 product.batches.append(model.Batch(ref, sku, qty, eta))
 uow.commit()
def allocate(
 orderid: str, sku: str, qty: int,
 uow: unit_of_work.AbstractUnitOfWork
) -> str:
 line = OrderLine(orderid, sku, qty)
 with uow:
 product = uow.products.get(sku=line.sku)
 if product is None:
 raise InvalidSku(f'Invalid sku {line.sku}')
 batchref = product.allocate(line)
 uow.commit()
 return batchref

성능은 어떤가요?

고성능 소프트웨어가 필요하기 때문에 집계를 사용하여 모델링을 수행한다고 여러 번 언급했지만 여기서는 배치가 로드되고 하나만 필요합니다. 비효율적이라고 생각할 수도 있지만, 여기에는 몇 가지 이유가 있습니다.

첫째, 데이터는 의도적으로 모델링되어 데이터베이스에 한 번의 쿼리로 데이터를 읽고, 한 번의 업데이트로 변경 사항을 저장할 수 있습니다. 이는 많은 임시 쿼리를 실행하는 시스템보다 훨씬 더 나은 성능을 발휘하는 경향이 있습니다. 이러한 방식으로 모델링되지 않은 시스템에서는 소프트웨어가 발전함에 따라 트랜잭션이 더 길고 복잡해지는 경우가 종종 있습니다.

둘째, 데이터 구조가 최소화되어 각 행에 다수의 문자열과 정수가 포함되어 있습니다. 수십 또는 수백 개의 배치를 몇 밀리초 만에 쉽게 로드할 수 있습니다.

셋째, 한 번에 각 제품의 배치가 20개 정도만 있을 것으로 예상됩니다. 배치가 모두 소진되면 계산에서 제외될 수 있습니다. 즉, 시간이 지남에 따라 수집되는 데이터의 양이 많아지지 않아야 합니다.

특정 제품에 대해 수천 개의 활성 배치가 예상되는 경우 몇 가지 옵션이 있습니다. 첫째, 제품의 배치에 지연 로딩을 사용할 수 있습니다. 코드 관점에서는 아무것도 변경되지 않지만 백그라운드에서는 SQLAlchemy가 데이터를 페이지화합니다. 이렇게 하면 더 많은 요청이 발생하고 각 요청이 더 적은 행을 가져옵니다. 찾아야 하는 주문은 처리할 수 있는 충분한 용량을 가진 배치뿐이므로 이 방법이 잘 작동할 수 있습니다.

다른 방법이 없다면 다른 집계를 찾아보세요. 지역 또는 창고별로 배치를 분할할 수도 있습니다. 배송 개념을 중심으로 데이터 액세스 전략을 재설계할 수도 있습니다. 집계 모델은 일관성 및 성능과 관련된 몇 가지 기술적 제약을 관리하는 데 도움이 되도록 설계되었습니다. 올바른 집계라는 것은 존재하지 않으며, 발견된 경계가 성능 문제를 야기하는 경우 변경하는 것에 대해 편안하게 생각해야 합니다.

버전 번호를 사용한 최적의 동시성

새로운 집계를 사용하면 일관성 경계를 책임질 객체를 선택하는 개념이 해결됩니다. 이제 데이터베이스 수준에서 데이터 무결성을 적용하는 방법에 대해 잠시 이야기해 보겠습니다.

다음 사항에 유의하십시오.

이 섹션에는 많은 구현 세부 사항이 포함되어 있으며, 그 중 일부는 Postgres에만 해당되는 내용입니다. 그러나 보다 일반적으로 동시성 문제를 관리하는 방법론이 제시되어 있지만 이는 하나의 방법론일 뿐입니다. 이 영역의 실제 요구 사항은 프로젝트마다 크게 다릅니다. 여기에서 코드를 복사하여 프로덕션 환경으로 붙여넣을 수 있을 것이라고 기대해서는 안 됩니다.

전체 '배치' 테이블에 잠그고 싶지 않은데 특정 SKU의 행에만 잠그려면 어떻게 해야 하나요?

한 가지 해결책은 전체 상태 변경의 완료를 표시하는 단일 속성을 제품 모델에 두고 이를 동시 작업자가 경합할 수 있는 단일 리소스로 사용하는 것입니다. 두 트랜잭션이 동시에 "배치"의 월드 상태를 읽고 둘 다 "할당" 테이블을 업데이트하려는 경우, 둘 다 "제품" 테이블의 "제품" 속성도 업데이트하도록 강제합니다. "테이블을 업데이트하여 한 쪽만 승리하고 월드가 일관성을 유지하도록 합니다.

그림 7-4는 두 개의 동시 트랜잭션이 동시에 읽기 작업을 수행하여 버전이 3인 제품이 표시되는 것을 보여줍니다. 이 두 트랜잭션은 모두 Product.allocate()를 호출하여 상태를 수정합니다. 그러나 데이터베이스 무결성 규칙은 둘 중 하나만 commit을 사용하여 버전이 4인 새 제품을 커밋하고 다른 업데이트는 거부되도록 설정되어 있습니다.

STH에 주목하세요.

버전 번호는 낙관적 잠금을 구현하는 한 가지 방법일 뿐입니다. Postgres 트랜잭션 격리 수준을 SERIALIZABLE로 설정하여 동일한 효과를 얻을 수 있지만 일반적으로 심각한 성능 저하가 발생합니다. 버전 번호는 또한 암시적인 개념을 명시적으로 만들어 줍니다.

그림 7-4: 시퀀스 다이어그램: "제품"을 동시에 업데이트하려는 두 개의 트랜잭션.
[plantuml, apwp_0704, config=plantuml.cfg]
@startuml
entity Model
collections Transaction1
collections Transaction2
database Database
Transaction1 -> Database: get product
Database -> Transaction1: Product(version=3)
Transaction2 -> Database: get product
Database -> Transaction2: Product(version=3)
Transaction1 -> Model: Product.allocate()
Model -> Transaction1: Product(version=4)
Transaction2 -> Model: Product.allocate()
Model -> Transaction2: Product(version=4)
Transaction1 -> Database: commit Product(version=4)
Database -[#green]> Transaction1: OK
Transaction2 -> Database: commit Product(version=4)
Database -[#red]>x Transaction2: Error! version is already 4
@enduml

버전 번호 구현 옵션

버전 번호를 구현하는 데는 기본적으로 세 가지 옵션이 있습니다:

  1. 필드에 버전 번호가 존재하므로 이를 Product 생성자에 추가하면 Product.allocate()가 이를 증분하는 작업을 처리합니다.

  2. 서비스 계층이 이 작업을 수행할 수 있습니다! 버전 번호는 도메인 문제가 아니므로 서비스 계층은 현재 버전 번호가 리포지토리에 의해 제품에 첨부되어 있고 커밋()을 실행할 때 서비스 계층이 이를 증가시킬 것이라고 가정할 수 있습니다.

  3. 이는 인프라 문제이므로 UoW와 리포지토리는 마술처럼 이 작업을 수행할 수 있습니다. 리포지토리는 검색하는 모든 제품의 버전 번호에 액세스할 수 있으며, UoW가 커밋할 때 변경된 것으로 가정하여 알고 있는 모든 제품의 버전 번호를 늘릴 수 있습니다.

옵션 3은 제품이 모두 변경되었다고 가정하지 않고는 이 작업을 수행할 수 있는 실제 방법이 없기 때문에 이상적이지 않으므로 필요하지 않은 경우 버전 번호를 증가시킵니다. ¹

옵션 2는 서비스 계층과 도메인 계층 간에 상태 변경에 대한 책임이 혼재되어 있어 다소 혼란스러울 수 있습니다.

따라서 궁극적으로 버전 번호가 반드시 도메인과 관련이 없더라도 도메인에 넣는 것이 가장 깔끔한 절충안이라고 판단할 수 있습니다:

class Product:
 def __init__(self, sku: str, batches: List[Batch], version_number: int = 0): #(1)
 self.sku = sku
 self.batches = batches
 self.version_number = version_number #(1)
 def allocate(self, line: OrderLine) -> str:
 try:
 batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
 batch.allocate(line)
 self.version_number += 1 #(1)
 return batch.reference
 except StopIteration:
 raise OutOfStock(f"Out of stock for sku {line.sku}")

그거예요!

STH에 주목하세요.

버전 번호의 비즈니스에 대해 혼란스러우신다면 버전 번호는 중요하지 않다는 점을 기억하시면 도움이 될 것입니다. 중요한 것은 '제품' 집계가 변경될 때마다 '제품' 데이터베이스 행이 수정된다는 것입니다. 버전 번호는 기록될 때마다 변경되는 것을 모델링하기 위한 간단하고 사람이 이해할 수 있는 방법이지만 매번 임의의 UUID가 될 수도 있습니다.

테스트를 위한 데이터 무결성 규칙

이제 원하는 동작을 얻을 수 있는지 확인하세요. 동일한 제품에 대해 두 번의 동시 할당 시도가 있는 경우 버전 번호를 동시에 업데이트할 수 없으므로 그 중 하나는 실패해야 합니다.

먼저 할당을 수행한 다음 명시적으로 잠자기 상태로 전환하는 함수를 사용하여 '느린' 트랜잭션을 시뮬레이션해 보겠습니다.

def try_to_allocate(orderid, sku, exceptions):
 line = model.OrderLine(orderid, sku, 10)
 try:
 with unit_of_work.SqlAlchemyUnitOfWork() as uow:
 product = uow.products.get(sku=sku)
 product.allocate(line)
 time.sleep(0.2)
 uow.commit()
 except Exception as e:
 print(traceback.format_exc())
 exceptions.append(e)

그런 다음 테스트는 스레드를 사용하여 이 느린 할당을 동시에 두 번 호출합니다:

def test_concurrent_updates_to_version_are_not_allowed(postgres_session_factory):
 sku, batch = random_sku(), random_batchref()
 session = postgres_session_factory()
 insert_batch(session, batch, sku, 100, eta=None, product_version=1)
 session.commit()
 order1, order2 = random_orderid(1), random_orderid(2)
 exceptions = [] # type: List[Exception]
 try_to_allocate_order1 = lambda: try_to_allocate(order1, sku, exceptions)
 try_to_allocate_order2 = lambda: try_to_allocate(order2, sku, exceptions)
 thread1 = threading.Thread(target=try_to_allocate_order1) #(1)
 thread2 = threading.Thread(target=try_to_allocate_order2) #(1)
 thread1.start()
 thread2.start()
 thread1.join()
 thread2.join()
 [[version]] = session.execute(
 "SELECT version_number FROM products WHERE sku=:sku",
 dict(sku=sku),
 )
 assert version == 2 #(2)
 [exception] = exceptions
 assert "could not serialize access due to concurrent update" in str(exception) #(3)
 orders = session.execute(
 "SELECT orderid FROM allocations"
 " JOIN batches ON allocations.batch_id = batches.id"
 " JOIN order_lines ON allocations.orderline_id = order_lines.id"
 " WHERE order_lines.sku=:sku",
 dict(sku=sku),
 )
 assert orders.rowcount == 1 #(4)
 with unit_of_work.SqlAlchemyUnitOfWork() as uow:
 uow.session.execute("select 1")

원하는 것을 안정적으로 생성하는 두 개의 스레드를 시작하고 read1, read2, write1, write2 같이 발행합니다.

버전 번호가 한 번만 증가했음을 주장합니다.

(iii)

원하는 경우 특정 이상 징후를 확인할 수도 있습니다.

그런 다음 할당이 하나만 완료되었는지 다시 확인합니다.

데이터베이스 트랜잭션 격리 수준을 사용하여 동시성 규칙 적용

테스트를 통과하려면 세션에서 트랜잭션 격리 수준을 설정하면 됩니다:

DEFAULT_SESSION_FACTORY = sessionmaker(bind=create_engine(
 config.get_postgres_uri(),
 isolation_level="REPEATABLE READ",
))
STH에 주목하세요.

트랜잭션 격리 수준은 까다로운 문제이므로 시간을 내서 Postgres 설명서를 이해하는 것이 좋습니다. ³

비관적 동시성 제어 예: SELECT FOR UPDATE

이 문제를 해결하는 방법에는 여러 가지가 있지만 한 가지를 시연하겠습니다 디자인을 효과적으로 통합 두 개의 동시 트랜잭션이 동시에 같은 행을 읽는 것이 허용되지 않는 다른 동작을 생성합니다.

SELECT FOR UPDATE는 한 행을 잠금으로 선택하는 방법입니다. 두 트랜잭션이 동시에 SELECT FOR UPDATE를 시도하면 한 트랜잭션이 승리하고 다른 트랜잭션은 잠금이 해제될 때까지 기다립니다. 따라서 이것은 비관적 동시성 제어의 예입니다.

다음은 SQLAlchemy DSL을 사용하여 쿼리 시점에 FOR UPDATE를 지정하는 방법입니다:

 def get(self, sku):
 return self.session.query(model.Product) \
 .filter_by(sku=sku) \
 .with_for_update() \
 .first()

이렇게 하면 동시성 모델이 변경됩니다.

read1, read2, write1, write2(fail)

최대

read1, write1, read2, write2(succeed)

어떤 사람들은 이를 "읽기-수정-쓰기" 오류 모드라고 합니다. 자세한 개요는 "PostgreSQL 안티 모드: 읽기-수정-쓰기 주기"를 참조하세요.

'반복 읽기'와 '업데이트 시 선택' 또는 낙관적 잠금과 비관적 잠금 사이의 모든 장단점을 논의할 시간은 없습니다. 하지만 표시된 것과 같은 테스트가 있다면 원하는 동작을 지정하고 어떻게 변경되는지 확인할 수 있습니다. 이 테스트를 일부 성능 실험을 수행하기 위한 기초로 사용할 수도 있습니다.

요약

동시성 제어와 관련된 구체적인 선택은 비즈니스 상황과 스토리지 기술 선택에 따라 크게 다르지만, 이 장에서는 모델의 일부 하위 집합에 대한 기본 진입점으로 개체를 명시적으로 모델링하고 해당 모든 개체에 적용되는 불변성과 비즈니스 규칙을 시행하는 책임을 지는 집계 개념으로 돌아가 보려고 합니다.

올바른 집계를 선택하는 것이 중요하며, 시간이 지나면서 다시 생각해 볼 수 있는 결정입니다. 자세한 내용은 여러 DDD 책에서 확인할 수 있습니다. 효과적인 집계 설계에 관한 본 버논의 세 가지 온라인 논문도 추천합니다.

표 7-1에는 집계 모델을 구현할 때의 장단점에 대한 몇 가지 생각이 나와 있습니다.

표 7-1. 집계: 장단점

파이썬에는 "공식적인" 공개 메서드와 비공개 메서드가 없지만, "내부적으로" 사용되는 것과 "외부적으로" 사용되는 것을 표시하는 것이 유용하기 때문에 밑줄 규칙이 있습니다. " 사용. 옵션 집계는 한 단계 더 나아가 도메인 모델 클래스에서 무엇이 공개이고 무엇이 공개되지 않는지 결정할 수 있게 해줍니다.초보 개발자를 위한 또 하나의 새로운 개념. 엔티티와 값 개체를 설명하는 것만으로도 정신적인 부담이 큰데, 이제 세 번째 도메인 모델 개체가 있다고요?
명시적 일관성 경계를 중심으로 작업을 모델링하면 ORM의 성능 문제를 방지하는 데 도움이 됩니다.한 번에 하나의 집계를 수정한다는 규칙을 엄격하게 준수하는 것은 큰 정신적 전환입니다.
집계가 하위 모델의 상태 변경에 대해 전적으로 책임을 지도록 하면 시스템을 더 쉽게 이해하고 불변성을 더 쉽게 제어할 수 있습니다.집계 간의 최종 일관성을 처리하는 것은 복잡할 수 있습니다.

1부 요약

그림 7-5를 기억하시나요? 이것은 CC BY-NC-SA 4.0 시작 부분에 표시된 차트로, 방향성을 미리 보여주는 차트입니다.

그림 7-5: 파트 1의 마지막에 적용된 구성 요소 다이어그램

여기까지 1부의 마지막 부분입니다. 무엇을 성취했나요? 일련의 높은 수준의 단위 테스트를 통해 검증되는 도메인 모델을 구축하는 방법을 살펴보았습니다. 테스트는 살아있는 문서로, 비즈니스 이해관계자와 합의한 규칙에 따라 시스템의 동작을 읽기 쉬운 코드로 설명합니다. 비즈니스 요구 사항이 변경되면 확실한 테스트를 통해 새로운 기능을 입증할 수 있으며, 새로운 개발자가 프로젝트에 합류할 때 테스트를 읽고 작동 방식을 이해할 수 있습니다.

데이터베이스 및 API 핸들러와 같은 시스템의 기본 부분은 애플리케이션 외부에서 연결할 수 있도록 분리되어 있습니다. 이렇게 하면 코드 기반이 잘 정리되어 큰 진흙덩어리가 만들어지는 것을 막을 수 있습니다.

의존성 반전 원칙을 적용하고 저장소 및 작업 단위와 같은 포트 및 어댑터에서 영감을 얻은 패턴을 사용하여 높은 수준과 낮은 수준 모두에서 TDD를 수행하고 건강한 테스트 피라미드를 유지할 수 있게 되었습니다. 엔드투엔드 테스트가 가능한 시스템, 통합 및 엔드투엔드 테스트의 필요성을 최소한으로 유지합니다.

마지막으로 일관성 경계라는 개념이 다루어졌습니다. 변경이 이루어질 때 전체 시스템을 잠그는 것은 바람직하지 않으므로 서로 일관성이 있는 부분을 선택해야 합니다.

소규모 시스템의 경우 도메인 중심 디자인 개념을 실험하는 데 이 정도면 충분합니다. 이제 비즈니스 전문가를 위한 공유 언어를 나타내는 데이터베이스에 구애받지 않는 도메인 모델을 구축할 수 있는 도구가 생겼습니다. 만세!

다음 사항에 유의하십시오.

반복될 위험이 있지만, 각 패턴에는 비용이 든다는 점을 반복해서 강조합니다. 각 간접 지시 계층은 코드의 복잡성과 중복을 야기하며, 이러한 패턴을 처음 접하는 프로그래머에게는 혼란을 줄 수 있습니다. 애플리케이션이 본질적으로 데이터베이스를 둘러싼 단순한 CRUD 래퍼인 경우, 앞으로는 다른 어떤 것도 될 가능성이 낮습니다. 장고를 계속 사용하면 많은 문제를 해결할 수 있습니다.

2부에서는 더 큰 주제인 '집계가 한 번에 하나씩만 업데이트할 수 있는 경계인 경우 일관성 경계를 넘나드는 프로세스를 어떻게 모델링할 수 있는가'에 대해 자세히 살펴보고 논의할 것입니다.

¹ 오브젝트가 더럽혀진 시점을 ORM/SQLAlchemy 마법을 통해 알 수 있지만, 일반적인 상황(예: Csv 리포지토리)에서는 어떻게 작동할 수 있을까요?

²time.sleep()은 사용 사례에서 잘 작동하지만, 동시성 오류를 재현하는 가장 안정적이거나 효율적인 방법은 아닙니다. 더 나은 동작 보장을 위해 스레드 간에 공유되는 신호 또는 유사한 동기화 프리미티브를 사용하는 것을 고려하세요.

Postgres를 사용하지 않는다면 다른 설명서를 읽어야 합니다. 짜증스럽게도 데이터베이스마다 정의가 상당히 다릅니다. 예를 들어, Oracle의 SERIALIZABLE은 Postgres의 REPEATABLE READ와 동일합니다.

부: 이벤트 중심 아키텍처

원본:

번역자: 플라잉 드래곤

프로토콜: CC BY-NC-SA 4.0

이 스레드에 '객체'라는 용어를 오래 전에 만들어서 많은 사람들이 부차적인 아이디어에 집중하게 만든 것 같아 죄송합니다.

중요한 아이디어는 "메시징" ...... 훌륭하고 확장 가능한 시스템을 설계하는 데 있어 핵심은 내부 속성과 동작보다는 모듈이 서로 통신하는 방식을 설계하는 데 있습니다.

--앨런 케이

비즈니스 프로세스의 작은 부분을 관리하기 위해 도메인 모델을 작성할 수 있다면 좋겠지만, 모델을 작성해야 할 때는 어떻게 될까요? 실제 세계에서 애플리케이션은 조직 내에 위치하며 시스템의 다른 부분과 정보를 교환해야 합니다. 그림 II-1에 표시된 컨텍스트 다이어그램을 기억하실 수 있습니다.

이러한 요구사항에 직면한 많은 팀이 HTTP API를 통해 통합된 마이크로서비스를 선택합니다. 하지만 주의하지 않으면 분산된 진흙 덩어리처럼 지저분한 결과를 초래할 수 있습니다.

두 번째 파트에서는 첫 번째 파트의 기술을 분산 시스템으로 확장하는 방법을 보여드립니다. 비동기 메시징을 통해 시스템의 여러 위젯 간의 상호작용을 어떻게 결합할 수 있는지 자세히 살펴볼 것입니다.

서비스 계층과 작업 단위 패턴을 통해 재구성된 애플리케이션을 비동기 메시지 프로세서로 실행할 수 있는 방법과 이벤트 중심 시스템이 집계와 애플리케이션을 서로 분리하는 데 어떻게 도움이 되는지 살펴볼 것입니다.

그림 II-1: 그렇다면 이 모든 시스템은 정확히 어떻게 서로 통신할까요?

다음 모델과 기술을 연구합니다:

현장 이벤트

일관성 경계를 넘어 워크플로를 트리거하세요.

메시지 버스

모든 엔드포인트에서 사용 사례를 호출할 수 있는 통합된 방법을 제공합니다.

CQRS

읽기와 쓰기를 분리하면 이벤트 중심 아키텍처의 불편한 타협을 피하고 성능과 확장성을 개선할 수 있습니다.

장: 이벤트 및 메시지 버스

원본: 8: 이벤트 및 메시지 버스

번역자: View

프로토콜: CC BY-NC-SA 4.0

지금까지는 장고로 쉽게 해결할 수 있는 간단한 문제를 해결하기 위해 많은 시간과 노력을 들였습니다. 추가된 테스트 가능성과 표현력이 그만한 가치가 있는지 의문이 들 수도 있습니다.

그러나 실제로는 코드베이스를 엉망으로 만드는 것은 명백한 기능이 아니라 가장자리 주변의 혼란이라는 것이 밝혀졌습니다. 수많은 개체가 관련된 보고, 권한 및 워크플로우가 바로 그것입니다.

예를 들어 재고가 부족하여 주문을 할당할 수 없는 경우 구매팀에 알려야 하는 것이 일반적인 알림 요건입니다. 그러면 구매팀에서 문제를 해결하고 재고를 추가 구매하여 모든 것이 잘 해결될 것입니다.

첫 번째 버전의 경우 제품 소유자는 이메일을 통해 알림을 보낼 수 있다고 말합니다.

시스템의 대부분을 구성하는 일상적인 것들을 연결할 때 아키텍처가 어떻게 작동하는지 살펴보겠습니다.

가장 쉽고 빠른 일을 먼저 하고 왜 그런 결정으로 인해 큰 진흙탕이 되었는지 논의할 것입니다.

그런 다음 패턴을 사용하여 사용 사례에서 부작용을 분리하는 방법과 이러한 이벤트를 기반으로 동작을 트리거하기 위해 간단한 패턴을 사용하는 방법을 보여드리겠습니다. 이러한 이벤트를 생성하는 몇 가지 옵션과 메시지 버스로 전달하는 방법을 보여주고, 마지막으로 그림 8-1에서 미리 본 것처럼 작업 단위 패턴을 수정하여 두 가지를 우아하게 연결하는 방법을 보여 드리겠습니다.

그림 8-1: 시스템을 통한 이벤트 흐름
STH에 주목하세요.

이 챕터의 코드는 GitHub의 chapter_08_events_and_message_bus 브랜치에 있습니다:

git clone https://.com/cosmicpython/.git
cd code
git checkout chapter_08_events_and_message_bus
# or to code along, checkout the previous chapter:
git checkout chapter_07_aggregate

엉망으로 만들지 않기

그래서. 인벤토리가 부족할 때 이메일 알림을 보내세요. 핵심 도메인과 관련이 없는 새로운 요구 사항이 발생하면 네트워크 컨트롤러에 이러한 요구 사항이 쏟아지기 쉽습니다.

먼저, 네트워크 컨트롤러를 엉망으로 만들지 않도록 하세요.

일회성 해킹이라면 괜찮습니다:

@app.route("/allocate", methods=['POST'])
def allocate_endpoint():
 line = model.OrderLine(
 request.json['orderid'],
 request.json['sku'],
 request.json['qty'],
 )
 try:
 uow = unit_of_work.SqlAlchemyUnitOfWork()
 batchref = services.allocate(line, uow)
 except (model.OutOfStock, services.InvalidSku) as e:
 send_mail(
 'out of stock',
 'stock_admin@made.com',
 f'{line.orderid} - {line.sku}'
 )
 return jsonify({'message': str(e)}), 400
 return jsonify({'batchref': batchref}), 201

... 하지만 이런 식으로 땜질하면 얼마나 빨리 엉망이 될 수 있는지 쉽게 알 수 있습니다. 이메일 전송은 HTTP 계층에서 작동하는 것이 아니며, 이 새로운 기능에 대한 단위 테스트가 있기를 바랍니다.

그리고 모델을 망치지 말자.

이 코드를 가능한 한 얇게 만들고 싶어서 네트워크 컨트롤러에 넣지 않으려는 경우 소스, 즉 모델에 넣는 것을 고려하세요:

 def allocate(self, line: OrderLine) -> str:
 try:
 batch = next(
 b for b in sorted(self.batches) if b.can_allocate(line)
 )
 #...
 except StopIteration:
 email.send_mail('stock@made.com', f'Out of stock for {line.sku}')
 raise OutOfStock(f'Out of stock for sku {line.sku}')

하지만 이것은 더 심각합니다! 원치 않는 모델은 email.send_mail과 같은 인프라 문제에 대한 종속성이 있습니다.

이러한 이메일은 환영받지 못하고 시스템의 깔끔한 흐름을 깨뜨리는 요소입니다. 도메인 모델이 "실제 사용 가능한 용량보다 더 많이 할당할 수 없다"는 규칙에 초점을 맞추는 것이 바람직합니다.

도메인 모델은 재고 부족을 인식하기 위해 작동하지만 알림을 보내는 책임은 다른 곳에 있습니다. 도메인 모델의 규칙을 변경하지 않고도 이 기능을 켜거나 끄거나 SMS 알림으로 전환할 수 있어야 합니다.

아니면 서비스 계층!

"인벤토리를 할당하고 실패 시 이메일을 보내야 한다"는 요구 사항은 워크플로 오케스트레이션의 한 예로, 목표를 달성하기 위해 시스템이 따라야 하는 일련의 단계입니다.

관리를 조율하기 위해 서비스 계층이 작성되었지만 여기에서도 기능이 적절하지 않은 느낌이 듭니다:

def allocate(
 orderid: str, sku: str, qty: int,
 uow: unit_of_work.AbstractUnitOfWork
) -> str:
 line = OrderLine(orderid, sku, qty)
 with uow:
 product = uow.products.get(sku=line.sku)
 if product is None:
 raise InvalidSku(f'Invalid sku {line.sku}')
 try:
 batchref = product.allocate(line)
 uow.commit()
 return batchref
 except model.OutOfStock:
 email.send_mail('stock@made.com', f'Out of stock for {line.sku}')
 raise

예외를 잡아내서 다시 트리거한다고요? 이보다 더 나쁠 수는 없지만 확실히 불행을 초래할 수 있습니다. 이 코드에 적합한 집을 찾기가 왜 그렇게 어려운가요?

단일 책임 원칙

사실, 이것이 위반입니다. ¹의 사용 사례는 할당입니다. 엔드포인트, 서비스 함수 및 도메인 메서드는 allocate_and_send_mail_if_out_of_stock 아닌 allocate라고 호출됩니다.

STH에 주목하세요.

경험 법칙: "then" 또는 "and"와 같은 단어를 사용하지 않고 함수가 수행하는 작업을 설명할 수 없는 경우 SRP를 위반하는 것일 수 있습니다.

SRP의 한 가지 공식은 각 클래스마다 변경 사유가 하나만 있어야 한다는 것입니다. 이메일에서 SMS로 전환하는 경우, 이는 명백히 별도의 책임이므로 업데이트된 allocate() 함수가 없어야 합니다.

이 문제를 해결하기 위해 여러 가지 문제가 서로 얽히지 않도록 안무를 별도의 단계로 세분화합니다. ² 도메인 모델의 역할은 재고 부족을 인지하는 것이지만, 알림을 보내는 책임은 다른 곳에 있습니다. 도메인 모델의 규칙을 변경하지 않고도 언제든지 이 기능을 켜거나 끄거나 SMS 알림으로 전환할 수 있어야 합니다.

또한 서비스 계층에 구현 세부 사항이 없도록 유지하는 것이 바람직합니다. 작업 단위를 사용하여 데이터베이스 종속성을 피하는 것과 같은 방식으로 서비스 계층이 추상화에 종속되도록 종속성 반전 원칙을 알림에 적용하는 것이 바람직합니다.

모든 승객이 버스에 탑승합니다!

표시되는 모드는 및입니다. 여러 가지 방법으로 구현할 수 있으므로 몇 가지를 표시한 다음 가장 선호하는 모드를 선택합니다.

모델 레코드 이벤트

첫째, 이 모델은 더 이상 이메일에 신경 쓰지 않고 일어난 일에 대한 사실, 즉 기록을 담당합니다. 메시지 버스는 이벤트에 응답하고 새로운 작업을 호출하는 데 사용됩니다.

이벤트는 간단한 데이터 클래스입니다.

는 특별합니다. 이벤트는 순수한 데이터 구조이므로 동작이 없습니다. 항상 도메인 언어로 이벤트 이름을 지정하고 도메인 모델의 일부로 취급하세요.

저장할 수는 있지만 자체 파일에 보관하는 것이 가능할 수도 있습니다:

from dataclasses import dataclass
class Event: #(1)
 pass
@dataclass
class OutOfStock(Event): #(2)
 sku: str

이벤트가 몇 개 있으면 공통 속성을 저장할 수 있는 부모 클래스가 있으면 유용합니다. 곧 보게 되겠지만 메시지 버스에서 유형 힌트에도 유용합니다.

데이터클래스는 현장 이벤트에도 유용합니다.

모델 트리거 이벤트

도메인 모델에서 이벤트가 발생했다는 사실을 기록하면 이벤트가 발생했다고 합니다.

외부에서는 다음과 같이 표시되며, 제품 할당이 요청되었지만 할당할 수 없는 경우 이벤트가 발생해야 합니다:

def test_records_out_of_stock_event_if_cannot_allocate():
 batch = Batch("batch1", "SMALL-FORK", 10, eta=today)
 product = Product(sku="SMALL-FORK", batches=[batch])
 product.allocate(OrderLine("order1", "SMALL-FORK", 10))
 allocation = product.allocate(OrderLine("order2", "SMALL-FORK", 1))
 assert product.events[-1] == events.OutOfStock(sku="SMALL-FORK") #(1)
 assert allocation is None

집계하면 이벤트 객체의 형태로 발생한 사실 목록을 포함하는 .events라는 새 속성이 노출됩니다.

모델은 내부에서 다음과 같이 보입니다:

class Product:
 def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):
 self.sku = sku
 self.batches = batches
 self.version_number = version_number
 self.events = [] # type: List[events.Event] #(1)
 def allocate(self, line: OrderLine) -> str:
 try:
 #...
 except StopIteration:
 self.events.append(events.OutOfStock(line.sku)) #(2)
 # raise OutOfStock(f"Out of stock for sku {line.sku}") #(3)
 return None

이것이 새로운 .events 속성의 사용법입니다.

이메일을 직접 보내는 코드를 호출하는 대신 도메인의 언어만 사용하여 이벤트가 발생하는 위치를 기록하세요.

(iii)

또한 품절 상황에 대한 예외 발생도 중지됩니다. 이벤트가 예외 작업을 수행합니다.

다음 사항에 유의하십시오.

사실 지금까지 존재했던 코드의 악취, 즉 문제가 해결되고 있습니다. 일반적으로 도메인 이벤트를 구현하는 경우 동일한 도메인 개념을 설명하기 위해 예외를 발생시키지 마세요. 나중에 작업 단위 패턴에서 이벤트 처리를 다룰 때 보게 되겠지만 이벤트와 예외를 모두 고려해야 하는 것은 혼란스럽습니다.

메시지 버스가 이벤트를 핸들러에 매핑

메시지 버스는 기본적으로 "이 이벤트가 보이면 다음 핸들러 함수를 호출해야 합니다."라고 말합니다. 즉, 단순한 게시-구독 시스템입니다. 핸들러는 이벤트를 수신하고 이를 버스에 게시합니다. 생각보다 어렵게 들리며 일반적으로 사전을 사용하여 구현합니다:

def handle(event: events.Event):
 for handler in HANDLERS[type(event)]:
 handler(event)
def send_out_of_stock_notification(event: events.OutOfStock):
 email.send_mail(
 'stock@made.com',
 f'Out of stock for {event.sku}',
 )
HANDLERS = {
 events.OutOfStock: [send_out_of_stock_notification],
} # type: Dict[Type[events.Event], List[Callable]]
다음 사항에 유의하십시오.

구현된 메시지 버스는 한 번에 하나의 핸들러만 실행되므로 동시성을 제공하지 않는다는 점에 유의하세요. 병렬 스레드를 지원하는 것이 아니라 개념적으로 작업을 분리하고 각 UoW를 가능한 한 작게 만드는 것이 목표입니다. 이렇게 하면 각 사용 사례를 실행하기 위한 '레시피'가 한 곳에 작성되므로 코드 기반을 이해하는 데 도움이 됩니다. 아래 사이드바를 참조하세요.

옵션 1: 서비스 계층이 모델에서 이벤트를 가져와 메시지 버스에 넣습니다.

의 도메인 모델이 이벤트를 트리거하고, 이벤트가 발생하면 메시지 버스가 올바른 핸들러를 호출합니다. 이제 필요한 것은 이 둘을 연결하는 것입니다. 모델에서 이벤트를 캡처하여 메시지 버스로 전달할 수 있는 방법, 즉 단계가 필요합니다.

이를 수행하는 가장 쉬운 방법은 서비스 계층에 코드를 추가하는 것입니다:

from . import messagebus
...
def allocate(
 orderid: str, sku: str, qty: int,
 uow: unit_of_work.AbstractUnitOfWork,
) -> str:
 line = OrderLine(orderid, sku, qty)
 with uow:
 product = uow.products.get(sku=line.sku)
 if product is None:
 raise InvalidSku(f"Invalid sku {line.sku}")
 try: #(1)
 batchref = product.allocate(line)
 uow.commit()
 return batchref
 finally: #(1)
 messagebus.handle(product.events) #(2)

추악한 초기 구현에서 시도/마지막을 유지합니다.

하지만 이제는 이메일 인프라에 직접 의존하는 대신 서비스 계층이 모델에서 메시지 버스로 이벤트를 전달하는 역할을 담당합니다.

이렇게 하면 순진한 구현에서 발생하는 일부 추악한 문제를 피할 수 있으며, 서비스 계층이 집계에서 이벤트를 명시적으로 수집하여 메시지 버스로 전달하는 이런 방식으로 작동하는 여러 시스템이 있습니다.

옵션 2: 서비스 계층이 자체 이벤트를 발생시킵니다.

또 다른 변형은 도메인 모델에서 이벤트를 트리거하는 대신 서비스 계층이 직접 이벤트를 생성하고 트리거하도록 하는 것입니다:

def allocate(
 orderid: str, sku: str, qty: int,
 uow: unit_of_work.AbstractUnitOfWork,
) -> str:
 line = OrderLine(orderid, sku, qty)
 with uow:
 product = uow.products.get(sku=line.sku)
 if product is None:
 raise InvalidSku(f"Invalid sku {line.sku}")
 batchref = product.allocate(line)
 uow.commit() #(1)
 if batchref is None:
 messagebus.handle(events.OutOfStock(line.sku))
 return batchref

이전과 마찬가지로 할당할 수 없는 경우에도 커밋을 수행하는데, 이렇게 하면 코드가 더 간단하고 이해하기 쉬우므로 문제가 발생하지 않는 한 항상 커밋합니다. 변경 사항이 없을 때 커밋하면 안전하며 코드를 깔끔하게 유지할 수 있습니다.

다시 말하지만, 프로덕션 애플리케이션은 이런 방식으로 패턴을 구현합니다. 어떤 방식이 가장 적합한지는 특정 장단점에 따라 다르겠지만, 가장 우아한 솔루션으로 간주되는 것을 보여드리고 싶었는데, 이는 이벤트 수집 및 발생을 담당하는 작업 단위를 만드는 것입니다.

옵션 3: 메시지 버스에 이벤트 게시하기

UoW에는 이미 시도/종료 기능이 있으며, 저장소에 대한 액세스를 제공하기 때문에 현재 사용 중인 모든 집계에 대해 알고 있습니다. 따라서 이벤트를 발견하고 메시지 버스로 전달하기에 좋은 장소입니다:

class AbstractUnitOfWork(abc.ABC):
 ...
 def commit(self):
 self._commit() #(1)
 self.publish_events() #(2)
 def publish_events(self): #(2)
 for product in self.products.seen: #(3)
 while product.events:
 event = product.events.pop(0)
 messagebus.handle(event)
 @abc.abstractmethod
 def _commit(self):
 raise NotImplementedError
...
class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
 ...
 def _commit(self): #(1)
 self.session.commit()

변경될 커밋 메서드를 사용하려면 서브클래스에 비공개 . _commit() 메서드가 필요합니다.

커밋 시 리포지토리에서 보이는 모든 오브젝트를 반복하고 해당 이벤트를 메시지 버스로 전달합니다.

(iii)

이는 다음 목록에서 볼 수 있듯이 새 속성 .seen을 사용하여 로드된 어그리게이션을 추적하는 리포지토리에 의존합니다.

다음 사항에 유의하십시오.

핸들러 중 하나가 실패하면 어떻게 되는지 궁금하신가요? 오류 처리는 10장에서 자세히 설명합니다.

class AbstractRepository(abc.ABC):
 def __init__(self):
 self.seen = set() # type: Set[model.Product] #(1)
 def add(self, product: model.Product): #(2)
 self._add(product)
 self.seen.add(product)
 def get(self, sku) -> model.Product: #(3)
 product = self._get(sku)
 if product:
 self.seen.add(product)
 return product
 @abc.abstractmethod
 def _add(self, product: model.Product): #(2)
 raise NotImplementedError
 @abc.abstractmethod #(3)
 def _get(self, sku) -> model.Product:
 raise NotImplementedError
class SqlAlchemyRepository(AbstractRepository):
 def __init__(self, session):
 super().__init__()
 self.session = session
 def _add(self, product): #(2)
 self.session.add(product)
 def _get(self, sku): #(3)
 return self.session.query(model.Product).filter_by(sku=sku).first()

UoW가 새 이벤트를 게시하려면 리포지토리에 이 세션 중에 사용된 제품 오브젝트를 요청할 수 있어야 합니다. 이를 저장하기 위해 .seen이라는 집합이 사용됩니다. 이는 슈퍼()를 호출해야 하는 구현을 의미합니다. __init__().

부모 add() 메서드는 .seen에 사물을 추가하며 이제 자식 클래스에서 . _add().

(iii)

마찬가지로 .get()은 하위 클래스로 구현된 . _get() 함수에 위임하여 서브클래스로 구현된 객체를 캡처합니다.

다음 사항에 유의하십시오.

*. 밑줄()* 메서드와 서브클래싱을 사용하는 것만이 이러한 패턴을 구현할 수 있는 유일한 방법은 아닙니다. 이 장의 독해 연습을 통해 몇 가지 대안을 실험해 보세요.

이 접근 방식에서는 UoW와 리포지토리가 협업하여 실시간 오브젝트를 자동으로 추적하고 이벤트를 처리한 후 서비스 계층에서 이벤트 처리 문제를 완전히 해결할 수 있습니다:

def allocate(
 orderid: str, sku: str, qty: int,
 uow: unit_of_work.AbstractUnitOfWork
) -> str:
 line = OrderLine(orderid, sku, qty)
 with uow:
 product = uow.products.get(sku=line.sku)
 if product is None:
 raise InvalidSku(f'Invalid sku {line.sku}')
 batchref = product.allocate(line)
 uow.commit()
 return batchref

서비스 수준에서 위조를 변경하고 올바른 위치에서 super()를 호출하고 밑줄 메서드를 구현하는 것도 중요하지만 변경 사항은 미미합니다:

class FakeRepository(repository.AbstractRepository):
 def __init__(self, products):
 super().__init__()
 self._products = set(products)
 def _add(self, product):
 self._products.add(product)
 def _get(self, sku):
 return next((p for p in self._products if p.sku == sku), None)
...
class FakeUnitOfWork(unit_of_work.AbstractUnitOfWork):
 ...
 def _commit(self):
 self.committed = True

이러한 위조품을 유지 관리하는 것이 유지 관리 부담이 되지 않을까 걱정할 수 있습니다. 의심할 여지가 없는 일이지만 경험상 그렇게 많은 작업은 아닙니다. 일단 프로젝트가 실행되고 나면 리포지토리와 UoW 추상화에 대한 인터페이스는 크게 달라지지 않습니다. ABC를 사용하면 동기화되지 않을 때 알림을 받을 수 있습니다.

요약

도메인 이벤트는 시스템에서 워크플로우를 처리하는 방법을 제공합니다. "재고를 할당하려고 시도했지만 사용할 수 없을 때 구매팀에 이메일을 보내야 합니다."와 같이 인과적 또는 시간적 방식으로 요구 사항을 표현하는 도메인 전문가의 의견을 경청하는 경우가 종종 있었습니다.

"X가 발생하면 Y가 발생한다"는 마법의 단어는 종종 시스템에서 구체화될 수 있는 이벤트에 대해 알려줍니다. 이벤트를 모델에서 사물의 첫 번째 클래스로 사용하면 코드를 더 쉽게 테스트하고 관찰할 수 있으며 우려 사항을 격리하는 데 도움이 됩니다.

와 표 8-1은 인식된 장단점을 보여줍니다.

도메인 이벤트: 장단점

요청에 대한 응답으로 여러 작업을 수행해야 하는 경우 메시지 버스는 책임을 분리하는 좋은 방법을 제공합니다.메시지 버스는 이벤트를 발생시키는 작업 단위가 마법과도 같은 구현이라는 점을 이해해야 합니다. 커밋이 호출될 때 사람들에게 이메일을 보낼 것이라는 것은 분명하지 않습니다.
이벤트 핸들러는 '핵심' 애플리케이션 로직과 잘 분리되어 있어 나중에 구현을 쉽게 변경할 수 있습니다.더 중요한 것은 숨겨진 이벤트 처리 코드 실행은 모든 이벤트 처리기가 완료될 때까지 서비스 계층 기능이 완료되지 않는다는 것을 의미합니다. 이로 인해 웹 엔드포인트에서 예기치 않은 성능 문제가 발생할 수 있습니다.
도메인 이벤트는 실제 세계를 모델링하는 좋은 방법이며 이해관계자와 함께 모델링할 때 비즈니스 언어의 일부로 사용할 수 있습니다.일반적으로 이벤트 기반 워크플로는 여러 처리기 체인에 분산되어 있기 때문에 요청이 어떻게 처리되는지 파악할 수 있는 단일 위치가 시스템에 없기 때문에 혼란스러울 수 있습니다.
또한 이벤트 핸들러 간의 순환 종속성 및 무한 루프 가능성에도 직면하게 됩니다.

이벤트는 단순히 이메일을 보내기 위한 것이 아닙니다. 7장에서는 집계를 정의하거나 일관성 경계를 보장해야 한다는 점을 설득하는 데 많은 시간을 할애했습니다. 사람들은 종종 "하나의 요청이 진행되는 동안 여러 개의 집계를 변경해야 하는 경우 어떻게 해야 하나요?"라고 질문합니다. 이제 이 질문에 답하는 데 필요한 도구를 사용할 수 있습니다.

트랜잭션에서 분리할 수 있는 두 가지가 있는 경우 이벤트를 사용하여 분리할 수 있습니다. 주문이 취소되면 해당 주문에 할당된 제품을 찾아서 해당 할당을 제거해야 합니다.

9장에서는 보다 복잡한 워크플로우를 구축하는 데 사용될 새로운 메시지 버스로서 이 아이디어를 더 자세히 살펴봅니다.

¹이 원칙은 "풋건.” 있습니다.

²의 기술 리뷰어인 Ed Jung은 명령어에서 이벤트 기반 프로세스 제어로의 전환으로 인해 전자가 다음과 같이 바뀌었다고 말합니다.

장: 메시지 버스에 대해 자세히 알아보기

원본: 9: 메시지 버스를 타고 마을로 가다

번역자: 이벤트 대 명령어

프로토콜: CC BY-NC-SA 4.0

이 장에서는 애플리케이션의 내부 구조에 이벤트를 더 중요하게 만들겠습니다. 그림 9-1의 현재 상태부터 시작하겠습니다. 여기서 이벤트는 선택적 부수 효과입니다.

그림 9-1:: 메시지 버스는 선택적 애드온입니다.

... 모든 것이 메시지 버스를 통과하고 애플리케이션이 메시지 프로세서로 근본적으로 변환된 그림 9-2의 상황으로 바뀝니다.

그림 9-2: 메시지 버스는 이제 서비스 계층의 주요 진입점입니다.
STH에 주목하세요.

이 챕터의 코드는 GitHub의 chapter_09_all_messagebus 브랜치에서 확인할 수 있습니다:

git clone https://.com/cosmicpython/.git
cd code
git checkout chapter_09_all_messagebus
# or to code along, checkout the previous chapter:
git checkout chapter_08_events_and_message_bus

새로운 요구 사항이 새로운 아키텍처로 이어집니다.

리치 히키는 오랜 기간 실행되고 실제 프로세스를 관리하는 소프트웨어를 언급하며 이야기했습니다. 창고 관리 시스템, 물류 스케줄러, 급여 시스템 등이 그 예입니다.

이 소프트웨어는 실제 물리적 물체와 신뢰할 수 없는 사람에게서 자주 발생하는 사고로 인해 작성하기가 어렵습니다. 예시:

  • 인벤토리 조사 중 지붕 누수로 인해 스프링 매트리스 3개가 손상된 것으로 확인되었습니다.

  • 한 발송물에 필요한 서류가 부족하여 세관에서 몇 주 동안 보류되었습니다. 이후 세 개의 RELIABLE-FORK가 보안 테스트에서 불합격하여 폐기되었습니다.

  • 전 세계적으로 리본이 부족하면 다음 SPARKLY-BOOKCASE 배치를 만들 수 없습니다.

이러한 경우 배치가 이미 시스템에 있는 경우 배치 수량을 변경해야 한다는 점을 이해하세요. 누군가 적하목록에 숫자를 잘못 입력했거나 트럭에서 소파가 떨어졌을 수도 있습니다. 비즈니스와 대화를 나눈 후 그림 9-3과 같은 상황을 모델링했습니다.

그림 9-3: 배치 크기 변경은 할당 취소 및 재할당을 의미합니다.
[ditaa, apwp_0903]
+----------+ /----\ +------------+ +--------------------+
| Batch |--> |RULE| --> | Deallocate | ----> | AllocationRequired |
| Quantity | \----/ +------------+-+ +--------------------+-+
| Changed | | Deallocate | ----> | AllocationRequired |
+----------+ +------------+-+ +--------------------+-+
 | Deallocate | ----> | AllocationRequired |
 +------------+ +--------------------+

호출될 BatchQuantityChanged 이벤트는 배치 수량을 변경해야 하지만, 새 수량이 총 할당량 아래로 떨어지면 해당 배치에서 해당 주문을 일괄 처리해야 하는 경우도 적용됩니다. 그러면 각 주문에는 새 할당이 필요하며, 이는 AllocationRequired 이벤트로 캡처할 수 있습니다.

아마도 여러분이 예상한 내부 메시지 버스와 이벤트가 이 요구 사항을 충족하는 데 도움이 될 수 있습니다. 배치 크기와 초과 주문 라인을 조정하는 방법을 알고 있는 change_batch_quantity 서비스를 정의한 다음, 할당을 취소할 때마다 별도의 트랜잭션으로 기존 할당 서비스에 전달할 수 있는 AllocationRequired 이벤트를 발행할 수 있습니다. 다시 한 번 강조하지만, 메시지 버스는 단일 책임 원칙을 적용하는 데 도움이 되며 트랜잭션과 데이터 무결성에 대한 선택이 가능합니다.

아키텍처의 변화를 상상해 보세요. 모든 것이 이벤트 핸들러가 될 것입니다.

그러나 계속 진행하면서 어디로 가야할지 생각해보세요. 시스템에는 두 가지 종류의 프로세스가 있습니다.

  • 서비스 계층 함수가 처리하는 API 호출

  • 내부 인시던트 및 처리 절차

모든 것이 이벤트 핸들러라면 더 쉬울까요? 재검토된 API 호출이 캡처 이벤트 역할을 한다면 서비스 계층 함수도 이벤트 핸들러가 될 수 있으며, 더 이상 내부 이벤트 핸들러와 외부 이벤트 핸들러를 구분할 필요가 없어질 것입니다:

  • services.add_batch() 아마도 BatchCreated 이벤트의 핸들러일 것입니다. ²

새로운 요구 사항도 동일한 패턴을 따릅니다:

  • 라는 이벤트는 BatchQuantityChanged 핸들러를 호출할 수 있습니다.

  • 발생할 수 있는 새 할당 필수 이벤트도 services.allocate()로 전달할 수 있으므로 API에서 생성된 새 할당과 취소에 의해 내부적으로 트리거되는 재할당 사이에 개념적인 차이가 없습니다.

조금 과한 것 같나요? 차근차근 그 방향으로 작업해 봅시다. "쉽게 변경한 다음 쉽게 변경하기"라고도 하는 준비된 리팩토링 워크플로우를 따를 것입니다:

  1. 서비스 계층은 이벤트 핸들러로 재구성됩니다. 이벤트는 시스템 입력을 설명하는 방식이라는 개념에 익숙해질 수 있습니다. 특히 기존의 services.allocate() 함수는 AllocationRequired라는 이벤트의 핸들러가 될 것입니다.

  2. change_batch_quantity() 시스템에 넣고 출력 할당된 이벤트를 찾는 엔드투엔드 테스트를 구축했습니다.

  3. 구현은 개념적으로 매우 간단합니다. collect_new_events() 대한 새 핸들러를 구현하면 AllocationRequired 이벤트가 발생하고, 이 이벤트는 API가 사용하는 할당에 대해 정확히 동일한 핸들러가 처리하게 됩니다.

이 과정에서 메시지 버스와 UoW에 약간의 조정이 이루어지며, 새 이벤트를 메시지 버스에 배치하는 책임이 메시지 버스 자체로 이동합니다.

서비스 함수를 메시지 핸들러로 리팩토링하기

먼저 현재 API 입력을 캡처하는 두 가지 이벤트, 즉 AllocationRequired와 BatchCreated가 정의됩니다:

@dataclass
class BatchCreated(Event):
 ref: str
 sku: str
 qty: int
 eta: Optional[date] = None
...
@dataclass
class AllocationRequired(Event):
 orderid: str
 sku: str
 qty: int

그런 다음 이름을 바꾸고, send_out_of_stock_notification 대한 기존 메시지 핸들러를 추가하고, 가장 중요한 것은 모든 핸들러가 동일한 입력(예: 이벤트 및 UoW)을 갖도록 모든 핸들러를 변경했습니다:

def add_batch(
 event: events.BatchCreated, uow: unit_of_work.AbstractUnitOfWork
):
 with uow:
 product = uow.products.get(sku=event.sku)
 ...
def allocate(
 event: events.AllocationRequired, uow: unit_of_work.AbstractUnitOfWork
) -> str:
 line = OrderLine(event.orderid, event.sku, event.qty)
 ...
def send_out_of_stock_notification(
 event: events.OutOfStock, uow: unit_of_work.AbstractUnitOfWork,
):
 email.send(
 'stock@made.com',
 f'Out of stock for {event.sku}',
 )

이러한 변화는 차이로 더 명확하게 나타날 수 있습니다:

 def add_batch(
- ref: str, sku: str, qty: int, eta: Optional[date],
- uow: unit_of_work.AbstractUnitOfWork
+ event: events.BatchCreated, uow: unit_of_work.AbstractUnitOfWork
 ):
 with uow:
- product = uow.products.get(sku=sku)
+ product = uow.products.get(sku=event.sku)
 ...
 def allocate(
- orderid: str, sku: str, qty: int,
- uow: unit_of_work.AbstractUnitOfWork
+ event: events.AllocationRequired, uow: unit_of_work.AbstractUnitOfWork
 ) -> str:
- line = OrderLine(orderid, sku, qty)
+ line = OrderLine(event.orderid, event.sku, event.qty)
 ...
+
+def send_out_of_stock_notification(
+ event: events.OutOfStock, uow: unit_of_work.AbstractUnitOfWork,
+):
+ email.send(
 ...

그 과정에서 서비스 계층 API가 보다 체계적이고 일관성 있게 만들어졌습니다. 과거에는 여러 개의 프리미티브가 사용되던 것이 이제는 잘 정의된 객체를 사용합니다.

메시지 버스가 이제 UoW에서 이벤트를 수집합니다.

또한 메시지 버스가 애플리케이션의 중심이 되면서 새로운 이벤트 수집 및 처리에 대한 명시적인 책임을 지는 것이 현명합니다. 지금까지는 UoW와 메시지 버스 사이에 순환적 종속성이 있었기 때문에 단방향성이 될 수 있었습니다:

def handle(
 event: events.Event,
 uow: unit_of_work.AbstractUnitOfWork, #(1)
):
 queue = [event] #(2)
 while queue:
 event = queue.pop(0) #(3)
 for handler in HANDLERS[type(event)]: #(3)
 handler(event, uow=uow) #(4)
 queue.extend(uow.collect_new_events()) #(5)

이제 메시지 버스는 시작 시마다 UoW를 전달합니다.

첫 번째 이벤트가 처리되기 시작하면 대기열이 시작됩니다.

(iii)

대기열 맨 앞쪽에서 이벤트를 팝업하고 해당 처리자에게 전화를 걸 수 있습니다.

메시지 버스는 각 핸들러에 UoW를 전달합니다.

각 처리기가 완료되면 생성된 모든 새 이벤트를 수집하여 대기열에 추가합니다.

에서 collect_new_events() ()는 덜 활성화된 메서드가 됩니다:

-from . import messagebus #(1)
 class AbstractUnitOfWork(abc.ABC):
@@ -22,13 +21,11 @@ class AbstractUnitOfWork(abc.ABC):
 def commit(self):
 self._commit()
- self.publish_events() #(2)
- def publish_events(self):
+ def collect_new_events(self):
 for product in self.products.seen:
 while product.events:
- event = product.events.pop(0)
- messagebus.handle(event)
+ yield product.events.pop(0) #(3)

이제 unit_of_work 모듈은 더 이상 메시지버스에 종속되지 않습니다.

더 이상 커밋 시 자동으로 publish_events가 발생하지 않습니다. 이제 메시지 버스가 이벤트 대기열을 추적합니다.

(iii)

그리고 UoW는 더 이상 이벤트를 메시지 버스에 적극적으로 올리지 않고 단지 사용할 수 있게만 합니다.

테스트는 모두 이벤트 기반으로 작성됩니다.

테스트는 이제 서비스 계층 함수를 직접 호출하지 않고 이벤트를 생성하여 메시지 버스에 배치하는 방식으로 작동합니다:

 class TestAddBatch:
 def test_for_new_product(self):
 uow = FakeUnitOfWork()
- services.add_batch("b1", "CRUNCHY-ARMCHAIR", 100, None, uow)
+ messagebus.handle(
+ events.BatchCreated("b1", "CRUNCHY-ARMCHAIR", 100, None), uow
+ )
 assert uow.products.get("CRUNCHY-ARMCHAIR") is not None
 assert uow.committed
...
 class TestAllocate:
 def test_returns_allocation(self):
 uow = FakeUnitOfWork()
- services.add_batch("batch1", "COMPLICATED-LAMP", 100, None, uow)
- result = services.allocate("o1", "COMPLICATED-LAMP", 10, uow)
+ messagebus.handle(
+ events.BatchCreated("batch1", "COMPLICATED-LAMP", 100, None), uow
+ )
+ result = messagebus.handle(
+ events.AllocationRequired("o1", "COMPLICATED-LAMP", 10), uow
+ )
 assert result == "batch1"

일시적인 어글리 해킹: 메시지 버스는 반드시 결과를 반환해야 합니다.

 def handle(event: events.Event, uow: unit_of_work.AbstractUnitOfWork):
+ results = []
 queue = [event]
 while queue:
 event = queue.pop(0)
 for handler in HANDLERS[type(event)]:
- handler(event, uow=uow)
+ results.append(handler(event, uow=uow))
 queue.extend(uow.collect_new_events())
+ return results

이는 시스템에서 읽기 및 쓰기 책임이 혼합되어 있기 때문입니다. 이 결함을 해결하기 위해 12장에서 다시 설명하겠습니다.

이벤트와 함께 작동하도록 수정된 API

 @app.route("/allocate", methods=["POST"])
 def allocate_endpoint():
 try:
- batchref = services.allocate(
- request.json["orderid"], #(1)
- request.json["sku"],
- request.json["qty"],
- unit_of_work.SqlAlchemyUnitOfWork(),
+ event = events.AllocationRequired( #(2)
+ request.json["orderid"], request.json["sku"], request.json["qty"]
 )
+ results = messagebus.handle(event, unit_of_work.SqlAlchemyUnitOfWork()) #(3)
+ batchref = results.pop(0)
 except InvalidSku as e:

요청 JSON에서 추출한 여러 프리미티브를 사용하여 서비스 계층을 호출하는 대신...

이벤트를 즉시 실행합니다.

(iii)

그런 다음 메시지 버스로 전달됩니다.

모든 기능을 갖춘 앱으로 돌아갔어야 했지만 이제는 완전히 이벤트 중심입니다:

  • 한때 서비스 계층 기능이었던 것이 이제는 이벤트 핸들러가 되었습니다.

  • 따라서 도메인 모델에 의해 트리거되는 내부 이벤트 처리와 동일한 기능을 수행합니다.

  • 이벤트를 시스템 입력을 캡처하고 내부 작업 패키지를 인계하기 위한 데이터 구조로 사용하세요.

  • 이제 전체 애플리케이션을 메시지 프로세서 또는 원하는 경우 이벤트 프로세서로 설명하는 것이 가장 좋습니다. 그 차이점은 다음 장에서 설명하겠습니다.

구현을 위한 새로운 요구 사항

리팩토링 단계가 완료되었습니다. 정말 "변경이 쉬워졌는지" 확인해 봅시다. 그림 9-4와 같이 새로운 요구 사항을 구현해 보겠습니다. 일부 새로운 collect_new_events() 입력으로 수신되어 핸들러로 전달되고, 핸들러는 일부 AllocationRequired 이벤트를 발행한 다음 기존 재할당 핸들러로 반환할 수 있습니다.

그림 9-4: 재할당 프로세스의 시퀀스 다이어그램
[plantuml, apwp_0904, config=plantuml.cfg]
@startuml
API -> MessageBus : BatchQuantityChanged event
group BatchQuantityChanged Handler + Unit of Work 1
 MessageBus -> Domain_Model : change batch quantity
 Domain_Model -> MessageBus : emit AllocationRequired event(s)
end
group AllocationRequired Handler + Unit of Work 2 (or more)
 MessageBus -> Domain_Model : allocate
end
@enduml
경고

이러한 작업을 두 개의 작업 단위로 분리하면 이제 두 개의 데이터베이스 트랜잭션이 있으므로 첫 번째 트랜잭션은 완료되지만 두 번째 트랜잭션은 완료되지 않는 무결성 문제에 직면할 수 있습니다. 이러한 상황이 허용되는지 여부와 이러한 상황이 발생했을 때 이를 인지하고 조치를 취해야 하는지 여부를 고려해야 합니다. 자세한 내용은 풋건을 참조하세요.

새 이벤트

@dataclass
class BatchQuantityChanged(Event):
 ref: str
 qty: int

테스트를 통해 새로운 핸들러 구동

4장에서 배운 교훈에 따라 단위 테스트를 '고속'으로 실행하고 최고 수준의 이벤트 기반 추상화로 작성할 수 있습니다. 다음과 같이 보일 수 있습니다:

class TestChangeBatchQuantity:
 def test_changes_available_quantity(self):
 uow = FakeUnitOfWork()
 messagebus.handle(
 events.BatchCreated("batch1", "ADORABLE-SETTEE", 100, None), uow
 )
 [batch] = uow.products.get(sku="ADORABLE-SETTEE").batches
 assert batch.available_quantity == 100 #(1)
 messagebus.handle(events.BatchQuantityChanged("batch1", 50), uow)
 assert batch.available_quantity == 50 #(1)
 def test_reallocates_if_necessary(self):
 uow = FakeUnitOfWork()
 event_history = [
 events.BatchCreated("batch1", "INDIFFERENT-TABLE", 50, None),
 events.BatchCreated("batch2", "INDIFFERENT-TABLE", 50, date.today()),
 events.AllocationRequired("order1", "INDIFFERENT-TABLE", 20),
 events.AllocationRequired("order2", "INDIFFERENT-TABLE", 20),
 ]
 for e in event_history:
 messagebus.handle(e, uow)
 [batch1, batch2] = uow.products.get(sku="INDIFFERENT-TABLE").batches
 assert batch1.available_quantity == 10
 assert batch2.available_quantity == 50
 messagebus.handle(events.BatchQuantityChanged("batch1", 25), uow)
 # order1 or order2 will be deallocated, so we'll have 25 - 20
 assert batch1.available_quantity == 5 #(2)
 # and 20 will be reallocated to the next batch
 assert batch2.available_quantity == 30 #(2)

간단한 케이스는 수량만 수정하면 되므로 매우 쉽게 달성할 수 있습니다.

그러나 수량을 할당된 수량보다 적게 변경하려는 경우 새 배치에 다시 할당될 것으로 예상하여 최소 하나의 주문 할당을 취소해야 합니다.

구현

새 핸들러는 매우 간단합니다:

def change_batch_quantity(
 event: events.BatchQuantityChanged, uow: unit_of_work.AbstractUnitOfWork
):
 with uow:
 product = uow.products.get_by_batchref(batchref=event.ref)
 product.change_batch_quantity(ref=event.ref, qty=event.qty)
 uow.commit()

리포지토리에 새로운 쿼리 유형이 필요하다는 것을 깨닫습니다.

class AbstractRepository(abc.ABC):
 ...
 def get(self, sku) -> model.Product:
 ...
 def get_by_batchref(self, batchref) -> model.Product:
 product = self._get_by_batchref(batchref)
 if product:
 self.seen.add(product)
 return product
 @abc.abstractmethod
 def _add(self, product: model.Product):
 raise NotImplementedError
 @abc.abstractmethod
 def _get(self, sku) -> model.Product:
 raise NotImplementedError
 @abc.abstractmethod
 def _get_by_batchref(self, batchref) -> model.Product:
 raise NotImplementedError
 ...
class SqlAlchemyRepository(AbstractRepository):
 ...
 def _get(self, sku):
 return self.session.query(model.Product).filter_by(sku=sku).first()
 def _get_by_batchref(self, batchref):
 return self.session.query(model.Product).join(model.Batch).filter(
 orm.batches.c.reference == batchref,
 ).first()

가짜 저장소도 있습니다:

class FakeRepository(repository.AbstractRepository):
 ...
 def _get(self, sku):
 return next((p for p in self._products if p.sku == sku), None)
 def _get_by_batchref(self, batchref):
 return next((
 p for p in self._products for b in p.batches
 if b.reference == batchref
 ), None)
다음 사항에 유의하십시오.

이 사용 사례를 더 쉽게 구현할 수 있도록 리포지토리에 쿼리가 추가되고 있습니다. 쿼리가 단일 집계를 반환하는 한, 규칙을 위반하지 않습니다. 리포지토리에 복잡한 쿼리를 작성하는 경우, 다른 디자인을 고려하는 것이 좋습니다. 특히 get_most_popular_products 또는 find_products_by_order_id 같은 메서드는 반드시 경고를 트리거합니다. 11장과 결론에 복잡한 쿼리 관리에 대한 몇 가지 팁이 나와 있습니다.

도메인 모델링에 대한 새로운 접근 방식

모델에 내부적으로 할당 수량을 변경 및 취소하고 새 이벤트를 게시하는 새로운 메서드를 추가했습니다. 기존 할당 함수도 이벤트를 게시하도록 수정되었습니다:

class Product:
 ...
 def change_batch_quantity(self, ref: str, qty: int):
 batch = next(b for b in self.batches if b.reference == ref)
 batch._purchased_quantity = qty
 while batch.available_quantity < 0:
 line = batch.deallocate_one()
 self.events.append(
 events.AllocationRequired(line.orderid, line.sku, line.qty)
 )
...
class Batch:
 ...
 def deallocate_one(self) -> OrderLine:
 return self._allocations.pop()

새 핸들러를 연결했습니다:

HANDLERS = {
 events.BatchCreated: [handlers.add_batch],
 events.BatchQuantityChanged: [handlers.change_batch_quantity],
 events.AllocationRequired: [handlers.allocate],
 events.OutOfStock: [handlers.send_out_of_stock_notification],
} # type: Dict[Type[events.Event], List[Callable]]

그리고 새로운 요구사항이 완전히 실현되었습니다.

선택 사항: 스퓨리어스 메시지 버스를 사용하여 격리된 단위 테스트 이벤트 핸들러

재할당 워크플로우의 주요 테스트는 다음과 같습니다. 이 테스트는 실제 메시지 버스를 사용하여 collect_new_events() 핸들러가 할당 취소를 트리거하고 새 AllocationRequired 이벤트를 발행한 다음 자체 핸들러가 이를 처리하는 전체 프로세스를 테스트합니다. 하나의 테스트는 일련의 여러 이벤트와 핸들러를 다룹니다.

이벤트 체인의 복잡성에 따라 일부 핸들러를 따로 테스트하고 싶을 수도 있습니다. "더미" 메시지 버스를 사용하여 이를 수행할 수 있습니다.

의 경우, 실제로 FakeUnitOfWork의 publish_events() 메서드를 수정하여 개입하고 실제 메시지 버스에서 분리하는 대신 보이는 이벤트를 기록하도록 합니다:

class FakeUnitOfWorkWithFakeMessageBus(FakeUnitOfWork):
 def __init__(self):
 super().__init__()
 self.events_published = [] # type: List[events.Event]
 def publish_events(self):
 for product in self.products.seen:
 while product.events:
 self.events_published.append(product.events.pop(0))

이제 FakeUnitOfWorkWithFakeMessageBus 함께 messagebus.handle()를 호출하면 해당 이벤트에 대한 핸들러만 실행합니다. 결과적으로 모든 부작용을 검사하는 대신 BatchQuantityChanged 인해 할당된 총량이 다음 AllocationRequired로 떨어지는지 여부만 검사하는 등 보다 격리된 단위 테스트를 작성할 수 있습니다:

def test_reallocates_if_necessary_isolated():
 uow = FakeUnitOfWorkWithFakeMessageBus()
 # test setup as before
 event_history = [
 events.BatchCreated("batch1", "INDIFFERENT-TABLE", 50, None),
 events.BatchCreated("batch2", "INDIFFERENT-TABLE", 50, date.today()),
 events.AllocationRequired("order1", "INDIFFERENT-TABLE", 20),
 events.AllocationRequired("order2", "INDIFFERENT-TABLE", 20),
 ]
 for e in event_history:
 messagebus.handle(e, uow)
 [batch1, batch2] = uow.products.get(sku="INDIFFERENT-TABLE").batches
 assert batch1.available_quantity == 10
 assert batch2.available_quantity == 50
 messagebus.handle(events.BatchQuantityChanged("batch1", 25), uow)
 # assert on new events emitted rather than downstream side-effects
 [reallocation_event] = uow.events_published
 assert isinstance(reallocation_event, events.AllocationRequired)
 assert reallocation_event.orderid in {'order1', 'order2'}
 assert reallocation_event.sku == 'INDIFFERENT-TABLE'

이 방법을 사용할지 여부는 이벤트 체인의 복잡성에 따라 달라집니다. 에지 투 에지 테스트를 먼저 수행하고 이 방법은 필요한 경우에만 사용하는 것이 좋습니다.

요약

달성한 성과를 검토하고 그 이유에 대해 생각해 보세요.

무엇을 달성했나요?

이벤트는 시스템에서 입력과 내부 메시지의 데이터 구조를 정의하는 간단한 데이터 클래스입니다. 이벤트는 일반적으로 비즈니스 언어로 매우 잘 표현되기 때문에 DDD 관점에서 매우 강력합니다.

핸들러는 이벤트에 반응하는 방식입니다. 호출되는 모델이거나 호출되는 외부 서비스일 수 있습니다. 원하는 경우 단일 이벤트에 대해 여러 개의 핸들러를 정의할 수 있습니다. 핸들러는 다른 이벤트를 트리거할 수도 있습니다. 이를 통해 핸들러 작업을 매우 세밀하게 제어하고 진정한 SRP 규정을 준수할 수 있습니다.

무엇을 달성했나요?

이러한 아키텍처 패턴을 사용하는 지속적인 목표는 애플리케이션의 복잡성이 애플리케이션의 크기보다 느리게 증가하도록 하는 것입니다. 메시지 버스에 올인할 때는 항상 아키텍처 복잡성에 대한 비용이 발생하지만, 작업 수행 방식에 대한 추가적인 개념적 또는 아키텍처적 변경 없이 거의 모든 복잡한 요구 사항을 처리할 수 있는 패턴을 구입하는 것이 좋습니다.

여기에는 상당히 복잡한 사용 사례가 추가되었지만, 아키텍처적으로 복잡성에는 비용이 들지 않습니다. 새로운 이벤트, 새로운 핸들러, 새로운 외부 어댑터가 추가되었는데, 모두 아키텍처의 기존 클래스로 이해와 추론 방법을 알고 있으며 초보자도 쉽게 설명할 수 있습니다. 아키텍처의 다양한 부분은 각각 해야 할 일이 있으며, 의도하지 않은 부작용 없이 잘 정의된 방식으로 상호 연결됩니다.

표 9-1. 메시지 버스로서의 전체 애플리케이션: 장단점

핸들러와 서비스가 동일하므로 더 간단합니다.메시지 버스는 웹 관점에서 볼 때 여전히 예측하기 어렵습니다. 언제 끝날지 미리 알 수 없기 때문입니다.
시스템에 입력할 수 있는 좋은 데이터 구조가 있습니다.모델 객체와 이벤트 간에 필드와 구조가 중복되어 유지 관리 비용이 발생합니다. 한 개체에 필드를 추가한다는 것은 일반적으로 적어도 하나 이상의 다른 개체에 필드를 추가하는 것을 의미합니다.

이제 그 BatchQuantityChanged 어디에서 오는 것인지 궁금하실 것입니다. 그 답은 몇 장에 걸쳐 공개될 것입니다. 하지만 먼저 이벤트와 명령어에 대해 이야기해 보겠습니다.

¹ 이벤트 기반 모델링이 인기를 끌면서 이벤트 기반 요구 사항 수집 및 도메인 모델 정교화를 용이하게 하는 관행이 등장했습니다.

이벤트 중심 아키텍처에 대해 조금이라도 알고 있다면 "저 이벤트 중 일부는 명령어처럼 들린다!"라고 생각할 수 있습니다. 조금만 참아주세요! 한 번에 하나의 개념을 소개하려고 합니다. 다음 장에서는 명령과 이벤트의 차이점을 소개하겠습니다.

이 장의 "간단한" 구현은 기본적으로 모듈 자체를 사용하여 싱글톤 패턴을 구현합니다.

Read next

최상의 환경을 위한 윈도우11 우분투2204 설정 (설치/우클릭 메뉴/루트/도커)

서문 윈도우에서 우분투 명령줄을 사용하지 않는다면 매우 불편할 텐데, 다행히 마이크로소프트에서 우분투 터미널을 제공하고 있어서 다운로드해서 설치한 후 설정만 하면 즐겁게 사용할 수 있습니다. 이 글에서는 설치, 우클릭 메뉴 설정, 루트에 대해 알아보겠습니다.

Oct 17, 2025 · 2 min read