스프링 배치 가이드
- 지난 포스팅에서 튜토리얼 보고 따라해봤는데, 이번에는 개념부터 천천히 공부해보자.
- 책을 보고 공부하려 했으나, 스프링과 부트의 배치 사용 문법이 많이 달라 이동욱님의 블로그를 보고 공부했다.
https://devhooney.tistory.com/136
1. Chunk란?
- Chunk는 스프링 배치에서 데이터 덩어리로 작업할 때 각 커밋 사이에 처리되는 row 수를 말한다.
- Chunk 지향 처리는 한 번에 하나의 데이터를 읽어 Chunk 덩어리를 만들어, Chunk 단위로 트랜잭션을 다루는 것을 의미한다.
- 트랜잭션 단위로 작업되기 때문에 실패하면 실패한 Chunk는 롤백되고, 그 전의 Chunk는 반영이 된다.
- 순서
- Reader에서 데이터 하나를 읽는다.
- 읽은 데이터를 Processor에서 가공한다.
- 가공된 데이터를 별도의 공간에 모은 뒤, Chunk 단위 만큼 쌓이게 도면 Writer에 전달하고 저장한다.
- Reader와 Processor에서는 1건씩 다뤄지고, Writer에선 Chunk 단위로 처리된다.
2. ChunkOrientedTasklet
- Chunk 지향 처리의 전체 로직을 다루는 것은 ChunkOrientedTasklet 클래스이다.
public class ChunkOrientedTasklet<I> implements Tasklet {
private static final String INPUTS_KEY = "INPUTS";
private final ChunkProcessor<I> chunkProcessor;
private final ChunkProvider<I> chunkProvider;
private boolean buffering = true;
private static Log logger = LogFactory.getLog(ChunkOrientedTasklet.class);
public ChunkOrientedTasklet(ChunkProvider<I> chunkProvider, ChunkProcessor<I> chunkProcessor) {
this.chunkProvider = chunkProvider;
this.chunkProcessor = chunkProcessor;
}
...
- chunkProvider.provide()로 Reader에서 Chunk size만큼 데이터를 가져온다.
- chunkProcessor.process() 에서 Reader로 받은 데이터를 가공(Processor)하고 저장(Writer)한다.
@Nullable
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
@SuppressWarnings("unchecked")
Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
if (inputs == null) {
inputs = chunkProvider.provide(contribution);
if (buffering) {
chunkContext.setAttribute(INPUTS_KEY, inputs);
}
}
chunkProcessor.process(contribution, inputs);
chunkProvider.postProcess(contribution, inputs);
// Allow a message coming back from the processor to say that we
// are not done yet
if (inputs.isBusy()) {
logger.debug("Inputs still busy");
return RepeatStatus.CONTINUABLE;
}
chunkContext.removeAttribute(INPUTS_KEY);
chunkContext.setComplete();
if (logger.isDebugEnabled()) {
logger.debug("Inputs not busy, ended: " + inputs.isEnd());
}
return RepeatStatus.continueIf(!inputs.isEnd());
}
- 데이터를 가져오는 chunkProvider.provide()를 보면
@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {
final Chunk<I> inputs = new Chunk<>();
repeatOperations.iterate(new RepeatCallback() {
@Override
public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
I item = null;
Timer.Sample sample = Timer.start(Metrics.globalRegistry);
String status = BatchMetrics.STATUS_SUCCESS;
try {
item = read(contribution, inputs);
}
catch (SkipOverflowException e) {
// read() tells us about an excess of skips by throwing an
// exception
status = BatchMetrics.STATUS_FAILURE;
return RepeatStatus.FINISHED;
}
finally {
stopTimer(sample, contribution.getStepExecution(), status);
}
if (item == null) {
inputs.setEnd();
return RepeatStatus.FINISHED;
}
inputs.add(item);
contribution.incrementReadCount();
return RepeatStatus.CONTINUABLE;
}
});
- inputs이 ChunkSize만큼 쌓일때까지 read()를 호출하는데, 이 read() 는 내부를 보시면 실제로는 ItemReader.read를 호출한다.
@Nullable
protected I read(StepContribution contribution, Chunk<I> chunk) throws SkipOverflowException, Exception {
return doRead();
}
@Nullable
protected final I doRead() throws Exception {
try {
listener.beforeRead();
I item = itemReader.read();
if(item != null) {
listener.afterRead(item);
}
return item;
}
catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(e.getMessage() + " : " + e.getClass().getName());
}
listener.onReadError(e);
throw e;
}
}
- 즉, ItemReader.read에서 1건씩 데이터를 조회해 Chunk size만큼 데이터를 쌓는 것이 provide()가 하는 일이다.
3. SimpleChunkProcessor
- Processor와 Writer 로직을 담고 있는 것은 ChunkProcessor이다.
public interface ChunkProcessor<I> {
void process(StepContribution contribution, Chunk<I> chunk) throws Exception;
}
- 인터페이스이기 때문에 실제 구현체가 있는데, 기본적으로 SimpleChunkProcessor이다.
public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean {
private ItemProcessor<? super I, ? extends O> itemProcessor;
private ItemWriter<? super O> itemWriter;
private final MulticasterBatchListener<I, O> listener = new MulticasterBatchListener<>();
/**
* Default constructor for ease of configuration.
*/
@SuppressWarnings("unused")
private SimpleChunkProcessor() {
this(null, null);
}
public SimpleChunkProcessor(@Nullable ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) {
this.itemProcessor = itemProcessor;
this.itemWriter = itemWriter;
}
public SimpleChunkProcessor(ItemWriter<? super O> itemWriter) {
this(null, itemWriter);
}
/**
* @param itemProcessor the {@link ItemProcessor} to set
*/
public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) {
this.itemProcessor = itemProcessor;
}
...
- 처리를 담당하는 핵심 로직은 process()
@Override
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
// Allow temporary state to be stored in the user data field
initializeUserData(inputs);
// If there is no input we don't have to do anything more
if (isComplete(inputs)) {
return;
}
// Make the transformation, calling remove() on the inputs iterator if
// any items are filtered. Might throw exception and cause rollback.
Chunk<O> outputs = transform(contribution, inputs);
// Adjust the filter count based on available data
contribution.incrementFilterCount(getFilterCount(inputs, outputs));
// Adjust the outputs if necessary for housekeeping purposes, and then
// write them out...
write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
}
- Chunk<I> inputs를 파라미터로 받는다.
- 이 데이터는 chunkProvider.provide() 에서 받은 ChunkSize만큼 쌓인 item.
- transform() 에서는 전달 받은 inputs을 doProcess()로 전달하고 변환값을 받는다.
- transform()을 통해 가공된 대량의 데이터는 write()를 통해 일괄 저장된다.
- write()는 저장이 될수도 있고, 외부 API로 전송할 수 도 있음.
- 이는 개발자가 ItemWriter를 어떻게 구현했는지에 따라 달라진다.
- 여기서 transform()은 반복문을 통해 doProcess()를 호출하는데, 이는 ItemProcessor의 process()를 사용한다.
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
Chunk<O> outputs = new Chunk<>();
for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
final I item = iterator.next();
O output;
Timer.Sample sample = BatchMetrics.createTimerSample();
String status = BatchMetrics.STATUS_SUCCESS;
try {
output = doProcess(item);
}
catch (Exception e) {
/*
* For a simple chunk processor (no fault tolerance) we are done
* here, so prevent any more processing of these inputs.
*/
inputs.clear();
status = BatchMetrics.STATUS_FAILURE;
throw e;
}
finally {
stopTimer(sample, contribution.getStepExecution(), "item.process", status, "Item processing");
}
if (output != null) {
outputs.add(output);
}
else {
iterator.remove();
}
}
return outputs;
}
- doProcess() 를 처리하는데 만약 ItemProcessor가 없다면 item을 그대로 반환
- ItemProcessor가 있다면 ItemProcessor의 process()로 가공하여 반환한다.
- 이렇게 가공된 데이터들은 SimpleChunkProcessor의 doWrite() 를 호출하여 일괄 처리 한다.
protected final void doWrite(List<O> items) throws Exception {
if (itemWriter == null) {
return;
}
try {
listener.beforeWrite(items);
writeItems(items);
doAfterWrite(items);
}
catch (Exception e) {
doOnWriteError(e, items);
throw e;
}
}
4. Page Size vs Chunk Size
- 스프링 배치를 더 사용해보고 수정 예정
'개발 > Java&Kotlin' 카테고리의 다른 글
[Spring] 스프링 배치(Spring Batch) 가이드 따라가기 (6) (0) | 2022.12.20 |
---|---|
[Spring] 스프링 배치(Spring Batch) 가이드 따라가기 (5) (0) | 2022.12.18 |
[Spring] 스프링 배치(Spring Batch) 가이드 따라가기 (3) (0) | 2022.12.13 |
[Spring] 스프링 배치(Spring Batch) 가이드 따라가기 (2) (0) | 2022.12.11 |
[Spring] 스프링 배치(Spring Batch) 가이드 따라가기 (1) (0) | 2022.12.10 |