스프링 배치 가이드
- 지난 포스팅에서 튜토리얼 보고 따라해봤는데, 이번에는 개념부터 천천히 공부해보자.
- 책을 보고 공부하려 했으나, 스프링과 부트의 배치 사용 문법이 많이 달라 이동욱님의 블로그를 보고 공부했다.
https://devhooney.tistory.com/136
- Spring Batch는 Chunk 지향 처리를 하고 있다.
- Spring Batch는 Job과 Step으로 구성되어 있다.
- Step은 Tasklet 단위로 처리된다.
- Tasklet 중 ChunkOrientedTasklet을 통해 Chunk를 처리한다.
- ChunkOrientedTasklet는 ItemReader, ItemWriter, ItemProcessor가 있다.
1.ItemReader란?
- Spring Batch의 ItemReader는 데이터를 읽는다.
- DB, File, XML, JSON 등 데이터 소스를 읽는다.
- JMS (Java Message Service)와 같은 다른 유형의 데이터 소스도 읽는다.
- ItemReader의 구현체 중 JdbcPagingItemReader를 보면 ItemReader와 ItemStream이 있다.
- ItemReader 인터페이스
public interface ItemReader<T> {
/**
* Reads a piece of input data and advance to the next one. Implementations
* <strong>must</strong> return <code>null</code> at the end of the input
* data set. In a transactional setting, caller might get the same item
* twice from successive calls (or otherwise), if the first call was in a
* transaction that rolled back.
*
* @throws ParseException if there is a problem parsing the current record
* (but the next one may still be valid)
* @throws NonTransientResourceException if there is a fatal exception in
* the underlying resource. After throwing this exception implementations
* should endeavour to return null from subsequent calls to read.
* @throws UnexpectedInputException if there is an uncategorised problem
* with the input data. Assume potentially transient, so subsequent calls to
* read might succeed.
* @throws Exception if an there is a non-specific error.
* @return T the item to be processed or {@code null} if the data source is
* exhausted
*/
@Nullable
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
- read()는 데이터를 읽는 메소드
- ItemStream 인터페이스
public interface ItemStream {
/**
* Open the stream for the provided {@link ExecutionContext}.
*
* @param executionContext current step's {@link org.springframework.batch.item.ExecutionContext}. Will be the
* executionContext from the last run of the step on a restart.
* @throws IllegalArgumentException if context is null
*/
void open(ExecutionContext executionContext) throws ItemStreamException;
/**
* Indicates that the execution context provided during open is about to be saved. If any state is remaining, but
* has not been put in the context, it should be added here.
*
* @param executionContext to be updated
* @throws IllegalArgumentException if executionContext is null.
*/
void update(ExecutionContext executionContext) throws ItemStreamException;
/**
* If any resources are needed for the stream to operate they need to be destroyed here. Once this method has been
* called all other methods (except open) may throw an exception.
*/
void close() throws ItemStreamException;
}
- ItemStream 인터페이스는 주기적으로 상태를 저장하고, 오류가 발생하면 해당 상태에서 복원하는 역할을 한다.
즉, ItemReader의 상태를 저장하고 실패한 곳에서 다시 실행할 수 있게 해주는 역할
- open(), close()는 스트림을 열고 닫는 메소드
- update()는 Batch 처리의 상태를 업데이트하는 메소드
2. Database Reader
- 일반적으로 Batch 작업은 많은 양의 데이터를 처리한다.
- Spring의 JdbcTemplate은 분할 처리를 지원하지 않는다. 그래서 직접 limit, offset을 사용하는 작업이 필요하다.
- Spring Batch는 이런 문제를 해결하기 위해 2개의 Reader 타입(Cursor, Paging)을 지원
- Cursor
- Cursor는 실제로 JDBC ResultSet의 기본 기능
- ResultSet이 open 될 때마다 next() 메소드가 호출 되어 Database의 데이터가 반환
- 이를 통해 필요에 따라 Database에서 데이터를 Streaming 할 수 있다.
- Paging
- Paging은 page라는 Chunk로 Database에서 데이터를 검색
- 즉, 페이지 단위로 한번에 데이터를 조회해오는 방식.
- Cursor vs Paging
- Cursor 방식은 Database와 커넥션을 맺은 후, Cursor를 한칸씩 옮기면서 지속적으로 데이터를 가져온다.
- Paging 방식에서는 한번에 10개 (혹은 개발자가 지정한 PageSize)만큼 데이터를 가져온다.
- Cursor 기반 ItemReader 구현체
- JdbcCursorItemReader
- HibernateCursorItemReader
- StoredProcedureItemReader
- Paging 기반 ItemReader 구현체
- JdbcPagingItemReader
- HibernatePagingItemReader
- JpaPagingItemReader
3. PagingItemReader(JpaPagingItem)
- 게시판 페이징과 같이 batch를 페이징 하는 것
- offset과 limit을 지정하여 사용
- Spring Batch에서는 offset과 limit을 PageSize에 맞게 자동으로 생성
- 각 쿼리는 개별적으로 실행
- 각 페이지마다 새로운 쿼리를 실행하므로 페이징시 결과를 정렬하는 것이 중요하다.
- 데이터 생성
create table pay (
id bigint not null auto_increment,
amount bigint,
tx_name varchar(255),
tx_date_time datetime,
primary key (id)
) engine = InnoDB;
insert into pay (amount, tx_name, tx_date_time) VALUES (1000, 'trade1', '2018-09-10 00:00:00');
insert into pay (amount, tx_name, tx_date_time) VALUES (2000, 'trade2', '2018-09-10 00:00:00');
insert into pay (amount, tx_name, tx_date_time) VALUES (3000, 'trade3', '2018-09-10 00:00:00');
insert into pay (amount, tx_name, tx_date_time) VALUES (4000, 'trade4', '2018-09-10 00:00:00');
- Pay Entity
@ToString
@Getter
@Setter
@NoArgsConstructor
@Entity
public class Pay {
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long amount;
private String txName;
private LocalDateTime txDateTime;
public Pay(Long amount, String txName, String txDateTime) {
this.amount = amount;
this.txName = txName;
this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
}
public Pay(Long id, Long amount, String txName, String txDateTime) {
this.id = id;
this.amount = amount;
this.txName = txName;
this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
}
}
- JpaPagingItemReaderJobConfiguration
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JpaPagingItemReaderJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
private int chunkSize = 10;
@Bean
public Job jpaPagingItemReaderJob() {
return jobBuilderFactory.get("jpaPagingItemReaderJob")
.start(jpaPagingItemReaderStep())
.build();
}
@Bean
public Step jpaPagingItemReaderStep() {
return stepBuilderFactory.get("jpaPagingItemReaderStep")
.<Pay, Pay>chunk(chunkSize)
.reader(jpaPagingItemReader())
.writer(jpaPagingItemWriter())
.build();
}
@Bean
public JpaPagingItemReader<Pay> jpaPagingItemReader() {
return new JpaPagingItemReaderBuilder<Pay>()
.name("jpaPagingItemReader")
.entityManagerFactory(entityManagerFactory)
.pageSize(chunkSize)
.queryString("SELECT p FROM Pay p WHERE amount >= 2000")
.build();
}
private ItemWriter<Pay> jpaPagingItemWriter() {
return list -> {
for (Pay pay: list) {
log.info("Current Pay={}", pay);
}
};
}
}
- 실행하면
데이터를 잘 읽어왔다.
- 참고
'개발 > Java&Kotlin' 카테고리의 다른 글
[Spring] 스프링 배치(Spring Batch) 가이드 따라가기 (7) (0) | 2022.12.23 |
---|---|
[Spring] 스프링 배치(Spring Batch) 가이드 따라가기 (6) (0) | 2022.12.20 |
[Spring] 스프링 배치(Spring Batch) 가이드 따라가기 (4) (0) | 2022.12.14 |
[Spring] 스프링 배치(Spring Batch) 가이드 따라가기 (3) (0) | 2022.12.13 |
[Spring] 스프링 배치(Spring Batch) 가이드 따라가기 (2) (0) | 2022.12.11 |