blog

로켓엠큐 체계적 학습 - 스프링클라우드 알리바바 연동 로켓엠큐와 주문 소비 실습 - 블로그

@EnableBinding({CustomSink.@Value("${server.SpringApplication.run(OrderlyConsumerApplication. 입력 채널....

Oct 15, 2025 · 5 min. read
シェア

순차적 소비

순차적 소비에는 두 가지 유형이 있습니다:

  • 글로벌 주문: 동시성이 거의 없고 메시지 일관성 요구 사항이 엄격한 시나리오용

    하나의 대기열만 있는 토픽을 만들어서 생산자는 대기열에 메시지를 보내고 소비자는 해당 대기열에서 메시지를 소비하는 방식으로 메시지 순서가 보장됩니다.

  • 부분적으로 주문됨: 높은 성능 요구 사항이 있는 시나리오의 경우 디자인 수준에서 주문해야 하는 메시지가 토픽 아래의 동일한 대기열에 배치되어 순서를 보장합니다.

글로벌 주문

글로벌 주문을 보장하려면 위에서 시작한 대시보드 프로젝트를 통해 대기열이 하나만 있는 토픽을 만드세요.

쓰기 및 읽기 대기열을 모두 1로 설정하고 파마는 6으로 설정합니다.

글로벌 주문 흐름도는 아래와 같습니다:

먼저 주요 소비자 스타트업 클래스는 다음과 같습니다:

java
@SpringBootApplication @EnableBinding({CustomSink.class }) public class OrderlyConsumerApplication { @Value("${server.port}") private int port; public static void main(String[] args) { SpringApplication.run(OrderlyConsumerApplication.class, args); System.out.println(" OrderlyConsumerApplication 성공적으로 시작되었습니다! "); } // 입력은 전역 주문 메시지를 수신하고 입력2는 로컬 주문 메시지를 수신하는 두 개의 채널을 정의합니다. @StreamListener("input") public void receiveInput(String receiveMsg) { System.out.println(port + " port, input receive: " + receiveMsg); } @StreamListener("input2") public void receiveInput2(String receiveMsg) { System.out.println(port + " port, input2 receive: " + receiveMsg); } }

다음과 같이 CustomSink를 사용자 지정합니다:

java
public interface CustomSink extends Sink { /** * Input channel name. */ String INPUT2 = "input2"; /** * @return input channel. */ @Input(CustomSink.INPUT2) SubscribableChannel input2(); }

구성 클래스 application.properties; 다음과 같습니다:

properties
spring.application.name=mq_orderly_consumer server.port=9530 # configure the nameserver of rocketmq spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876 spring.cloud.stream.rocketmq.binder.group=mq_producer_group # configure the input binding named input spring.cloud.stream.bindings.input.destination=Global-Orderly-Topic spring.cloud.stream.bindings.input.content-type=application/json spring.cloud.stream.bindings.input.group=Global-Orderly-Topic-group spring.cloud.stream.rocketmq.bindings.input.consumer.orderly=true # configure the input binding named input spring.cloud.stream.bindings.input2.destination=Partly-Orderly-Topic spring.cloud.stream.bindings.input2.content-type=application/json spring.cloud.stream.bindings.input2.group=Partly-Orderly-Topic-group spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=true

글로벌 주문 생산자 코드는 다음과 같습니다:

java
public class GlobalProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer( "producer_group", true); producer.setNamesrvAddr("218.95.37.160:9876"); producer.start(); for (int i = 0; i < 12; i++) { Message msg = new Message( "Global-Orderly-Topic", "Global_Orderly_Tag", ("( " + i + " )message from GlobalProducer").getBytes()); msg.setKeys("Global_Orderly_Tag"); producer.send(msg); } System.out.println("Send Finished."); } }

소비자를 먼저 시작한 다음 생산자를 시작하면 소비자 측에서 메시지가 질서정연하게 소비되는 것을 볼 수 있습니다.

로컬 주문

로컬 순서를 보장하려면 순서가 필요한 메시지를 동일한 주제 아래에 같은 큐에 넣어 순서를 보장할 수 있으며, 여기서는 OrderId가 동일한 메시지를 동일한 큐에 보낼 수 있도록 설계했으며, 플로차트는 다음과 같습니다:

로컬 주문에서 소비자는 여전히 글로벌 주문에서 소비자를 사용하며 로컬 생산자 코드는 다음과 같습니다:

java
public class PartlyProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer( "producer_group", true); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); /** * orderId = 1 로컬 주문에서 소비자는 여전히 글로벌 주문의 소비자를 사용하며 로컬 생산자 코드는 다음과 같습니다. * orderId = 2 로컬 주문에서 소비자는 여전히 글로벌 주문의 소비자를 사용하며 로컬 생산자 코드는 다음과 같습니다. */ List<Order> list = new ArrayList<>(); for (int i = 1; i <= 3; i ++) { Order order = new Order(); order.orderId = 1; order.step = i; list.add(order); } for (int i = 5; i <= 8; i ++) { Order order = new Order(); order.orderId = 2; order.step = i; list.add(order); } System.out.println(list); int size = list.size(); for (int i = 0; i < size; i++) { Order order = list.get(i); Message msg = new Message( "Partly-Orderly-Topic", "Partly_Orderly_Tag", (order.toString()).getBytes()); msg.setKeys("Partly_Orderly_Tag"); /** * 여기서 메시지를 보낼 때 주문아이디에 따라 해당 대기열이 선택됩니다. */ producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int orderId = (int)arg; int idx = orderId % mqs.size(); return mqs.get(idx); } }, order.orderId); } System.out.println("Send Finished."); } public static class Order { int orderId; int step; @Override public String toString() { return "Order{" + "orderId=" + orderId + ", step=" + step + '}'; } } }
Read next

작고, 빠르고, 가벼운 .

머리말 오늘은 작고 빠르며 가벼운 .NET NoSQL 임베디드 데이터베이스 인 LiteDB를 소개합니다. 이 게시물은 주로 LiteDB를 소개하고 에서 사용하는 방법을 소개하는 것입니다. LiteDB 소개 LiteDB는 작고 빠른

Oct 15, 2025 · 7 min read