개발/Java&Kotlin

[Spring] 스프링 배치(Spring Batch) 가이드 따라가기 (4)

devhooney 2022. 12. 14. 09:15
728x90

스프링 배치 가이드

 

- 지난 포스팅에서 튜토리얼 보고 따라해봤는데, 이번에는 개념부터 천천히 공부해보자.

- 책을 보고 공부하려 했으나, 스프링과 부트의 배치 사용 문법이 많이 달라 이동욱님의 블로그를 보고 공부했다.

https://devhooney.tistory.com/136

 

[Spring] 스프링 배치(Spring Batch) 가이드 따라가기 (2)

스프링 배치 가이드 - 지난 포스팅에서 튜토리얼 보고 따라해봤는데, 이번에는 개념부터 천천히 공부해보자. - 책을 보고 공부하려 했으나, 스프링과 부트의 배치 사용 문법이 많이 달라 이동욱

devhooney.tistory.com

 

 

1. Chunk란?

- Chunk는 스프링 배치에서 데이터 덩어리로 작업할 때 각 커밋 사이에 처리되는 row 수를 말한다.

- Chunk 지향 처리는 한 번에 하나의 데이터를 읽어 Chunk 덩어리를 만들어, Chunk 단위로 트랜잭션을 다루는 것을 의미한다.

- 트랜잭션 단위로 작업되기 때문에 실패하면 실패한 Chunk는 롤백되고, 그 전의 Chunk는 반영이 된다.

- 순서

  • Reader에서 데이터 하나를 읽는다.
  • 읽은 데이터를 Processor에서 가공한다.
  • 가공된 데이터를 별도의 공간에 모은 뒤, Chunk 단위 만큼 쌓이게 도면 Writer에 전달하고 저장한다.

- Reader와 Processor에서는 1건씩 다뤄지고, Writer에선 Chunk 단위로 처리된다.

 

 

 

 

 

728x90

 

 

 

 

2. ChunkOrientedTasklet

Chunk 지향 처리의 전체 로직을 다루는 것은 ChunkOrientedTasklet 클래스이다.

 

public class ChunkOrientedTasklet<I> implements Tasklet {

   private static final String INPUTS_KEY = "INPUTS";

   private final ChunkProcessor<I> chunkProcessor;

   private final ChunkProvider<I> chunkProvider;

   private boolean buffering = true;

   private static Log logger = LogFactory.getLog(ChunkOrientedTasklet.class);

   public ChunkOrientedTasklet(ChunkProvider<I> chunkProvider, ChunkProcessor<I> chunkProcessor) {
      this.chunkProvider = chunkProvider;
      this.chunkProcessor = chunkProcessor;
   }
   ...

 

- chunkProvider.provide()로 Reader에서 Chunk size만큼 데이터를 가져온다.

- chunkProcessor.process() 에서 Reader로 받은 데이터를 가공(Processor)하고 저장(Writer)한다.

 

@Nullable
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

   @SuppressWarnings("unchecked")
   Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
   if (inputs == null) {
      inputs = chunkProvider.provide(contribution);
      if (buffering) {
         chunkContext.setAttribute(INPUTS_KEY, inputs);
      }
   }

   chunkProcessor.process(contribution, inputs);
   chunkProvider.postProcess(contribution, inputs);

   // Allow a message coming back from the processor to say that we
   // are not done yet
   if (inputs.isBusy()) {
      logger.debug("Inputs still busy");
      return RepeatStatus.CONTINUABLE;
   }

   chunkContext.removeAttribute(INPUTS_KEY);
   chunkContext.setComplete();

   if (logger.isDebugEnabled()) {
      logger.debug("Inputs not busy, ended: " + inputs.isEnd());
   }
   return RepeatStatus.continueIf(!inputs.isEnd());

}

 

- 데이터를 가져오는 chunkProvider.provide()를 보면

 

@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {

   final Chunk<I> inputs = new Chunk<>();
   repeatOperations.iterate(new RepeatCallback() {

      @Override
      public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
         I item = null;
         Timer.Sample sample = Timer.start(Metrics.globalRegistry);
         String status = BatchMetrics.STATUS_SUCCESS;
         try {
            item = read(contribution, inputs);
         }
         catch (SkipOverflowException e) {
            // read() tells us about an excess of skips by throwing an
            // exception
            status = BatchMetrics.STATUS_FAILURE;
            return RepeatStatus.FINISHED;
         }
         finally {
            stopTimer(sample, contribution.getStepExecution(), status);
         }
         if (item == null) {
            inputs.setEnd();
            return RepeatStatus.FINISHED;
         }
         inputs.add(item);
         contribution.incrementReadCount();
         return RepeatStatus.CONTINUABLE;
      }

   });

 

- inputs이 ChunkSize만큼 쌓일때까지 read()를 호출하는데,  read() 는 내부를 보시면 실제로는 ItemReader.read를 호출한다.

 

@Nullable
protected I read(StepContribution contribution, Chunk<I> chunk) throws SkipOverflowException, Exception {
   return doRead();
}
@Nullable
protected final I doRead() throws Exception {
   try {
      listener.beforeRead();
      I item = itemReader.read();
      if(item != null) {
         listener.afterRead(item);
      }
      return item;
   }
   catch (Exception e) {
      if (logger.isDebugEnabled()) {
         logger.debug(e.getMessage() + " : " + e.getClass().getName());
      }
      listener.onReadError(e);
      throw e;
   }
}

 

- 즉, ItemReader.read에서 1건씩 데이터를 조회해 Chunk size만큼 데이터를 쌓는 것이 provide()가 하는 일이다.

 

 

 

 

 

 

 

3. SimpleChunkProcessor

 

Processor와 Writer 로직을 담고 있는 것은 ChunkProcessor이다.

 

public interface ChunkProcessor<I> {
   
   void process(StepContribution contribution, Chunk<I> chunk) throws Exception;

}

 

- 인터페이스이기 때문에 실제 구현체가 있는데, 기본적으로 SimpleChunkProcessor이다.

 

public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean {

   private ItemProcessor<? super I, ? extends O> itemProcessor;

   private ItemWriter<? super O> itemWriter;

   private final MulticasterBatchListener<I, O> listener = new MulticasterBatchListener<>();

   /**
    * Default constructor for ease of configuration.
    */
   @SuppressWarnings("unused")
   private SimpleChunkProcessor() {
      this(null, null);
   }

   public SimpleChunkProcessor(@Nullable ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) {
      this.itemProcessor = itemProcessor;
      this.itemWriter = itemWriter;
   }

   public SimpleChunkProcessor(ItemWriter<? super O> itemWriter) {
      this(null, itemWriter);
   }

   /**
    * @param itemProcessor the {@link ItemProcessor} to set
    */
   public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) {
      this.itemProcessor = itemProcessor;
   }

...

 

- 처리를 담당하는 핵심 로직은 process()

 

@Override
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {

   // Allow temporary state to be stored in the user data field
   initializeUserData(inputs);

   // If there is no input we don't have to do anything more
   if (isComplete(inputs)) {
      return;
   }

   // Make the transformation, calling remove() on the inputs iterator if
   // any items are filtered. Might throw exception and cause rollback.
   Chunk<O> outputs = transform(contribution, inputs);

   // Adjust the filter count based on available data
   contribution.incrementFilterCount(getFilterCount(inputs, outputs));

   // Adjust the outputs if necessary for housekeeping purposes, and then
   // write them out...
   write(contribution, inputs, getAdjustedOutputs(inputs, outputs));

}

 

- Chunk<I> inputs를 파라미터로 받는다.

  • 이 데이터는 chunkProvider.provide() 에서 받은 ChunkSize만큼 쌓인 item.

transform() 에서는 전달 받은 inputs을 doProcess()로 전달하고 변환값을 받는다.

- transform()을 통해 가공된 대량의 데이터는 write()를 통해 일괄 저장된다.

  • write()는 저장이 될수도 있고, 외부 API로 전송할 수 도 있음.
  • 이는 개발자가 ItemWriter를 어떻게 구현했는지에 따라 달라진다.

- 여기서 transform()은 반복문을 통해 doProcess()를 호출하는데, 이는 ItemProcessor의 process()를 사용한다.

 

protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
   Chunk<O> outputs = new Chunk<>();
   for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
      final I item = iterator.next();
      O output;
      Timer.Sample sample = BatchMetrics.createTimerSample();
      String status = BatchMetrics.STATUS_SUCCESS;
      try {
         output = doProcess(item);
      }
      catch (Exception e) {
         /*
          * For a simple chunk processor (no fault tolerance) we are done
          * here, so prevent any more processing of these inputs.
          */
         inputs.clear();
         status = BatchMetrics.STATUS_FAILURE;
         throw e;
      }
      finally {
         stopTimer(sample, contribution.getStepExecution(), "item.process", status, "Item processing");
      }
      if (output != null) {
         outputs.add(output);
      }
      else {
         iterator.remove();
      }
   }
   return outputs;
}

 

- doProcess() 를 처리하는데 만약 ItemProcessor가 없다면 item을 그대로 반환

- ItemProcessor가 있다면 ItemProcessor의 process()로 가공하여 반환한다.

- 이렇게 가공된 데이터들은 SimpleChunkProcessor의 doWrite() 를 호출하여 일괄 처리 한다.

 

protected final void doWrite(List<O> items) throws Exception {

   if (itemWriter == null) {
      return;
   }

   try {
      listener.beforeWrite(items);
      writeItems(items);
      doAfterWrite(items);
   }
   catch (Exception e) {
      doOnWriteError(e, items);
      throw e;
   }

}

 

 

4. Page Size vs Chunk Size

- 스프링 배치를 더 사용해보고 수정 예정

 

 

 

 

 

728x90