대부분 스프링 배치의 내용은 '기억보다는 기록을' 블로그를 참고하여 공부하였습니다.
배치에 대해 공부하고자 하시는분은 이 글 보다 아래 블로그를 보는게 훨씬 도움이 됩니다
https://jojoldu.tistory.com/
1탄 https://mizdevlog.tistory.com/12
2탄 https://mizdevlog.tistory.com/13
코드는 GitHub에 있습니다.
1. 기존상황
- 2탄에서 @Component로 지정한
DataShareBean
을 통해 step간 데이터 공유를 쉽게 만들었습니다. 추가적으로 처음 작성할때 ConcurrentHashMap을 잘못사용했던 점을 고쳤습니다.
2. 개선사항
- 이제 Step간에 병렬로 처리할때 발생하는 필드변수의 문제점은 해결하였으니 진짜 병렬로 처리해보고 성능 개선을 해보겠습니다.
- 테스트를 위해서 1탄에서 사용한 Pay데이터를 10개, 10만개, 100만개 순으로 늘려서 적용해보았습니다.
- 다양한 병렬 처리기능을 SpringBatch에서 지원하지만, 저는 아래에서
jojoldu
님의 글을 참조하여 파티셔닝으로 처리해보겠습니다.
3. 시작
파티셔닝
매니저(마스터)를 이용해 데이터를 쪼개서 나눈 다음 파티션에서 슬레이브가 독립적으로 동작한다.
- 데이터를 더 작은 Chunk 단위로 쪼개서 적용한다.
- 기존의 코드에 대한 변경 없이 추가적인 코드 작성으로 병렬처리가 가능하다.
4. JobConfiguration Class
@Slf4j @Configuration @RequiredArgsConstructor public class PayTotalJobPartitionConfiguration { private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); public static final String JOB_NAME = "PayTotalJobPartitionJob"; public static final String BEAN_PREFIX = JOB_NAME + "_"; private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final EntityManagerFactory entityManagerFactory; private final PayRepository payRepository; private final TotalPayRepository totalPayRepository; private final DataShareBean<Long> dataShareBean; private final static int chunkSize = 1000; private final static int poolSize = 20; @Bean(name = JOB_NAME+"_taskPool") public TaskExecutor executor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(poolSize); executor.setMaxPoolSize(poolSize); executor.setThreadNamePrefix("partition-thread"); executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE); executor.initialize(); return executor; } @Bean(name = JOB_NAME+"_partitionHandler") public TaskExecutorPartitionHandler partitionHandler() { TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler(); // (1) partitionHandler.setStep(step1()); // (2) partitionHandler.setTaskExecutor(executor()); // (3) partitionHandler.setGridSize(poolSize); // (4) return partitionHandler; } @Bean(JOB_NAME) public Job job() { //listener beforeJob으로 변경 //dataShareBean.putData("totalAmount", 0L); return jobBuilderFactory.get(JOB_NAME) .listener(listener()) .start(step1Manager()) .next(step2(null)) //.preventRestart() .build(); } @Bean(BEAN_PREFIX + "listener") public JobExecutionListener listener() { return new TotalJobListener(dataShareBean); } @Bean(name = JOB_NAME +"_step1Manager") public Step step1Manager() { return stepBuilderFactory.get("step1.manager") // (1) .partitioner("step1", partitioner(null)) // (2) .step(step1()) // (3) .partitionHandler(partitionHandler()) // (4) .build(); } @Bean(name = JOB_NAME +"_partitioner") @StepScope public PayIdRangePartitioner partitioner( @Value("#{jobParameters[requestDate]}") String requestDate) { return new PayIdRangePartitioner(payRepository,requestDate); } @Bean(BEAN_PREFIX + "step") public Step step1() { return stepBuilderFactory.get(BEAN_PREFIX + "step") .<Pay, Pay>chunk(chunkSize) .reader(reader(null,null)) .writer(writer()) .build(); } @Bean(BEAN_PREFIX + "reader") @StepScope public JpaPagingItemReader<Pay> reader( @Value("#{stepExecutionContext[minId]}") Long minId, @Value("#{stepExecutionContext[maxId]}") Long maxId ) { Map<String, Object> params = new HashMap<>(); params.put("minId", minId); params.put("maxId", maxId); return new JpaPagingItemReaderBuilder<Pay>() .name(BEAN_PREFIX + "reader") .entityManagerFactory(entityManagerFactory) .pageSize(chunkSize) .queryString("select p from Pay p where p.id BETWEEN :minId AND :maxId") .parameterValues(params) .build(); } @Bean(BEAN_PREFIX + "writer") public ItemWriter<Pay> writer() { return list -> { for(Pay pay : list) { dataShareBean.addData("totalAmount",pay.getAmount()); } }; } @Bean(BEAN_PREFIX + "step2") @JobScope public Step step2(@Value("#{jobParameters[requestDate]}") String requestDate) { return stepBuilderFactory.get(BEAN_PREFIX + "step2") .tasklet((contribution, chunkContext) -> { TotalPay(dataShareBean.getTotal(),requestDate); TotalPay totalPay = new TotalPay(dataShareBean.getData("totalAmount"),requestDate); totalPayRepository.save(totalPay); return RepeatStatus.FINISHED; }).build(); }
}
#### 4.1 코드별 소개
- PartionHandler
```java
@Bean(name = JOB_NAME+"_partitionHandler")
public TaskExecutorPartitionHandler partitionHandler() {
TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler(); // (1)
partitionHandler.setStep(step1()); // (2)
partitionHandler.setTaskExecutor(executor()); // (3)
partitionHandler.setGridSize(poolSize); // (4)
return partitionHandler;
}
- TaskExecutorPartitionHandler 을 사용하여 멀티쓰레드로 활용할 수 있게 합니다.
- 사용할 step을 설정해줍니다.
- 멀티쓰레드를 실행하기 위해 executor를 설정합니다.
- gridSize와 멀티쓰레드를 같은 사이즈로 쓰기 위해
poolSize
로 동일하게 사용합니다.
- executor : TaskExcutor
@Bean(name = JOB_NAME+"_taskPool") public TaskExecutor executor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(poolSize); executor.setMaxPoolSize(poolSize); executor.setThreadNamePrefix("partition-thread"); executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE); executor.initialize(); return executor; }
- partition을 멀티쓰레드로 돌릴수있게 poolsize등을 설정합니다.
- StepManager()
@Bean(name = JOB_NAME +"_step1Manager") public Step step1Manager() { return stepBuilderFactory.get("step1.manager") // (1) .partitioner("step1", partitioner(null)) // (2) .step(step1()) // (3) .partitionHandler(partitionHandler()) // (4) .build(); }
- manager의 이름을 정합니다. 지금상황은 step1의 매니저이기때문에 "step1.manager"로 지정합니다.
- step1에 사용될 Partitioner 구현체를 등록합니다.
- 파니셔닝 될 step을 등록한다
- 파티셔닝을 관리한 handler를 등록합니다.
partitioner
@Bean(name = JOB_NAME +"_partitioner") @StepScope public PayIdRangePartitioner partitioner( @Value("#{jobParameters[requestDate]}") String requestDate) { return new PayIdRangePartitioner(payRepository,requestDate); //(1) }
- 저는 요청날짜를 기반으로 그 날의 데이터들을 총합하는 배치를 하기 때문에 요청날짜를 넣어줍니다.
PayIdRangePartitioner
@Slf4j @RequiredArgsConstructor public class PayIdRangePartitioner implements Partitioner { private final PayRepository payRepository; private final String requestDate; @Override public Map<String, ExecutionContext> partition(int gridSize) { Long max = payRepository.findMaxId(requestDate); Long min = payRepository.findMinId(requestDate); Long targetSize = (max - min) / gridSize + 1; log.info("max,min = {} {}", max,min); Map<String, ExecutionContext> result = new HashMap<>(); long number = 0; long start = min; long end = start + targetSize - 1; while (start <= max) { ExecutionContext value = new ExecutionContext(); result.put("partition" + number, value); if (end >= max) { end = max; } value.putLong("minId", start); value.putLong("maxId", end); log.info("partion min max = {} {}",start,end); start += targetSize; end += targetSize; number++; } return result; } }
요청 날짜의 모든 Pay 의 max Id 와 Min Id를 가져 온뒤 gridSize(poolSize)에 맞게 쪼개어서
Map<String, ExecutionContext>
을 반환합니다.생성된 ExecutionContext에 맞춰 Worker Step들이 생성되어 그들의
Step Executions
이 됩니다.
- itemReader
@Bean(BEAN_PREFIX + "reader") @StepScope public JpaPagingItemReader<Pay> reader( @Value("#{stepExecutionContext[minId]}") Long minId, @Value("#{stepExecutionContext[maxId]}") Long maxId ) { Map<String, Object> params = new HashMap<>(); params.put("minId", minId); params.put("maxId", maxId); return new JpaPagingItemReaderBuilder<Pay>() .name(BEAN_PREFIX + "reader") .entityManagerFactory(entityManagerFactory) .pageSize(chunkSize) .queryString("select p from Pay p where p.id BETWEEN :minId AND :maxId") .parameterValues(params) .build(); }
- itemReader에서는 위에서 생성해준
Step Executions
에 있는 max id와 min id를 받아옵니다.
TotalJobListener
@RequiredArgsConstructor @Slf4j public class TotalJobListener extends JobExecutionListenerSupport { private final DataShareBean dataShareBean; @Override public void beforeJob(JobExecution jobExecution) { if(jobExecution.getStatus() == BatchStatus.STARTED) { log.info("start Job & initial dataShareBean.totalAmount"); dataShareBean.putData("totalAmount",0L); } } }
JobListener를 생성해 Job 시작전에 DataSourceBean의 map에 값을 초기화해줍니다.
특정 배치잡에만 필요한 listener로 사용하기 위해서 따로 빈으로 등록하지 않고 configuration class에서 빈으로 등록합니다. 필요한
dataShareBean
은 등록할때 DI 해줍니다.
5.결과
- 테스트 한 결과입니다.
- 데이터 10만개
- single : 323053831084 ns (약 320초)
- gridSize 5 : 8366703292 ns (약 8초)
- 데이터 100만개
- single : x ns (약 20분) -> 기록을 하지 않고 돌려서 약 20분 넘게 걸렸습니다.
- gridSize 5 : 18277477625 ns (약 18초)
- gridSize 20 : 12726939792 na (약 12초)
- 데이터 백만개에서 엄청난 차이가 발생합니다.
6. 주의사항
- gridSize, poolSize 를 10개 이상으로 가져갈때 주의사항입니다.
- SpringBoot hikari pool의 max pool size는 default가 10입니다.
- 설정 변경없이 10개이상으로 실행할시 sql 관련된 에러가 발생합니다.
- 해결방법
- properties 설정 변경
spring: datasource: hikari: maximumPoolSize : 20
7. 의문점
- step을 두개로 나누어서 TotalPay를 저장하는게 좋은 방법인가?
- 차라리 step을 한개로 두고 Listener의 afterJob에서 처리하는게 좋을지?
- 좋은 방법이 생긴다면 수정하겠습니다.
'Spring' 카테고리의 다른 글
Spring Batch 삽질기 2탄 (0) | 2021.06.22 |
---|---|
Spring Batch 삽질기 1탄 (0) | 2021.06.22 |
SpringBatch Jpa Test 설정 (0) | 2021.06.22 |
Spring Collection @Valid (0) | 2021.06.22 |
Spring Bean 생명주기 (0) | 2021.06.22 |
최근댓글