[Spring] 동시성 이슈 해결 방법 (3)

2022. 9. 22. 14:38개발/Spring

728x90

지난번 포스팅과 코드를 이어간다.

 

https://devhooney.tistory.com/109

 

[Spring] 동시성 이슈 해결 방법 (2)

지난번 포스팅과 코드를 이어간다. https://devhooney.tistory.com/108 [Spring] 동시성 이슈 해결 방법 (1) 간단한 재고 시스템으로 알아보는 동시성 이슈 Stock @Entity public class Stock { @Id @GeneratedVal..

devhooney.tistory.com

 

DB를 활용하여 데이터 정합성을 맞추는 여러가지 방법을 알아보려한다.

 

Mysql(mariaDB)를 활용한 방법은 3가지가 있다.

  1. Pessimistic Lock (exclusive lock)
  2. Optimistic Lock
  3. Named Lock

 

- Pessimistic Lock

 

서버가 여러 개가 있을 때 서버 1이 lock을 걸면 나머지 서버에서 DB에 접근할 수 없다.

데이터에는 lock을 가진 쓰레드만 접근할 수 있기 때문에 race condition을 방지할 수 있다.

데드락이 걸릴 수 있기 때문에 주의해서 사용해야 한다.

 

Thread1 Stock Thread2
select * from stock where id = 1 id: 1, quantity: 5  
update set quantity = 4 from stock where id = 1 id: 1, quantity: 4  
  id: 1, quantity: 4 select * from stock where id = 1
  id: 1, quantity: 3 update set quantity = 3 from stock where id = 1

 

쓰레드1이 데이터 락을 걸고 데이터를 가져온다.

그 후에 쓰레드2가 락을 걸고 데이터 획득을 시도하지만 락이 걸려있으므로 대기중 상태가 된다.

쓰레드1의 작업이 끝나면 쓰레드2가 락을 걸고 데이터를 가져와서 작업을 진행한다.

 

StockRepository

public interface StockRepository extends JpaRepository<Stock, Long> {

    @Lock(value = LockModeType.PESSIMISTIC_WRITE)
    @Query("select s from Stock s where s.id = :id")
    Stock findByIdWithPessimisticLock(Long id);

 
}

 

스프링 데이터 JPA에서는 @Lock 어노테이션으로 Pessimistic Lock을 설정하기 쉽다.

 

PessimisticLockStockService

@Service
public class PessimisticLockStockService {
    private StockRepository stockRepository;

    public PessimisticLockStockService(StockRepository stockRepository) {
        this.stockRepository = stockRepository;
    }

    @Transactional
    public void decrease(Long id, Long quantity) {
        Stock stock = stockRepository.findByIdWithPessimisticLock(id);

        stock.decrease(quantity);

        stockRepository.saveAndFlush(stock);
    }
}

 

Test 코드

@SpringBootTest
class StockServiceTest {
    @Autowired
    private PessimisticLockStockService stockService;

    @Autowired
    private StockRepository stockRepository;

    @BeforeEach
    public void before() {
        Stock stock = new Stock(1L, 100L);

        stockRepository.saveAndFlush(stock);
    }

    @AfterEach
    public void after() {
        stockRepository.deleteAll();
    }

    @Test
    public void stock_decrease() {
        stockService.decrease(1L, 1L);

        // 100 - 1 = 99
        Stock stock = stockRepository.findById(1L).orElseThrow();

        assertEquals(99, stock.getQuantity());

    }

    @Test
    public void 동시에_100개의_요청() throws InterruptedException {
        int threadCount = 100;
        ExecutorService executorService = Executors.newFixedThreadPool(32);
        CountDownLatch latch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    stockService.decrease(1L, 1L);
                } finally {
                    {
                        latch.countDown();
                    }
                }
            });
        }

        latch.await();

        Stock stock = stockRepository.findById(1L).orElseThrow();

        // 100 - (1 * 100) = 0
        assertEquals(0L, stock.getQuantity());

    }
}

충돌이 빈번할 경우 Optimistic Lock보다 성능이 좋다.

데이터 정합성이 어느 정도 보장된다.

별도의 Lock을 잡기 때문에 성능이 떨어질 수 있다.

 

 

- Optimistic Lock

Lock 대신 버전을 사용해서 정합성을 맞추는 방법이다.

서버 1이 버전 1을 가지고 데이터에 접근을 하고, 쿼리 실행 시 DB는 버전2가된다.

서버 2에서 업데이트를 시도할 때 버전 1을 갖고 하는데, DB와 버전이 맞지 않아 진행이 되질 않는다.

서버 2에서 업데이트를 하기 위해서는 새롭게 조회하는 로직이 추가되어야 한다.

이러한 방법으로 race condition을 방지한다.

 

server1 data server2
select * from stock where id = 1 id: 1, quantity: 100, version: 1 read (id: 1, quantity: 100, version: 1)
update quantity = 98, version = version + 1 from stock where id = 1 and version = 1 id: 1, quantity: 98, version: 2  
  id: 1, quantity: 98, version 2 update quantity = 98, version = version + 1 from stock where id = 1 and version = 1

버전이 다르므로 업데이트 실패한다.

 

StockRepository

public interface StockRepository extends JpaRepository<Stock, Long> {

    @Lock(value = LockModeType.PESSIMISTIC_WRITE)
    @Query("select s from Stock s where s.id = :id")
    Stock findByIdWithPessimisticLock(Long id);

    @Lock(value = LockModeType.OPTIMISTIC)
    @Query("select s from Stock s where s.id = :id")
    Stock findByIdWithOptimisticLock(Long id);
}

 

OptimisticLockStockService

@Service
public class OptimisticLockStockService {
    private StockRepository stockRepository;

    public OptimisticLockStockService(StockRepository stockRepository) {
        this.stockRepository = stockRepository;
    }

    @Transactional
    public void decrease(Long id, Long quantity) {
        Stock stock = stockRepository.findByIdWithOptimisticLock(id);

        stock.decrease(quantity);

        stockRepository.saveAndFlush(stock);
    }
}

 

실패했을 경우 재시도하는 로직을 만들기 위해서 Facade라는 패키지 안에 클래스를 생성한다.

 

OptimisticLockStockFacade

@Component
public class OptimisticLockStockFacade {

    private OptimisticLockStockService optimisticLockStockService;

    public OptimisticLockStockFacade(OptimisticLockStockService optimisticLockStockService) {
        this.optimisticLockStockService = optimisticLockStockService;
    }

    public void decrease(Long id, Long quantity) throws InterruptedException {
        while (true) {
            try {
                optimisticLockStockService.decrease(id, quantity);

                break;
            } catch (Exception e) {
                Thread.sleep(50);
            }
        }
    }
}

 

Test 코드

@SpringBootTest
class OptimisticLockStockFacadeTest {
    @Autowired
    private OptimisticLockStockFacade optimisticLockStockFacade;

    @Autowired
    private StockRepository stockRepository;

    @BeforeEach
    public void before() {
        Stock stock = new Stock(1L, 100L);

        stockRepository.saveAndFlush(stock);
    }

    @AfterEach
    public void after() {
        stockRepository.deleteAll();
    }


    @Test
    public void 동시에_100개의_요청() throws InterruptedException {
        int threadCount = 100;
        ExecutorService executorService = Executors.newFixedThreadPool(32);
        CountDownLatch latch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    optimisticLockStockFacade.decrease(1L, 1L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    {
                        latch.countDown();
                    }
                }
            });
        }

        latch.await();

        Stock stock = stockRepository.findById(1L).orElseThrow();

        // 100 - (1 * 100) = 0
        assertEquals(0L, stock.getQuantity());

    }
}

재시도 때문에 성능이 떨어질 수 있다.

별도의 Lock이 없으므로 Pessimistic Lock보다 성능이 좋다.

충돌이 빈번할 경우 성능이 떨어진다.

충돌났을 경우 재시도하는 로직을 직접 짜야만 한다.

 

 

- Named Lock

이름을 가진 metadata locking.

이름을 가진 lock을 획득한 후 해제할 때까지 다른 세션은 데이터에 접근할 수 없다.(lock을 획득할 수 없다.)

transaction이 종료될 때 lock이 자동으로 해제되지 않기 때문에 별도로 해제를 수행해주거나 선점시간이 끝나야 해제가 된다.

Pessimistic lock과 비슷하지만, Pessimistic lock은 row나 table 단위로 lock을 걸지만, named는 metadata 단위로 lock을 건다.

 

 

Pessimistic Lock과 반대로 Stock이 아닌 별도의 공간에 Lock을 걸게 된다.

세션1에 Lock을 걸고, 작업하고, Lock 해제 후 세션2에 Lock을 걸고 작업한다.

 

실무에서는 Named Lock을 사용할 경우 데이터소스를 분리해서 사용해야한다.

같은 데이터소스를 사용하면 커넥션 풀이 부족해질 수 있다.

 

강의에서는 같이 사용했다.

 

LockRepository

public interface LockRepository extends JpaRepository<Stock, Long> {
    @Query(value = "select get_lock(:key, 3000)", nativeQuery = true)
    void getLock(String key);

    @Query(value = "select release_lock(:key)", nativeQuery = true)
    void releaseLock(String key);
}

 

실제 로직 전 후로 get, release Lock을 실행시켜줘야 하기 때문에 facade 패키지 안에 클래스를 추가한다.

 

NamedLockStockFacade

@Component
public class NamedLockStockFacade {
    private final LockRepository lockRepository;

    private final StockService stockService;

    public NamedLockStockFacade(LockRepository lockRepository, StockService stockService) {
        this.lockRepository = lockRepository;
        this.stockService = stockService;
    }

    @Transactional
    public void decrease(Long id, Long quantity) {
        try {
            lockRepository.getLock(id.toString());
            stockService.decrease(id, quantity);
        } finally {
            lockRepository.releaseLock(id.toString());
        }
    }
}

 

 

StockService

부모의 트랜잭션과 별도로 실행되기 위해서 propagation을 수정함

@Service
public class StockService {
    private StockRepository stockRepository;

    public StockService(StockRepository stockRepository) {
        this.stockRepository = stockRepository;
    }

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public synchronized void decrease(Long id, Long quantity) {
        // get stock
        // 재고 감소
        // 저장

        Stock stock = stockRepository.findById(id).orElseThrow();

        stock.decrease(quantity);

        stockRepository.saveAndFlush(stock);
    }
}

 

application.yml

같인 데이터소스를 사용하기 때문에 커넥션풀을 넉넉하게 잡아준다.

spring:
  jpa:
    hibernate:
      ddl-auto: create
    show-sql: true
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/stock_example
    username: root
    password: 1234
    hikari:
      maximum-pool-size: 40

logging:
  level:
    org:
      hibernate:
        SQL: DEBUG
        type:
          descriptor:
            sql:
              BasicBinder: TRACE

 

Test 코드

@SpringBootTest
class NamedLockStockFacadeTest {
    @Autowired
    private NamedLockStockFacade namedLockStockFacade;

    @Autowired
    private StockRepository stockRepository;

    @BeforeEach
    public void before() {
        Stock stock = new Stock(1L, 100L);

        stockRepository.saveAndFlush(stock);
    }

    @AfterEach
    public void after() {
        stockRepository.deleteAll();
    }


    @Test
    public void 동시에_100개의_요청() throws InterruptedException {
        int threadCount = 100;
        ExecutorService executorService = Executors.newFixedThreadPool(32);
        CountDownLatch latch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    namedLockStockFacade.decrease(1L, 1L);
                } finally {
                    {
                        latch.countDown();
                    }
                }
            });
        }

        latch.await();

        Stock stock = stockRepository.findById(1L).orElseThrow();

        // 100 - (1 * 100) = 0
        assertEquals(0L, stock.getQuantity());

    }
}

 

NamedLock은 분산 Lock을 구현할 때 사용된다.

트랜잭션 종료 시 Lock 해제를 주의해야 한다.

 

728x90