SQS에 대해서 공부한 내용을 블로그에 정리하는 목적으로 해당 게시글을 작성하게 되었다.
SQS를 적절하게 사용하면 어려운 문제에 대해서 아주 효과적으로 처리할 수 있는데 예를 들어서 선착순 티켓팅 시스템이나 알림 발송 시스템 등 순간적으로 많은 트래픽이 발생하는 작업에 대해서 유연하게 대처할 수 있다.

이번 게시글에서 SQS에 대한 간단한 설명과 설정 그리고 Spring boot와 연동하는 방법에 대해서 설명하겠다.

목차

1. SQS 란 (Amazon Simple Queue Service) ?
2. SQS를 사용하는 이유
3. SQS 유형
4. SQS 생성 및 설정 방법
5. SQS와 Spring Boot 연동 

6. 결론


1. SQS 란 (Amazon Simple Queue Service) ?

AWS SQS(Amazon Simple Queue Service)는 메시지를 송수신할 수 있는 완전 관리형 메시지 큐 서비스이다.
이를 통해 분산 시스템 간의 비동기 통신을 가능하게 하며, 메시지의 유실 없이 안정적인 데이터 전달을 보장한다.

2. SQS를 사용하는 이유

1) 비동기 처리 및 작업 분리

SQS는 비동기 처리를 가능하게 해준다. 시스템에서 시간이 오래 걸리는 작업이나 비동기적으로 처리해야 하는 작업이 있을 때, SQS를 사용하면 작업을 큐에 넣고 리시버에서 작업 수행하도록 설계할 수 있다. 
이런식으로 작업을 큐로 전송하는 서버와 작업을 처리하는 서버로 분리하면 처리량과 안정성을 높일 수 있다.

2) 시스템 간의 결합도 감소

마이크로서비스 아키텍처에서는 서비스 간의 결합도를 낮추는 것이 매우 중요하다. SQS는 서비스 간의 직접적인 통신을 피하고, 메시지를 큐를 통해 전달함으로써 서비스 간의 결합도를 낮출 수 있다. 이를 통해 각 서비스는 독립적으로 개발, 배포 및 확장할 수 있다.

3) 확장성과 유연성

SQS는 AWS의 관리형 서비스로, 메시지 큐의 크기와 처리량을 자동으로 확장할 수 있다. 대규모 메시지를 처리할 수 있으며 트래픽이 급증해도 시스템이 이를 처리할 수 있도록 자동으로 확장된다.

이는 수동으로 인프라를 관리할 필요 없고, 사용한 만큼만 비용을 소모할 수 있다.

4) 내결함성과 안정성

SQS는 메시지의 안정적이고 지속적인 전달을 보장한다. 메시지가 처리되지 못했을 경우 Dead-Letter Queue를 사용해 해당 메시지를 별도로 관리할 수 있다. 이는 시스템 오류나 장애가 발생하더라도 메시지를 안전하게 처리할 수 있도록 도와준다.

Dead-Letter Queue를 사용하면 별도의 큐를 관리해야 하므로 추가적인 비용이 발생할 수 있다. 하지만 중요한 작업이고 실패할 경우 안정적으로 재시도 작업을 원한다면 SQS는 아주 쉽게 해당 기능을 제공한다.

5) 보안

SQS는 AWS IAM을 사용해 액세스 제어를 제공하며, 전송 중인 데이터와 대기열에 저장된 데이터를 AWS KMS(Key Management Service)를 통해 암호화할 수 있다. 이를 통해 민감한 데이터를 안전하게 처리할 수 있다.

6) 저비용

SQS는 메시지의 수에 따라 비용이 청구되므로, 사용한 만큼만 비용을 지불하게 된다. 이는 예산을 효율적으로 관리할 수 있게 해주며, 특히 트래픽이 변동적인 애플리케이션에 유리하다.

3. SQS 유형

SQS는 크게 표준 대기열, FIFO 대기열로 나뉘어진다. 목적에 따라서 적절히 선택해야한다.

1) 표준 대기열

  • 무제한 처리량: 표준 대기열은 API 작업당 거의 무제한의 초당 트랜잭션(TPS)을 지원한다.
  • 최소한 한 번은 전달: 메시지가 최소한 한 번 전달되고, 가끔 2개 이상의 메시지 복사본이 전달될 수 있다.
  • 메시지 전송 순서 보장은 안됌: 가끔 메시지가 전송된 순서와 다르게 전달될 수 있다.

2) FIFO 대기열

  • 높은 처리량: 기본적으로 FIFO 대기열은 초당 최대 300개의 메시지(초당 300개의 전송, 수신 또는 삭제 작업)를 지원합한다. 작업당 최대 10개 메시지를 일괄 처리할 경우, FIFO 대기열은 초당 3000개의 메시지까지 지원할 수 있다. 
  • 정확히 한 번 처리: 메시지가 한 번 전달되고 소비자가 이를 처리 및 삭제할 때까지 유지된다. 중복 메시지는 대기열에 올라가지 않는다..
  • 선입선출 전달: 메시지가 전송되고 수신되는 순서가 엄격하게 지켜진다.

4. SQS 생성 및 설정 방법

1) SQS IAM 사용자 생성

사용자 세부 정보 지정

권한 설정 및 사용자 생성

SQS 전용 IAM 사용자를 생성했다면 액세스 키를 만들고 ARN을 복사해 두자.

2) Amazon SQS 대기열 생성

대기열 생성

  • 표준 대기열을 선택하고 큐 이름을 입력한다.

대기열 구성

  • 표시 제한 시간 (Visibility Timeout) 
    • 이 설정은 메시지가 중복 처리되는 것을 방지하기 위해 사용된다. 메시지가 소비자에게 전달되면 해당 메시지는 표시 제한 시간 동안 큐에서 "잠금(lock)" 상태가 된다. 이 기간 내에 소비자는 메시지를 처리하고 SQS에 처리가 완료되었음을 알려야 한다. 만약 표시 제한 시간이 지나도 처리가 완료되지 않으면, 해당 메시지의 잠금이 풀리며 다른 소비자가 메시지를 소비할 수 있다.
  • 전송 지연 (Delay Seconds)
    • 전송 지연은 메시지가 큐에 추가되자마자 소비자에게 전달되지 않고, 지정된 시간 동안 대기 상태가 되는 설정이다. 이 설정은 메시지의 즉각적인 처리가 필요하지 않거나, 특정 작업이 지연되어 실행되기를 원할 때 유용하다.
  • 메시지 수신 대기 시간 (Receive Message Wait Time)
    • 메시지 수신 대기 시간은 SQS가 폴링 방식으로 메시지를 검색할 때, 메시지가 큐에 없을 경우 대기하는 시간을 지정하는 설정이다. 이 설정을 통해 짧은 폴링 긴 폴링을 제어할 수 있다.  짧은 폴링은 메시지가 없을 때 즉시 응답하지만, 긴 폴링은 설정된 시간 동안 큐에 메시지가 들어올 때까지 대기한다. 긴 폴링을 사용하면 네트워크 트래픽과 비용을 줄일 수 있다.
    • 대기하는 주체는 폴링 요청을 하는 클라이언트(리시버)이며 메시지 수신 대기시간을 길게 설정하면 폴링 할 때 메시지가 없을 경우 바로 연결을 끊지 않고 설정한 시간동안 대기한다.
  • 메시지 보존 기간 (Message Retention Period)
    • 메시지 보존 기간은 큐에 들어온 메시지가 큐에서 삭제되지 않고 보관되는 최대 시간을 설정한다. 메시지가 이 기간 동안 처리되지 않으면, 자동으로 삭제된다.
    • 메시지 보존 기간을 길게 설정하면 메시지를 오랜 기간 동안 보관할 수 있지만, 이 경우 큐에 메시지가 쌓이면서 스토리지 비용이 증가할 수 있다. 반면, 보존 기간을 짧게 설정하면 메시지가 빠르게 삭제되므로 큐의 크기를 줄일 수 있지만, 메시지 손실 위험이 증가할 수 있다.

암호화

  • 메시지에 대한 암호화 기능을 제공한다.

SQS 큐 전송자와 큐 수신자에 대한 액세스 정책

  • 이전에 생성해둔 SQS IAM에 대한 ARN 링크를 복사해서 큐 전송자와 큐 수신자에 입력한다.
  • 큐 전송자와 수신자를 별도의 사용자로 나누고 싶다면 IAM 사용자를 추가로 생성해주면 된다.
  • IAM 사용자 외에도 역할에 SQS 관련 권한을 넣어주고 해당 역할에 대한 ARN 정보를 입력해도 된다. 

Dead-Letter Queue 정책 설정

  • 수신자에서 작업이 실패될 경우 Dead-Letter Queue로 실패 작업에 대한 메시지를 전송할 수 있다.
  •  해당 설정을 해주려면 위에서 대기열을 생성했던 것과 같이  Dead-Letter Queue 전용으로 생성해주어야 한다.
  • 그 다음 해당 대기열의 ARN을 복사해서 대기열 ARN에 입력해준다.

모든 설정이 완료되었으면 대기열을 생성해주자.

5. SQS와 Spring Boot 연동 

Spring Boot 버전에 따라서 연동하는 방법이 다를 수 있다. 이번 게시글에서는 jdk 17과 spring 3.x 으로 연동 테스트를 진행하겠다. Spring project를 생성하고 아래 코드를 참고해서 테스트해보자

build.gradle

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.3.2'
    id 'io.spring.dependency-management' version '1.1.6'
}

...

dependencies {
	...
    implementation platform("io.awspring.cloud:spring-cloud-aws-dependencies:3.0.1")
    implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs'
    ...
}

application.yml

cloud:
  aws:
    credentials:
      access-key: access-key
      secret-key: secret-key
    region:
      static: ap-northeast-2
    sqs:
      queue:
        name: my-test-queue
  • SQS IAM의 access-key와 secret-key를 입력하고 queue의 region과 name을 입력하자.

AwsSqsConfiguration

  • AWS SQS 관련 설정 클래스이다. 편의상 메시지 전송 설정과 메시지 수신 설정을 하나의 클래스에 정의하고 동일한 비동기 클라이언트를 사용하도록 설정한다 
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

import java.time.Duration;

@Slf4j
@Configuration
public class AwsSqsConfiguration {

    private final String accessKey;

    private final String secretKey;

    private final String region;

    public AwsSqsConfiguration(
        @Value("${cloud.aws.credentials.access-key}") String accessKey,
        @Value("${cloud.aws.credentials.secret-key}") String secretKey,
        @Value("${cloud.aws.region.static}") String region
    ) {
        this.accessKey = accessKey;
        this.secretKey = secretKey;
        this.region = region;
    }

    /**
     * SQS 비동기 클라이언트
     *
     * @return
     */
    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.builder()
            .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)))
            .region(Region.of(region))
            .build();
    }
    
    /**
     * SQS 메시지 생성 템플릿
     *
     * @return
     */
    @Bean
    public SqsTemplate sqsTemplate() {
        return SqsTemplate.newTemplate(sqsAsyncClient());
    }    

    /**
     * SQS Listener 설정
     */
    @Bean
    public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
        return SqsMessageListenerContainerFactory
            .builder()
            .configure(sqsContainerOptionsBuilder ->
                sqsContainerOptionsBuilder
                    .maxConcurrentMessages(10) // 컨테이너의 스레드 풀 크기
                    .maxMessagesPerPoll(10) // 한 번의 폴링 요청으로 수신할 수 있는 최대 메시지 수를 지정
                    .acknowledgementInterval(Duration.ofSeconds(5)) // AWS SQS 응답 간격
                    .acknowledgementThreshold(10) // AWS SQS 응답 최소 개수
            )
            .sqsAsyncClient(sqsAsyncClient())
            .build();
    }
}
  • 크게 SqsAsyncClient, SqsTemplete, SqsMessageListenerContainerFactory으로 구성된다.
  • SqsAsyncClient : AWS SQS 비동기 통신을 제공하며, access-key와 secret-key, region을 설정한 후 생성한다.
  • SqsTemplete : SQS로 메시지 생성 및 전송 템플릿 클래스이다.
  • SqsMessageListenerContainerFactory : SQS 메시지 수신 컨테이너 팩토리 클래스이다. 비동기로 수신하며 설정에 따라서 효율적으로 작업을 수행할 수 있다.
    • maxConcurrentMessages : 컨테이너의 스레드 풀을 설정한다. 기본값은 10이며 해당 설정에 따라서 병렬로 처리할 수 있는 스레드 수를 설정할 수 있다.
    • maxMessagesPerPoll : 한 번의 폴링(polling)으로 SQS 큐에서 가져올 수 있는 최대 메시지 수를 지정한다. 일종의 버퍼 역할을 한다고 보면된다.
    • acknowledgementInterval : 메시지를 처리한 후 즉시 SQS로 응답을 보내지 않고, 설정한 시간 만큼 기다렸다가 시간 내에 처리된 모든 메시지에 대해 한 번에 SQS로 응답을 보낸다.
    • acknowledgementThreshold : 설정한 개수의 메시지가 처리될 때마다 SQS로 응답을 보낸다. 설정한 개수 이하의 메시지가 처리된 경우에는 확인을 보내지 않고, 개수 충족 시 한 번에 SQS로 응답을 보낸다.
    • 일반적으로 acknowledgementInterval과 acknowledgementThreshold는 함께 사용되어, 시간과 메시지 수 기준 중 어느 한 가지 조건이 충족될 때 SQS로 응답을 보낼 수 있다.
  • 자세한 설정 정보를 확인하려면 아래 링크를 참조하면 된다.
https://docs.awspring.io/spring-cloud-aws/docs/3.0.0/reference/html/index.html#sqscontaineroptions-descriptions

NotificationSender

  • 알림 전송 관련 인터페이스 및 구현체를 정의한다. 
// 알림 메시지
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
public class Notification {
    private String message;
    private LocalDateTime createAt;

    public static Notification create(String message) {
        return new Notification(
            message,
            LocalDateTime.now()
        );
    }
}

// 알림 전송 결과
public record NotificationSendResult(
    String messageId,
    boolean success
) {

    public static NotificationSendResult success(String messageId) {
        return new NotificationSendResult(messageId, true);
    }

    public static NotificationSendResult failure() {
        return new NotificationSendResult(null, false);
    }
}

// 알림 전송 인터페이스
public interface NotificationSender {
    NotificationSendResult sendNotification(Notification notification);
}


// AWS SQS 알림 전송 구현체
@Slf4j
@RequiredArgsConstructor
@Component
public class AwsSqsNotificationSender implements NotificationSender {

    @Value("${cloud.aws.sqs.queue.name}")
    private String queueName;

    private final ObjectMapper objectMapper;

    private final SqsTemplate template;

    @Override
    public NotificationSendResult sendNotification(Notification notification) {
        try {
            String message = objectMapper.writeValueAsString(notification);

            SendResult<String> result = template.send(to -> to
                .queue(queueName)
                .payload(message));

            return NotificationSendResult.success(result.messageId().toString());
        } catch (Exception e) {
            log.error("send notification error : ", e);
            return NotificationSendResult.failure();
        }
    }
}

@RequiredArgsConstructor
@RequestMapping("/api/v1/send-notification")
@RestController
public class NotificationSendController {

    private final NotificationSender notificationSender;

    @PostMapping
    public ResponseEntity<NotificationSendResult> send(@RequestBody String message) {
        return ResponseEntity.ok(notificationSender.sendNotification(Notification.create(message)));
    }
}
  • AWS SQS 요청 및 응답 DTO를 생성하고 AWS SQS 구현체를 작성한 알림 전송 컨트롤러로 메시지를 전송해보자.

알림 메시지 전송 테스트

  • 알림 메시지 전송 엔트포인트로 요청을 보내고 SQS에 메시지가 수신되었는지 확인해보자 

  • AWS SQS 메시지 전송 및 수신 페이지로 이동한다

  • 메시지 수신을 확인해보면 사용가능한 메시지 2개를 확인할 수 있고, 메시지 폴링 버튼을 누르면 확인가능한 메시지 두건을 확인할 수 있다. 메시지의 ID를 보면 postman의 응답에 있는 messageId 와 동일한 것을 알 수 있다.
  • 폴링해서 메시지를 확인한다고 해도 메시지가 소모된 것은 아니며 SQS를 통해 메시지를 사용했고 삭제해달라는 요청을 보내야 비로서 메시지를 큐에서 제거한다

SQS 메시지 수신 Listener

@Slf4j
@Component
@RequiredArgsConstructor
public class AwsSqsListener {

    @SqsListener(value = "${cloud.aws.sqs.queue.name}")
    public void listen(String message) {
        log.info("notification : {}", message);
    }
}
  • @SqsListener(value = "${cloud.aws.sqs.queue.name}")을 어노테이션을 선언한 Bean에 대해서 SQS 메시지 수신을 수행한다.
  • SqsMessageListenerContainerFactory 설정에 따라서 동시에 처리할 수 있는 리시버(워커 또는 노드 라고도 한다)개수를 지정할 수 있다.
  • 서버 인스턴스 사양에 따라서 적절하게 설정 해주면 효율적인 SQS 메시지 처리가 가능하다
  • @SqsListener 어노테이션이 작동하지 않는 경우가 있는데 spring boot 3.0으로 변경되면서 연동하는 방식이 달라졌다고 한다. 라이브러리 버전을 잘 체크해보자
2024-08-14T17:22:59.582+09:00  INFO 38884 --- [ntContainer#0-1] i.s.s.i.sqs.receiver.AwsSqsListener      : notification : {"message":"{\r\n    \"message\" : \"반갑습니다\"\r\n}","createAt":"2024-08-14T17:09:33.4354856"}
2024-08-14T17:22:59.641+09:00  INFO 38884 --- [ntContainer#0-2] i.s.s.i.sqs.receiver.AwsSqsListener      : notification : {"message":"{\r\n    \"message\" : \"안녕하세요\"\r\n}","createAt":"2024-08-14T17:12:50.1727803"}
  • 위와 같이 메시지가 잘 수신된 것을 확인할 수 있다.
  • 별도의 설정을 하지 않으면 작업 종료 시 SQS에 응답하면서 메시지는 삭제된다.

6. 결론

 AWS SQS가 아니어도 Apache Kafka, Redis message broker, rebbitMQ 등을 활용해서 메시징 시스템을 구축할 수 있다. 전략적으로 현재 상황에 맞는 메시징 시스템을 선택하면 된다.

메시지 전송자와 메시지 수신자 (워커, 노드)를 별도의 서버로 구현하는 것이 좋으며 이러한 메시징 시스템을 적절하게 활용하는 것이 매우 중요하다

+ Recent posts