해당 게시글은 내용 정리 목적으로 작성되었습니다. 틀린 내용이 있다면 언제든지 말씀해 주세요

목차

1. Callable, Future, ExecutorService, Executors  설명
2. 비동기 기능 테스트 예제 
3. 결론


API를 개발하다 보면 동시성 테스트를 해야하는 경우가 있다.
예를 들어서 포인트 차감, 선착순 이벤트 응모, 재고 관리가 대표적인 예다.
자바 서블릿은 멀티 스레드 환경을 제공하기 때문에 동시에 여러 요청을 받을 수 있다.
이 경우 적절한 조치를 취해서 동시에 요청한 경우라도 데이터 일관성, 무결성을  유지해야한다.
이러한 비동기 요청에 대해서 테스트할 수 있는 방법을 알아보자

1. Callable, Future, ExecutorService, Executors

  • 위 기능은 java.util.concurrent 에서 제공하며 JDK 1.5부터 지원되고 Java의 동시성 프로그래밍을 보다 효율적으로 관리하기 위해 도입된 클래스와 인터페이스이다.
  • 각 인터페이스 및 클래스에 대해서 간단하게 숙지하고 예제를 보면 쉽게 이해가 가능하다

1) Callable

  • Callable은 자바의 인터페이스로, 비동기 작업을 수행하고 결과를 반환하는 메서드를 정의한다.
  • Runnable 인터페이스와 유사하지만, Runnable은 반환값이 없고 예외를 던질 수 없는 반면, Callable은 반환값이 있고 예외를 던질 수 있다.
@FunctionalInterface
public interface Callable<V> {

    //비동기 작업을 수행하고 결과를 반환한다
    V call() throws Exception;
}

2) Future

  • Future는 비동기 작업의 결과를 나타내는 인터페이스이다.
  • 비동기 작업이 완료될 때까지 기다리고 작업의 결과를 확인할 수 있다.
public interface Future<V> {
	
    // 작업을 취소한다.
    boolean cancel(boolean mayInterruptIfRunning);

    // 작업이 취소되었는지 확인한다
    boolean isCancelled();
    
    //작업이 완료되었는지 확인한다.
    boolean isDone();

    //작업이 완료될 때까지 기다렸다가 결과를 반환한다
    V get() throws InterruptedException, ExecutionException;
    
    //지정된 시간 동안 기다렸다가 결과를 반환한다. 시간이 초과되면 TimeoutException을 던진다.
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

3) ExecutorService

  • ExecutorService는 Executor의 하위 인터페이스로, 더 많은 기능을 제공하여 스레드 풀을 관리하고, 비동기 작업을 제출하고, 종료하는 등의 작업을 할 수 있다.
  • Runnable, Callable 인터페이스를 작업으로 등록이 가능하다.
  • 여러 메소드가 있지만 크게 invokeAll, shutdown 메소드를 사용한다
   // tasks를 받아 동시에 실행하고 그 결과를 List<Future<T>>에 담아서 반환한다
   <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    // 작업 종료
    void shutdown();
  • 미리 정의해둔 task를 동시에 실행하고 모두 종료될 때 까지 대기한다.
  • 설정해둔 스레드 만큼 작업을 동시에 수행한다.

3) Executors

  • Executors는 다양한 유형의 ExecutorService를 생성할 수 있는 유틸리티 클래스이다
  • 여러 종류의 스레드 풀을 쉽게 만들 수 있는 정적 팩토리 메서드를 제공한다.
  • newFixedThreadPool(int nThreads): 고정된 크기의 스레드 풀을 생성한다
  • newCachedThreadPool(): 필요에 따라 새로운 스레드를 생성하고, 이전 스레드를 재사용하는 스레드 풀을 생성한다
  • newSingleThreadExecutor(): 단일 스레드를 사용하여 작업을 순차적으로 실행하는 ExecutorService를 생성한다
  • newScheduledThreadPool(int corePoolSize): 지정된 수의 스레드를 가진 스케줄링 가능한 스레드 풀을 생성한다.

newScheduledThreadPool 사용 예제

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);

        Runnable task = () -> System.out.println("Executing task at " + System.currentTimeMillis() + " by " + Thread.currentThread().getName());

        // 5초 후에 작업 실행
        scheduledThreadPool.schedule(task, 5, TimeUnit.SECONDS);

        // 초기 지연 후 10초마다 작업 실행
        scheduledThreadPool.scheduleAtFixedRate(task, 5, 10, TimeUnit.SECONDS);

        // 초기 지연 후 10초 간격으로 작업 실행 (이전 작업 종료 후 10초 지연)
        scheduledThreadPool.scheduleWithFixedDelay(task, 5, 10, TimeUnit.SECONDS);
    }
}

 

 

2. 비동기 기능 테스트 예제 

  • 회원 포인트 차감에 대한 유스케이스를 만들어서 동시에 포인트가 차감될 경우 포인트가 일관성 있게 차감되는지 테스트한다
    • 특정 회원이 110 포인트를 갖고 있고 1포인트 씩 100번을 동시에 차감했을 경우 10포인트가 남아있는지 테스트한다

테스트 환경

  • 스프링 부트 3.3.2
  • JPA
  • H2
  • JDK 17
  • Junit 5, assertj
  • lombok

build.gradle dependencies

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'com.h2database:h2'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

application.yml

spring:
  profiles:
    active: test
  datasource:
    url: jdbc:h2:mem:testdb;MODE=MariaDB;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
    username: sa
    password:
    driverClassName: org.h2.Driver
  jpa:
    hibernate:
      ddl-auto: update

UserPoint

@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
@Entity
public class UserPoint {

    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Id
    private Long id;
    private Long points;

    @Column(name = "user_id")
    private Long userId;

    public void use(Long usePoints) {
        if (this.points < usePoints) {
            throw new IllegalArgumentException("포인트가 부족합니다");
        }
        this.points -= usePoints;
    }
}

UserRepository

public interface UserPointRepository extends JpaRepository<UserPoint, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Optional<UserPoint> findWithExclusiveLockByUserId(Long userId);

    Optional<UserPoint> findByUserId(Long userId);

}

UserPointDeductUseCase

@Slf4j
@RequiredArgsConstructor
@Service
public class UserPointDeductUseCase {
    private final UserPointRepository userPointRepository;

    @Transactional
    public Long usePoints(Long userId, Long usePoints) {

//      비관적 락을 걸지 않았을 때 포인트 차감을 확인 하기 위해 잠시 주석 
//        UserPoint userPoint = userPointRepository.findWithExclusiveLockByUserId(userId)
//            .orElseThrow(() -> new IllegalArgumentException("존재하지 않는 고객입니다."));

        UserPoint userPoint = userPointRepository.findByUserId(userId)
            .orElseThrow(() -> new IllegalArgumentException("존재하지 않는 고객입니다."));

        userPoint.use(usePoints);
        Long remainingPoints = userPoint.getPoints();

        log.info("userId : {} usePoints  : {}, remainingPoints : {}", userId, usePoints, remainingPoints);

        return remainingPoints;
    }
}

UserPoinstDeductUseCaseTest

@ActiveProfiles("test")
@SpringBootTest
class UserPointDeductUseCaseTest {

    @Autowired
    public UserPointDeductUseCase userPointDeductUseCase;

    @Autowired
    public UserPointRepository userPointRepository;

    @BeforeEach
    public void setup() {
        userPointRepository.deleteAll();
    }

    @Test
    public void 동시에_포인트_차감_테스트() throws InterruptedException {
        // given
        long userId = 1;
        long remainingPoints = 110L;
        int taskCount = 100;
        long usePoints = 1L;
        int threads = 10;

        // 스레드 실행기 생성
        ExecutorService executorService = Executors.newScheduledThreadPool(threads);

        // 미리 회원 포인트 엔티티를 생성한다.
        userPointRepository.save(new UserPoint(null, remainingPoints, userId));

        List<Callable<Long>> tasks = new ArrayList<>();

        //태스크를 미리 정의한다.
        for (int index = 0; index < taskCount; index++) {
            tasks.add((() -> userPointDeductUseCase.usePoints(userId, usePoints)));
        }

        // when
        //회원 포인트 차감 태스크 호출
        List<Future<Long>> futures = executorService.invokeAll(tasks);


        //then
        // 결과 확인
        for (Future<Long> future : futures) {
            //예외가 발생하지 않았는지 확인
            // 포인트가 부족하면 예외가 발생한다.
            assertThatCode(future::get)
                .doesNotThrowAnyException();
        }

        // 스레드 실행기 종료
        executorService.shutdown();

        UserPoint resultsUserPoint = userPointRepository.findByUserId(userId).get();
        Assertions.assertThat(resultsUserPoint.getPoints()).isEqualTo(remainingPoints - (taskCount * usePoints));
    }
}
  • 두가지를 테스트해볼 건데 우선 UserPointDeductUseCase에 비관적락을 걸지 않은 상태에서 테스트를 실행해보자
...
2024-07-31T16:12:16.676+09:00  INFO 10000 --- [pool-2-thread-8] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 105
2024-07-31T16:12:16.676+09:00  INFO 10000 --- [pool-2-thread-4] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 102
2024-07-31T16:12:16.677+09:00  INFO 10000 --- [pool-2-thread-2] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 104
2024-07-31T16:12:16.677+09:00  INFO 10000 --- [pool-2-thread-6] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 104
2024-07-31T16:12:16.678+09:00  INFO 10000 --- [pool-2-thread-1] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 104
2024-07-31T16:12:16.678+09:00  INFO 10000 --- [pool-2-thread-8] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 104
2024-07-31T16:12:16.678+09:00  INFO 10000 --- [pool-2-thread-7] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 104
2024-07-31T16:12:16.679+09:00  INFO 10000 --- [pool-2-thread-3] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 103
2024-07-31T16:12:16.679+09:00  INFO 10000 --- [pool-2-thread-1] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 101
2024-07-31T16:12:16.679+09:00  INFO 10000 --- [pool-2-thread-5] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 101
2024-07-31T16:12:16.679+09:00  INFO 10000 --- [pool-2-thread-8] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 101
2024-07-31T16:12:16.680+09:00  INFO 10000 --- [pool-2-thread-4] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 103
2024-07-31T16:12:16.680+09:00  INFO 10000 --- [pool-2-thread-6] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 100
2024-07-31T16:12:16.681+09:00  INFO 10000 --- [pool-2-thread-8] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 104
2024-07-31T16:12:16.681+09:00  INFO 10000 --- [pool-2-thread-7] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 104
2024-07-31T16:12:16.681+09:00  INFO 10000 --- [pool-2-thread-9] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 102
2024-07-31T16:12:16.681+09:00  INFO 10000 --- [pool-2-thread-6] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 102
2024-07-31T16:12:16.682+09:00  INFO 10000 --- [pool-2-thread-2] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 103
2024-07-31T16:12:16.682+09:00  INFO 10000 --- [ool-2-thread-10] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 103
2024-07-31T16:12:16.683+09:00  INFO 10000 --- [pool-2-thread-3] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 101
2024-07-31T16:12:16.683+09:00  INFO 10000 --- [pool-2-thread-6] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 100
2024-07-31T16:12:16.683+09:00  INFO 10000 --- [pool-2-thread-7] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 100

Expected :10L
Actual   :101L
  • 10개의 스레드를 생성하고 총 100번 1포인트씩 차감한다.
  • [pool-2-thread-7] 부분을 확인해보면 1~10 까지 스레드 풀에 있는 스레드를 돌아가면서 사용하는 것을 확인할 수 있다.
  • 100포인트가 차감되어 10포인트가 남은 것을 예상했지만 실제로는 101 포인트가 남아서 테스트가 실패했다.
  • 테스트가 실패한 이유는 비동기로 UserPointDeductUseCase.usePoints()를 호출해서 동시성 이슈가 발생했기 때문이다.
  • 하나의 자원을 동시에 수정할 때 순차적으로 값을 수정하지 않기 때문에 마지막으로 업데이트한 값으로 갱신되는데 이 때문에 원하는 결과가 나오지 않은 것이다.
  • 이를 해결하기 위해서 각 트랜잭션이 특정 자원을 수정할 때 락을 걸어야 한다. 
  • 락을 거는 동안 자원을 선점하고 트랜잭션이 종료되면 다음 트랜잭션이 해당 자원을 선점하는 방식이다.
  • 이번 테스트에서는 비관적 락을 걸어 동시성 이슈를 해결할 것이다.
  • UserPointDeductUseCase에 주석쳤던 부분을 해제해서 비관적락이 걸린 상태로 다시 테스트를 실행해보자.
...
2024-07-31T16:19:55.784+09:00  INFO 59196 --- [pool-2-thread-2] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 17
2024-07-31T16:19:55.785+09:00  INFO 59196 --- [pool-2-thread-6] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 16
2024-07-31T16:19:55.785+09:00  INFO 59196 --- [pool-2-thread-1] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 15
2024-07-31T16:19:55.786+09:00  INFO 59196 --- [pool-2-thread-7] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 14
2024-07-31T16:19:55.787+09:00  INFO 59196 --- [pool-2-thread-8] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 13
2024-07-31T16:19:55.788+09:00  INFO 59196 --- [pool-2-thread-9] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 12
2024-07-31T16:19:55.789+09:00  INFO 59196 --- [pool-2-thread-5] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 11
2024-07-31T16:19:55.790+09:00  INFO 59196 --- [pool-2-thread-3] o.e.s.service.UserPointDeductUseCase     : userId : 1 usePoints  : 1, remainingPoints : 10
  • 포인트가 순차적으로 1포인트씩 총 100포인트가 차감된 것을 확인할 수 있다.

 

3. 결론


ExecutorService는 동시성 이슈를 테스트하기에 매우 유용하다.
해당 라이브러리가 존재하지 않았다면 직접 API를 호출하는 방식으로 테스트를 해야했을 것이다.
이런 수고를 덜려면 ExecutorService 사용법을 잘 숙지해보자

+ Recent posts