본문 바로가기

java

동시성 이슈 해결하기(feat. spring(java), MySQL, Redis(Redisson))

Single Thread
Multi-Threading
Tomcat - Spring framework
분산 시스템(Distributed Systems)
단일 DB에서의 처리
NoSQL에서의 처리 - Redis(Redisson library)

 

이번 글에서는 좀 더 실용적으로 접근해볼 예정입니다. 그래서 실제 서비스 운영 환경에서 동시성 이슈를 어떻게 해결해야 할지에 대해 직접 코드를 작성하며 살펴보고자 합니다.

앞으로 설명할 내용들은 이전 글들(동시성 처리 - intro동시성 처리 - Lock Algorithms(락 알고리즘))의 내용에 기반하고 있기 때문에 먼저 읽고 오시면 아래 내용을 더 잘 이해하실 수 있을겁니다!

concurrency, blocking, non-blocking, sync, async, lock striping, lock free, CAS 등의 키워드에 대해서 잘 이해하고 있다면 넘어가셔도 좋습니다.

 

Single Thread

자바 class101에서 아주 기초적인 프로그래밍을 하고 있다고 생각해봅시다. program의 시작점인 main 메서드에서 1개의 단일 thread로 특정 task를 수행할 수 있겠죠. 간단한 for문으로 counting 하고자 합니다.

int count = 0;
for (int i = 0; i < 100; i++) {
    count++;
}

결과를 보면 100이 나오겠죠? main thread 1개가 counting task를 수행하니 동시성 이슈도 생길 일이 없습니다. 성능도 뭐 메모리에서 100정도 더하는건 ms 단위로 측정이 안될 정도로 빠를겁니다.

자 이제 counting task의 cost를 늘려봅시다. counting 하는 특정 task가 1초나 걸린다고 가정해볼게요. 위의 작업을 곧이 곧대로 실행한다면, 100초가 걸릴겁니다. 실제 운영 서비스에서는 대부분 I/O 작업이 필요하므로(db에 특정 data를 persist 하는 일 정도는 웬만하면 하니까요), 1초까지는 아니더라도 latency를 만들어내는 일입니다. 그래도 1초나 걸린다고 가정해볼게요.

Multi-Threading

100초나 걸리는 일을 100개의 thread에 나눠주어 한 번에 더하면 아마 1초만에 모든 task를 수행할 수 있을겁니다.

@Slf4j
public class MyClass {
​
    static int count = 0;
​
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(100);
​
        log.info("start at: {}", LocalDateTime.now());
        IntStream.range(0, 100)
                .forEach((i) -> {
                    executorService.submit(() -> {
                        run();
                    });
                });
​
        executorService.shutdown();
        if (executorService.awaitTermination(60, TimeUnit.SECONDS)) {
            executorService.shutdownNow();
        }
        log.info("end at: {}", LocalDateTime.now());
        log.info("result: {}", count);
​
    }
​
    static void run() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        count++;
    }
}
//start at: 2024-06-24T20:05:55.053682
//end at  : 2024-06-24T20:05:56.073028
//result  : 96

결과를 봅시다. 시간은 대략 1초가 걸렸네요. 그런데 결과는 100을 예상했겠지만, 96이라는 수치가 나왔습니다. 이전 글에서도 살펴보았지만 낮은 latency와 높은 throughput을 달성하기 위해 사용한 multi threading(multi programming)이 동시성 이슈를 발생시켰죠. 데이터 무결성이 깨진 것입니다.

Tomcat - Spring framework

tomcat과 spring f/w 조합은 sync - blocking 모델을 기본으로 합니다. 물론 async - non blocking 방식으로 처리할 수도 있지만, 상당히 복잡한 처리를 많이 해주어야겠죠. 서블릿도 비동기적으로 동작하게 해야 하고, 내부적인 비즈니스 로직도 blocking I/O로 처리되지 않도록 세심하게 프로그래밍 해야 할 것입니다.(netty - webflux 조합을 사용한다면 조금 더 수월해지겠네요)

아무튼 sync - blocking 조합이라는 가정 하에, tomcat은 별도 설정을 하지 않는다면 200개의 default max threads까지 생성할 수 있고, 기본 10개의 min spare threads를 유지하며 thread pool에서 관리를 합니다.

위에서 설명한 multi threading의 작동 방식과 유사하다고 할 수 있습니다. 결국 우리는 위에서 본 count와 같은 '공유 자원'의 무결성이 깨지지 않도록, 즉 동시성 이슈가 발생하지 않도록 critical section을 설정해 task가 원자적으로 동작하도록 보장할 필요가 있습니다. 또는 CAS 연산 등의 아이디어를 활용해 처리할 수도 있겠죠.

이전 글에서 살펴본 것처럼 java에서 기본적으로 제공하는 java.util.concurrent.atomic or locks 패키지에 있는 여러 클래스들을 활용한다면, 복잡한 구현 없이 좀 더 쉽게 우리의 목표를 달성할 수 있을거에요.

이것으로 충분한 것일까요?

분산 시스템(Distributed Systems)

요즘 서비스 운영 환경은 단일 프로세스로 동작하는 경우가 많지는 않을거에요. product 관점에서 굳이 분산 환경을 구축하지 않아도 되는 경우가 사실은 훨씬 많을 것 같기는 하지만.. 최소한 failover에 대해 고려한 서비스라면 2개 서버는 구축해서 사용하고 있을 것입니다.

하나의 프로세스에서도 여러 thread가 경합하며 공유자원에 대한 무결성이 깨지는데, 분산 시스템 환경에서는 어떨까요? 독립적인 메모리 영역을 사용한다면, 심지어 물리적으로 서버가 다른 곳에 위치해 있다면, 데이터 무결성을 보장하기에 더 큰 어려움이 있을 것으로 보이네요.

무언가 단일 지점에서 공유자원에 대한 접근을 통제할 필요가 있어보입니다. 제품에 따라 분산 환경에서도 이를 보장하도록 지원하기도 하지만, 우선은 '단일 지점'에 포커스를 맞춰 살펴보도록 할게요.

단일 DB에서의 처리

spring 기반으로 구축한 application을 scale-out 하기 위해 k8s를 활용해 여러 대의 서버를 띄운다고 가정해봅시다. 어떤 상품의 재고를 관리하기 위해 RDB 중에 MySQL 제품을 사용해 data를 저장하고 읽는다고 생각해볼게요. 재고 테이블을 만들어서 상품과 재고수량을 저장할 수 있도록 만들겠죠.

MySQL은 기본적으로 index와 record 기반 잠금을 기본으로 동작하기 때문에 특정 index 또는 row에 접근하여 locking 한다면 잠금의 범위를 줄일 수 있을거에요(lock striping의 아이디어네요). update 시에는 exclusive lock이 걸리기 때문에 update 쿼리가 수행되는 순간(바로 그 순간)에는 읽지도 쓰지도 못할거에요.

하지만 실제로는 일관성(consistency)이라는 것이 그렇게 아름답게 동작하지 않기 때문에 application level에서 적절한 처리를 해줘야 하죠. pessimistic locking을 할 수도 있고(데이터를 읽을 때 select ... for update, select ... lock in share 등의 처리를 해서 다른 트랜잭션에서 접근하지 못하게 해야죠), optimistic locking을 활용해 실제 쓰기가 발생하는 순간에 충돌 여부를 검사하게 할 수도 있을거에요. 데이터 충돌 가능성이 높은지 낮은지에 따라 사용해야 할 상황이 다르고, 재시도를 어떻게 해야할지에 대해서도 고려해야 하니 도메인의 특성에 따라 적절하게 선택해야 할거에요.

우리가 사용하는 JPA에서는 이런 코드가 되겠죠.

@Lock(LockModeType.PESSIMISTIC_WRITE)
//@Lock(LockModeType.PESSIMISTIC_READ)
Optional<product> findById(Long id);
​
@Entity
public class Product {
  //...
  
  @Version
  private Integer version;
}

단일 지점에 포커싱하여 살펴보기로 했지만, 분산 DB 환경에서는 이러한 잠금 처리도 데이터 일관성을 보장하지 않을 수도 있어요. 그래서 제품의 도움을 받아 해결하고는 하죠. RDB에서 MySQL은 Galera Cluster, Group Replication을, Oracle은 RAC, OGG 등을 통해 이러한 문제를 해결하죠. 그래도 근본적으로는 하나의 단일지점에서 '데이터 일관성을 보장하는 책임'은 제품에 있지 않고, application level에서 복합적으로 풀어야 하는 문제에요. 고가용성과 확장성에 대한 문제에 조금 더 가깝다고 볼 수 있겠네요.

어쨌든 다중 노드들이 동기화 문제를 해결해준다면! 적절한 locking을 통해 위에서 지속적으로 이야기해 온 동시성 문제를 해결할 수 있을거에요.

NoSQL에서의 처리 - Redis(Reddison library)

RDB는 관계형 데이터의 쓰기와 읽기에 집중한다고 하고, 데이터 일관성 해결을 위한 locking 문제는 다른 지점으로 옮겨볼게요. in-memory 기반의 Redis의 Redisson library를 활용하려 합니다. Redis는 싱글 스레드로 동작해 단일 노드 내에서의 데이터 접근 시 동시성 문제를 해결할 수 있지만, 역시나 다중 cluster 환경이라면 별도의 동시성 제어가 필요해요.

이 때 Redisson은 분산 잠금(distributed lock) 기능을 제공해 간단하게 동시성을 제어할 수 있습니다. redisson에서 제공하는 템플릿을 가공하여 사용한다면 annotation 하나로 locking이 필요한 critical session을 제어할 수도 있죠.

우리가 위에서 counting하는 작업인 run() 메서드가 만약 service layer의 로직이었다면, 다음과 같이 어노테이션을 하나만 추가하면 locking을 할 수 있게 될거에요.

@RedisLock
void run() {
    try {
        Thread.sleep(1000L);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    count++;
}

전체 코드를 살펴보겠습니다.

//build.gradle
dependencies {
  //...
​
  //redisson
  implementation 'org.redisson:redisson-spring-boot-starter:3.17.7'
}
@RestController
@RequiredArgsConstructor
public class MyController {
​
    private final MyService service;
​
    @GetMapping("/count")
    public String count() {
        service.run();
        return "ok";
    }
}
​
@Service
@RequiredArgsConstructor
public class MyService {
​
    private final MyCounter counter;
​
    //waitTime은 최대 5개까지 처리하도록 5초 이상, leaseTime은 task 처리당 1초가 걸리므로 1초 이상으로 설정한다고 해보자
    @RedisLock(key = "counter", waitTime = 5100L, leaseTime = 1100L, timeUnit = TimeUnit.MILLISECONDS)
    public void run() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        counter.count++;
    }
}
​
@Component
public class MyCounter {
​
    public int count = 0;
}
​
//configuration 설정은 application.yml에서 설정
@Configuration
public class RedisConfig {
​
    @Value("${spring.redis.host}")
    private String redisHost;
​
    @Value("${spring.redis.port}")
    private int redisPort;
​
    private static final String REDISSON_HOST_PREFIX = "redis://";
​
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress(REDISSON_HOST_PREFIX + redisHost + ":" + redisPort);
        return Redisson.create(config);
    }
}
​
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisLock {
    String key() default "default";
    long waitTime() default 10L;
    long leaseTime() default 10L;
    TimeUnit timeUnit() default TimeUnit.SECONDS;
}
​
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
public class RedisLockAop {
​
    private static final String REDIS_LOCK_PREFIX = "LOCK:";
​
    private final RedissonClient redissonClient;
​
    @Around("@annotation(com.test.RedisLock)")
    public Object lock(final ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        RedisLock distributedLock = method.getAnnotation(RedisLock.class);
​
        String key = REDIS_LOCK_PREFIX + distributedLock.key();
        RLock lock = redissonClient.getLock(key);
        boolean isLocked = false;
        try {
            isLocked = lock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(), distributedLock.timeUnit());
            if (!isLocked) {
                throw new RuntimeException("lock timeout!");  //lock을 획득하지 못하면 처리해야 할 로직!
            }
            log.info("Lock acquired!");
            return joinPoint.proceed();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            if (isLocked) {
                lock.unlock();
            }
        }
    }
}

postman이나 jmeter 등을 사용해 테스트를 해보면 locking이 정상 동작하는 것을 확인할 수 있을겁니다. task당 처리가 1초이고, redis와의 I/O 등을 고려해 5.1초 정도로 timeout 설정을 해두었으므로 6개의 요청이 동시에 들어간다면 5개만 성공하고 나머지는 timeout 처리될 것입니다.

사실 redis를 활용하여 lock을 건다고 하여 성능적으로 큰 이점이 있는 것은 아닙니다. in-memory 기반이라고 하여서 lock을 걸어도 더 빠를 것이라고 생각하는데, lock을 획득하고 얼마나 빨리 release 해주는가는 전체 throughput을 늘리는 데 아주 큰 이점을 갖기는 어렵습니다. 아주 조금 더 빨라지고 tps가 조금 더 늘어날 수 있기는 하겠지만요. 심지어 redis에 대한 의존성이 생기므로 이에 대한 관리도 추가적으로 들어가죠.

그럼에도 redis를 사용한 이유는 locking의 책임을 분리하고, 데이터 무결성이 매우 중요한 도메인에서 명시적으로 잠금을 설정하고 손쉽게 구현할 수 있기 때문입니다. 어떤 제품이나 기법, 아키텍쳐 등을 사용했다면 반드시 trade-off가 생깁니다. 계속 고민해야 할 문제겠네요!