개발/Java&Kotlin

[Spring] 스프링 배치(Spring Batch) 가이드 따라가기 (5)

devhooney 2022. 12. 18. 21:42
728x90

스프링 배치 가이드

 

- 지난 포스팅에서 튜토리얼 보고 따라해봤는데, 이번에는 개념부터 천천히 공부해보자.

- 책을 보고 공부하려 했으나, 스프링과 부트의 배치 사용 문법이 많이 달라 이동욱님의 블로그를 보고 공부했다.

https://devhooney.tistory.com/136

 

[Spring] 스프링 배치(Spring Batch) 가이드 따라가기 (2)

스프링 배치 가이드 - 지난 포스팅에서 튜토리얼 보고 따라해봤는데, 이번에는 개념부터 천천히 공부해보자. - 책을 보고 공부하려 했으나, 스프링과 부트의 배치 사용 문법이 많이 달라 이동욱

devhooney.tistory.com

 

 

- Spring Batch는 Chunk 지향 처리를 하고 있다.

- Spring Batch는 Job과 Step으로 구성되어 있다.

- Step은 Tasklet 단위로 처리된다.

- Tasklet 중 ChunkOrientedTasklet을 통해 Chunk를 처리한다.

- ChunkOrientedTasklet는 ItemReader, ItemWriter, ItemProcessor가 있다.

 

 

1.ItemReader란?

- Spring Batch의 ItemReader는 데이터를 읽는다.

- DB, File, XML, JSON 등 데이터 소스를 읽는다.

- JMS (Java Message Service)와 같은 다른 유형의 데이터 소스도 읽는다.

- ItemReader의 구현체 중 JdbcPagingItemReader를 보면 ItemReader와 ItemStream이 있다. 

 

- ItemReader 인터페이스

public interface ItemReader<T> {

   /**
    * Reads a piece of input data and advance to the next one. Implementations
    * <strong>must</strong> return <code>null</code> at the end of the input
    * data set. In a transactional setting, caller might get the same item
    * twice from successive calls (or otherwise), if the first call was in a
    * transaction that rolled back.
    * 
    * @throws ParseException if there is a problem parsing the current record
    * (but the next one may still be valid)
    * @throws NonTransientResourceException if there is a fatal exception in
    * the underlying resource. After throwing this exception implementations
    * should endeavour to return null from subsequent calls to read.
    * @throws UnexpectedInputException if there is an uncategorised problem
    * with the input data. Assume potentially transient, so subsequent calls to
    * read might succeed.
    * @throws Exception if an there is a non-specific error.
    * @return T the item to be processed or {@code null} if the data source is
    * exhausted
    */
   @Nullable
   T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

}

- read()는 데이터를 읽는 메소드

 

 

- ItemStream 인터페이스

public interface ItemStream {

   /**
    * Open the stream for the provided {@link ExecutionContext}.
    *
    * @param executionContext current step's {@link org.springframework.batch.item.ExecutionContext}.  Will be the
    *                            executionContext from the last run of the step on a restart.
    * @throws IllegalArgumentException if context is null
    */
   void open(ExecutionContext executionContext) throws ItemStreamException;

   /**
    * Indicates that the execution context provided during open is about to be saved. If any state is remaining, but
    * has not been put in the context, it should be added here.
    * 
    * @param executionContext to be updated
    * @throws IllegalArgumentException if executionContext is null.
    */
   void update(ExecutionContext executionContext) throws ItemStreamException;

   /**
    * If any resources are needed for the stream to operate they need to be destroyed here. Once this method has been
    * called all other methods (except open) may throw an exception.
    */
   void close() throws ItemStreamException;
}

 

- ItemStream 인터페이스는 주기적으로 상태를 저장하고, 오류가 발생하면 해당 상태에서 복원하는 역할을 한다.

즉, ItemReader의 상태를 저장하고 실패한 곳에서 다시 실행할 수 있게 해주는 역할

 

- open(), close()는 스트림을 열고 닫는 메소드

- update()는 Batch 처리의 상태를 업데이트하는 메소드

 

2. Database Reader

- 일반적으로 Batch 작업은 많은 양의 데이터를 처리한다.

-  Spring의 JdbcTemplate은 분할 처리를 지원하지 않는다. 그래서 직접 limit, offset을 사용하는 작업이 필요하다.

- Spring Batch는 이런 문제를 해결하기 위해 2개의 Reader 타입(Cursor, Paging)을 지원

 

- Cursor

  • Cursor는 실제로 JDBC ResultSet의 기본 기능
  • ResultSet이 open 될 때마다 next() 메소드가 호출 되어 Database의 데이터가 반환
  • 이를 통해 필요에 따라 Database에서 데이터를 Streaming 할 수 있다.

 

- Paging

  • Paging은 page라는 Chunk로 Database에서 데이터를 검색
  • 즉, 페이지 단위로 한번에 데이터를 조회해오는 방식.

 

- Cursor vs Paging

  • Cursor 방식은 Database와 커넥션을 맺은 후, Cursor를 한칸씩 옮기면서 지속적으로 데이터를 가져온다.
  • Paging 방식에서는 한번에 10개 (혹은 개발자가 지정한 PageSize)만큼 데이터를 가져온다.

 

- Cursor 기반 ItemReader 구현체

  • JdbcCursorItemReader
  • HibernateCursorItemReader
  • StoredProcedureItemReader

Paging 기반 ItemReader 구현체

  • JdbcPagingItemReader
  • HibernatePagingItemReader
  • JpaPagingItemReader

 

3. PagingItemReader(JpaPagingItem)

- 게시판 페이징과 같이 batch를 페이징 하는 것

- offset과 limit을 지정하여 사용

- Spring Batch에서는 offset과 limit을 PageSize에 맞게 자동으로 생성
- 각 쿼리는 개별적으로 실행

- 각 페이지마다 새로운 쿼리를 실행하므로 페이징시 결과를 정렬하는 것이 중요하다.



- 데이터 생성

create table pay (
  id         bigint not null auto_increment,
  amount     bigint,
  tx_name     varchar(255),
  tx_date_time datetime,
  primary key (id)
) engine = InnoDB;

insert into pay (amount, tx_name, tx_date_time) VALUES (1000, 'trade1', '2018-09-10 00:00:00');
insert into pay (amount, tx_name, tx_date_time) VALUES (2000, 'trade2', '2018-09-10 00:00:00');
insert into pay (amount, tx_name, tx_date_time) VALUES (3000, 'trade3', '2018-09-10 00:00:00');
insert into pay (amount, tx_name, tx_date_time) VALUES (4000, 'trade4', '2018-09-10 00:00:00');

 

 

- Pay Entity

@ToString
@Getter
@Setter
@NoArgsConstructor
@Entity
public class Pay {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long amount;
    private String txName;
    private LocalDateTime txDateTime;

    public Pay(Long amount, String txName, String txDateTime) {
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
    }

    public Pay(Long id, Long amount, String txName, String txDateTime) {
        this.id = id;
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
    }
}

 

- JpaPagingItemReaderJobConfiguration 

@Slf4j
@RequiredArgsConstructor
@Configuration
public class JpaPagingItemReaderJobConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    private int chunkSize = 10;

    @Bean
    public Job jpaPagingItemReaderJob() {
        return jobBuilderFactory.get("jpaPagingItemReaderJob")
                .start(jpaPagingItemReaderStep())
                .build();
    }

    @Bean
    public Step jpaPagingItemReaderStep() {
        return stepBuilderFactory.get("jpaPagingItemReaderStep")
                .<Pay, Pay>chunk(chunkSize)
                .reader(jpaPagingItemReader())
                .writer(jpaPagingItemWriter())
                .build();
    }

    @Bean
    public JpaPagingItemReader<Pay> jpaPagingItemReader() {
        return new JpaPagingItemReaderBuilder<Pay>()
                .name("jpaPagingItemReader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("SELECT p FROM Pay p WHERE amount >= 2000")
                .build();
    }

    private ItemWriter<Pay> jpaPagingItemWriter() {
        return list -> {
            for (Pay pay: list) {
                log.info("Current Pay={}", pay);
            }
        };
    }
}

 

 

- 실행하면

데이터를 잘 읽어왔다.

 

 

 

- 참고

http://jojoldu.tistory.com/336?category=902551 

728x90