FIFO Queue Support in Spring Cloud AWS
1. 개요
AWS SQS의 FIFO (First-In-First-Out) 큐는 메시지가 전송되는 정확한 순서로 처리되도록 설계되어 있으며 각 메시지는 한 번만 전달됩니다.
Spring Cloud AWS v3는 개발자가 최소한의 보일러플레이트 코드로 메시지 정렬 및 중복 제거와 같은 FIFO 큐 기능을 처리할 수 있도록 쉽게 사용할 수 있는 추상화를 지원합니다.
이번 튜토리얼에서는 금융 거래 처리 시스템의 맥락에서 FIFO 큐의 세 가지 실용적인 사용 사례를 탐구해 보겠습니다:
- 동일한 계정 내에서 거래에 대한 엄격한 메시지 정렬 보장
- 서로 다른 계정의 거래를 병렬 처리하면서 각 계정에 대한 FIFO 의미론 유지
- 처리 실패 시 메시지 재시도 처리, 재시도가 원래 메시지 순서를 준수하도록 보장
이러한 시나리오를 시연하기 위해 이벤트 기반 애플리케이션을 설정하고 행동이 예상대로 나타나는지 확인하기 위한 라이브 테스트를 만들 것입니다. 이 과정에서는 Spring Cloud AWS SQS V3 소개 글의 환경과 테스트 설정을 활용합니다.
2. 의존성
우선, 의존성을 관리하고 버전 호환성을 보장하기 위해 Spring Cloud AWS BOM을 가져옵니다:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.2.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
다음으로, 핵심 기능 및 SQS 통합을 위한 필요한 Spring Cloud AWS 스타터를 추가합니다:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
우리는 또한 Spring Boot Web Starter를 포함할 것입니다. Spring Cloud AWS BOM을 사용하므로 버전을 명시할 필요가 없습니다.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
마지막으로, 테스트를 위해 JUnit 5, 비동기 작업 검증을 위한 Awaitility, 및 Spring Boot Test Starter에 대한 의존성을 추가합니다:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
3. 로컬 테스트 환경 설정
다음으로, Testcontainers와 LocalStack를 사용하여 로컬 테스트 환경을 구성합니다. SqsLiveTestConfiguration 클래스를 만들어 보겠습니다:
@Configuration
public class SqsLiveTestConfiguration {
private static final String LOCAL_STACK_VERSION = "localstack/localstack:3.4.0";
@Bean
@ServiceConnection
LocalStackContainer localStackContainer() {
return new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
}
}
이 클래스에서는 우리의 LocalStack 테스트 컨테이너를 Spring Bean으로 선언하고 @ServiceConnection 주석을 사용하여 연결을 처리합니다.
4. 큐 이름 설정
SQS 큐 이름을 설정하기 위해 application.yml 파일 내에 정의하고 Spring Boot의 구성 외부화 기능을 활용합니다:
events:
queues:
fifo:
transactions-queue: "transactions-queue.fifo"
slow-queue: "slow-queue.fifo"
failure-queue: "failure-queue.fifo"
이 구조는 우리의 큐 이름을 계층 구조로 조직하여 애플리케이션 코드에서 쉽게 관리하고 접근할 수 있도록 합니다. .fifo 접미사는 SQS에서 FIFO 큐에 필수적입니다.
5. 애플리케이션 설정
이제 Transaction 마이크로서비스를 사용하여 이러한 개념을 실제 예시로 설명해 보겠습니다. 이 서비스는 TransactionEvent 메시지를 처리할 것입니다. 이 메시지는 각 계정 내에서 순서가 유지되어야 하는 금융 거래를 나타냅니다.
먼저, Transaction 엔터티를 정의합니다:
public record Transaction(UUID transactionId, UUID accountId, double amount, TransactionType type) {}
그리고 TransactionType 열거형을 만듭니다:
public enum TransactionType {
DEPOSIT,
WITHDRAW
}
다음으로, TransactionEvent를 생성합니다:
public record TransactionEvent(UUID transactionId, UUID accountId, double amount, TransactionType type) {
public Transaction toEntity() {
return new Transaction(transactionId, accountId, amount, type);
}
}
TransactionService 클래스는 처리 로직을 처리하고 테스트 용도로 시뮬레이션된 저장소를 유지합니다:
@Service
public class TransactionService {
private static final Logger logger = LoggerFactory.getLogger(TransactionService.class);
private final ConcurrentHashMap<UUID, List<Transaction>> processedTransactions = new ConcurrentHashMap<>();
public void processTransaction(Transaction transaction) {
logger.info("Processing transaction: {} for account {}", transaction.transactionId(), transaction.accountId());
processedTransactions.computeIfAbsent(transaction.accountId(), k -> new ArrayList<>()).add(transaction);
}
public List<Transaction> getProcessedTransactionsByAccount(UUID accountId) {
return processedTransactions.getOrDefault(accountId, new ArrayList<>());
}
}
6. 순서대로 이벤트 처리
첫 번째 시나리오에서는 이벤트를 처리하는 리스너를 생성하고 메시지를 보낸 순서와 동일한 순서로 이벤트를 수신하는지 확인하는 테스트를 만듭니다. 우리는 @RepeatedTest 주석을 사용하여 테스트를 100번 실행하여 일관성을 확인하고 FIFO 대신 표준 SQS 큐가 어떻게 동작하는지 확인할 것입니다.
6.1. 리스너 생성
이제 이벤트를 수신하고 순서대로 처리하는 첫 번째 리스너를 만들어 보겠습니다. 우리는 @SqsListener 주석을 사용하여 application.yml 파일에서 큐 이름을 해석할 것입니다:
@Component
public class TransactionListener {
private final TransactionService transactionService;
public TransactionListener(TransactionService transactionService) {
this.transactionService = transactionService;
}
@SqsListener("${events.queues.fifo.transactions-queue}")
public void processTransaction(TransactionEvent transactionEvent) {
transactionService.processTransaction(transactionEvent.toEntity());
}
}
추가적인 설정이 필요하지 않습니다. 프레임워크는 백그라운드에서 큐 유형이 FIFO임을 감지하고 리스너 메서드가 메시지를 올바른 순서로 받을 수 있도록 필요한 모든 조정을 합니다.
6.2. 테스트 생성
이제 수신된 메시지의 순서가 전송된 순서와 정확히 일치하는지 검증하는 테스트를 생성하겠습니다. 먼저 우리가 전에 만든 BaseSqsLiveTest를 확장하는 테스트 스위트를 시작합니다:
@SpringBootTest
public class SpringCloudAwsSQSTransactionProcessingTest extends BaseSqsLiveTest {
@Autowired
private SqsTemplate sqsTemplate;
@Autowired
private TransactionService transactionService;
@Value("${events.queues.fifo.transactions-queue}")
String transactionsQueue;
@Test
void givenTransactionsFromSameAccount_whenSend_shouldReceiveInOrder() {
var accountId = UUID.randomUUID();
var transactions = List.of(createDeposit(accountId, 100.0),
createWithdraw(accountId, 50.0),
createDeposit(accountId, 25.0));
var messages = createTransactionMessages(accountId, transactions);
sqsTemplate.sendMany(transactionsQueue, messages);
await().atMost(Duration.ofSeconds(5))
.until(() -> transactionService.getProcessedTransactionsByAccount(accountId),
isEqual(eventsToEntities(transactions)));
}
}
이 테스트에서는 SqsTemplate의 sendMany() 메서드를 활용하여 한 번에 최대 10개의 메시지를 보냅니다. 그런 다음, 메시지를 수신할 때까지 최대 5초 동안 대기합니다.
테스트 로직을 깔끔하게 유지하기 위해 몇 가지 헬퍼 메서드를 생성하겠습니다. sendMany() 메서드는 List
private List<Message<TransactionEvent>> createTransactionMessages(UUID accountId,
Collection<TransactionEvent> transactions) {
return transactions.stream()
.map(transaction -> MessageBuilder.withPayload(transaction)
.setHeader(SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER,
accountId.toString())
.build())
.toList();
}
SQS FIFO에서는 MessageGroupId 속성이 함께 그룹화되어야 할 메시지를 알리는 데 사용됩니다. 우리의 시나리오에서, 우리는 한 계정의 거래가 순서대로 유지되도록 보장해야 하지만 계정 간의 순서는 필요하지 않기 때문에 accountId를 MessageGroupId로 사용할 것입니다. 이를 위해 SqsHeaders의 헤더를 사용할 수 있으며 프레임워크는 이를 SQS 메시지 속성으로 매핑합니다.
나머지 헬퍼 메서드는 이벤트를 거래로 매핑하고 TransactionEvents를 생성하는 간단한 메서드입니다:
private List<Transaction> eventsToEntities(List<TransactionEvent> transactionEvents) {
return transactionEvents.stream()
.map(TransactionEvent::toEntity)
.toList();
}
private TransactionEvent createWithdraw(UUID accountId, double amount) {
return new TransactionEvent(UUID.randomUUID(), accountId, amount, TransactionType.WITHDRAW);
}
private TransactionEvent createDeposit(UUID accountId, double amount) {
return new TransactionEvent(UUID.randomUUID(), accountId, amount, TransactionType.DEPOSIT);
}
6.3. 테스트 실행
테스트를 실행하면, 테스트가 통과하고 거래가 선언한 순서대로 발생하는 로그를 생성하는 것을 확인할 수 있습니다:
TransactionService : Processing transaction: DEPOSIT:100.0 for account f97876f9-5ef9-4b62-a69d-a5d87b5b8e7e
TransactionService : Processing transaction: WITHDRAW:50.0 for account f97876f9-5ef9-4b62-a69d-a5d87b5b8e7e
TransactionService : Processing transaction: DEPOSIT:25.0 for account f97876f9-5ef9-4b62-a69d-a5d87b5b8e7e
우리가 여전히 확신이 서지 않거나 우연이 아닐 경우, @RepeatableTest 주석을 추가하여 이 테스트를 100번 실행할 수 있습니다:
@RepeatedTest(100)
void givenTransactionsFromSameAccount_whenSend_shouldReceiveInOrder() {
// ... test remains the same
}
모든 100번의 실행이 동일한 순서로 실패 없이 통과해야 합니다.
추가적으로, FIFO 대신 표준 큐를 사용해 테스트하여 어떻게 동작하는지 확인해 보겠습니다.
이를 위해, application.yml의 큐 이름에서 .fifo 접미사를 제거해야 합니다:
transactions-queue: "transactions-queue"
그 다음, createTransactionMessages() 메서드에서 MessageId 헤더를 추가하는 코드를 주석 처리해야 하며, 표준 SQS 큐는 해당 속성을 지원하지 않습니다:
// .setHeader(SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, accountId.toString())
이제 다시 100번 테스트를 실행합니다. 메시지가 예상한 순서로 도착하우연히 잘 처리된 경우 가끔 테스트가 통과한 것을 확인하지만, 다른 경우에는 표준 큐에서는 메시지 순서를 보장하지 않기 때문에 실패하는 것을 보아야 할 것입니다.
이 섹션을 마치기 전에 이러한 변경 사항을 되돌리고 .fifo 접미사를 큐에 추가하며, @RepeatedTest 주석을 제거하고 MessageGroupId 코드를 주석 해제합니다.
7. 여러 메시지 그룹을 병렬로 처리
SQS FIFO에서 메시지 소비 처리량을 극대화하기 위해, 서로 다른 메시지 그룹의 메시지를 병렬로 처리할 수 있으며, 각 메시지 그룹 내에서 메시지 순서를 유지할 수 있습니다. Spring Cloud AWS SQS는 추가적인 구성 없이 이러한 동작을 기본적으로 지원합니다.
이 동작을 설명하기 위해, 거래 서비스에 느린 연결을 시뮬레이션하는 메서드를 추가해 보겠습니다:
public void simulateSlowProcessing(Transaction transaction) {
try {
processTransaction(transaction);
Thread.sleep((int) 100);
logger.info("Transaction processing completed: {}:{} for account {}", transaction.type(), transaction.amount(), transaction.accountId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
느린 연결을 사용하면 서로 다른 계정의 메시지가 병렬로 처리되면서 각 계정 내에서 거래 순서가 보존되고 있는지 검증할 수 있습니다.
이제 TransactionListener 클래스 내에서 이 새로운 메서드를 사용할 리스너를 만들겠습니다:
@SqsListener("${events.queues.fifo.slow-queue}")
public void processParallelTransaction(TransactionEvent transactionEvent) {
transactionService.simulateSlowProcessing(transactionEvent.toEntity());
}
마지막으로, 행태를 검증하기 위해 테스트를 만들겠습니다:
@Test
void givenTransactionsFromDifferentAccounts_whenSend_shouldProcessInParallel() {
var accountId1 = UUID.randomUUID();
var accountId2 = UUID.randomUUID();
var account1Transactions = List.of(createDeposit(accountId1, 100.0),
createWithdraw(accountId1, 50.0),
createDeposit(accountId1, 25.0));
var account2Transactions = List.of(createDeposit(accountId2, 50.0),
createWithdraw(accountId2, 25.0),
createDeposit(accountId2, 50.0));
var allMessages = Stream.concat(createTransactionMessages(accountId1, account1Transactions).stream(),
createTransactionMessages(accountId2, account2Transactions).stream()).toList();
sqsTemplate.sendMany(slowQueue, allMessages);
await().atMost(Duration.ofSeconds(5))
.until(() -> transactionService.getProcessedTransactionsByAccount(accountId1),
isEqual(eventsToEntities(account1Transactions)));
await().atMost(Duration.ofSeconds(5))
.until(() -> transactionService.getProcessedTransactionsByAccount(accountId2),
isEqual(eventsToEntities(account2Transactions)));
}
이 테스트에서는 두 개의 서로 다른 계정에 대한 두 세트의 트랜잭션 이벤트를 보냅니다. 그런 다음 우리는 다시 sendMany() 메서드를 활용하여 모든 메시지를 같은 배치로 보내고, 모든 메시지가 예상한 순서로 수신되는지 검증합니다.
테스트를 실행하면 다음과 유사한 로그를 확인할 수 있습니다:
TransactionService : Processing transaction: DEPOSIT:50.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Processing transaction: DEPOSIT:100.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
TransactionService : Transaction processing completed: DEPOSIT:100.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
TransactionService : Transaction processing completed: DEPOSIT:50.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Processing transaction: WITHDRAW:50.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
TransactionService : Processing transaction: WITHDRAW:25.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Transaction processing completed: WITHDRAW:50.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
TransactionService : Transaction processing completed: WITHDRAW:25.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Processing transaction: DEPOSIT:50.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Processing transaction: DEPOSIT:25.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
서로 다른 두 계정의 메시지가 병렬로 처리되는 동안 각 계정 내에서 순서가 보존되고 있음을 확인할 수 있으며, 이는 테스트가 통과함으로써 검증됩니다.
8. 순서대로 재처리
마지막 시나리오에서는 네트워크 실패를 시뮬레이션하고 처리 순서가 일관되게 유지되도록 합니다. 리스너 메서드가 오류를 발생시키면 프레임워크는 해당 메시지 그룹에 대한 실행을 중단하고 메시지를 승인하지 않습니다. SQS는 가시성 창이 만료된 후 나머지 메시지를 다시 제공합니다.
이 동작을 설명하기 위해, 첫 번째 메시지를 처리할 때 항상 실패하는 메서드를 TransactionService에 추가하겠습니다.
먼저, 이미 실패한 ID를 보관할 Set을 추가합니다:
private final Set<UUID> failedTransactions = ConcurrentHashMap.newKeySet();
그런 다음, processTransactionWithFailure() 메서드를 추가합니다:
public void processTransactionWithFailure(Transaction transaction) {
if (!failedTransactions.contains(transaction.transactionId())) {
failedTransactions.add(transaction.transactionId());
throw new RuntimeException("Simulated failure for transaction " + transaction.type() + ":" + transaction.amount());
}
processTransaction(transaction);
}
이 메서드는 거래가 처음 처리될 때 오류를 발송하지만 이후 재시도 시 정상적으로 처리합니다.
이제 메시지를 처리할 리스너를 추가하겠습니다. 테스트 속도를 높이기 위해 messageVisibilitySeconds를 1로 설정하여 가시성 창을 줄입니다:
@SqsListener(value = "${events.queues.fifo.failure-queue}", messageVisibilitySeconds = "1")
public void retryFailedTransaction(TransactionEvent transactionEvent) {
transactionService.processTransactionWithFailure(transactionEvent.toEntity());
}
마지막으로, 기대한 대로 동작하는지 검증하는 테스트를 생성하겠습니다:
@Test
void givenTransactionProcessingFailure_whenSend_shouldRetryInOrder() {
var accountId = UUID.randomUUID();
var transactions = List.of(createDeposit(accountId, 100.0),
createWithdraw(accountId, 50.0),
createDeposit(accountId, 25.0));
var messages = createTransactionMessages(accountId, transactions);
sqsTemplate.sendMany(failureQueue, messages);
await().atMost(Duration.ofSeconds(10))
.until(() -> transactionService.getProcessedTransactionsByAccount(accountId),
isEqual(eventsToEntities(transactions)));
}
이 테스트에서는 세 개의 이벤트를 전송하고 기대한 순서대로 처리되는지 확인합니다.
테스트를 실행하면 예외 스택 트레이스에서 다음과 유사한 로그를 확인할 수 있습니다:
Caused by: java.lang.RuntimeException: Simulated failure for transaction DEPOSIT:100.0
이후 출력은 다음과 같습니다:
TransactionService : Processing transaction: DEPOSIT:100.0 for account 3f684ccb-80e8-4e40-9136-c3b59bdd980b
이는 이벤트가 두 번째 시도에서 성공적으로 처리되었음을 나타냅니다.
다음 이벤트에 대해서도 2쌍의 유사한 출력을 확인해야 합니다:
Caused by: java.lang.RuntimeException: Simulated failure for transaction WITHDRAW:50.0
TransactionService : Processing transaction: WITHDRAW:50.0 for account 3f684ccb-80e8-4e40-9136-c3b59bdd980b
Caused by: java.lang.RuntimeException: Simulated failure for transaction DEPOSIT:25.0
TransactionService : Processing transaction: DEPOSIT:25.0 for account 3f684ccb-80e8-4e40-9136-c3b59bdd980b
이는 오류가 발생하더라도 이벤트가 올바른 순서로 처리되었음을 나타냅니다.
9. 결론
이번 기사에서는 Spring Cloud AWS v3의 FIFO 큐 지원에 대해 살펴보았습니다. 우리는 이벤트가 순서대로 처리되어야 하는 거래 처리 서비스를 만들고 세 가지 다양한 시나리오에서 메시지 순서가 유지되도록 검증했습니다: 단일 메시지 그룹 처리, 여러 메시지 그룹 병렬 처리, 및 실패 후 메시지 재처리.
우리는 각 시나리오를 테스트하기 위해 로컬 테스트 환경을 구성하고 우리의 논리가 올바른지 확인하기 위해 라이브 테스트를 생성하였습니다.
항상 그렇듯이 이 기사의 전체 코드는 GitHub에서 확인할 수 있습니다.