Spring Batch 삽질기 3탄

Spring / / 2021. 6. 22. 15:19

대부분 스프링 배치의 내용은 '기억보다는 기록을' 블로그를 참고하여 공부하였습니다.
배치에 대해 공부하고자 하시는분은 이 글 보다 아래 블로그를 보는게 훨씬 도움이 됩니다
https://jojoldu.tistory.com/

1탄 https://mizdevlog.tistory.com/12
2탄 https://mizdevlog.tistory.com/13
코드는 GitHub에 있습니다.

1. 기존상황

  • 2탄에서 @Component로 지정한 DataShareBean을 통해 step간 데이터 공유를 쉽게 만들었습니다.
  • 추가적으로 처음 작성할때 ConcurrentHashMap을 잘못사용했던 점을 고쳤습니다.

2. 개선사항

  • 이제 Step간에 병렬로 처리할때 발생하는 필드변수의 문제점은 해결하였으니 진짜 병렬로 처리해보고 성능 개선을 해보겠습니다.
  • 테스트를 위해서 1탄에서 사용한 Pay데이터를 10개, 10만개, 100만개 순으로 늘려서 적용해보았습니다.
  • 다양한 병렬 처리기능을 SpringBatch에서 지원하지만, 저는 아래에서 jojoldu님의 글을 참조하여 파티셔닝으로 처리해보겠습니다.

    https://jojoldu.tistory.com/550?category=902551

3. 시작

  • 파티셔닝

  • 매니저(마스터)를 이용해 데이터를 쪼개서 나눈 다음 파티션에서 슬레이브가 독립적으로 동작한다.

    • 데이터를 더 작은 Chunk 단위로 쪼개서 적용한다.
    • 기존의 코드에 대한 변경 없이 추가적인 코드 작성으로 병렬처리가 가능하다.

    4. JobConfiguration Class

    @Slf4j
    @Configuration
    @RequiredArgsConstructor
    public class PayTotalJobPartitionConfiguration {
    
      private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
      public static final String JOB_NAME = "PayTotalJobPartitionJob";
      public static final String BEAN_PREFIX = JOB_NAME + "_";
    
      private final JobBuilderFactory jobBuilderFactory;
      private final StepBuilderFactory stepBuilderFactory;
      private final EntityManagerFactory entityManagerFactory;
      private final PayRepository payRepository;
      private final TotalPayRepository totalPayRepository;
      private final DataShareBean<Long> dataShareBean;
    
      private final static int chunkSize = 1000;
      private final static int poolSize = 20;
    
      @Bean(name = JOB_NAME+"_taskPool")
      public TaskExecutor executor() {
          ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
          executor.setCorePoolSize(poolSize);
          executor.setMaxPoolSize(poolSize);
          executor.setThreadNamePrefix("partition-thread");
          executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
          executor.initialize();
          return executor;
      }
    
      @Bean(name = JOB_NAME+"_partitionHandler")
      public TaskExecutorPartitionHandler partitionHandler() {
          TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler(); // (1)
          partitionHandler.setStep(step1()); // (2)
          partitionHandler.setTaskExecutor(executor()); // (3)
          partitionHandler.setGridSize(poolSize); // (4)
          return partitionHandler;
      }
    
      @Bean(JOB_NAME)
      public Job job() {
          //listener beforeJob으로 변경
          //dataShareBean.putData("totalAmount", 0L);
          return jobBuilderFactory.get(JOB_NAME)
                  .listener(listener())
                  .start(step1Manager())
                  .next(step2(null))
                  //.preventRestart()
                  .build();
      }
    
      @Bean(BEAN_PREFIX + "listener")
      public JobExecutionListener listener() {
          return new TotalJobListener(dataShareBean);
      }
    
      @Bean(name = JOB_NAME +"_step1Manager")
      public Step step1Manager() {
          return stepBuilderFactory.get("step1.manager") // (1)
                  .partitioner("step1", partitioner(null)) // (2)
                  .step(step1()) // (3)
                  .partitionHandler(partitionHandler()) // (4)
                  .build();
      }
    
      @Bean(name = JOB_NAME +"_partitioner")
      @StepScope
      public PayIdRangePartitioner partitioner(
              @Value("#{jobParameters[requestDate]}") String requestDate) {
    
          return new PayIdRangePartitioner(payRepository,requestDate);
      }
    
      @Bean(BEAN_PREFIX + "step")
      public Step step1() {
          return stepBuilderFactory.get(BEAN_PREFIX + "step")
                  .<Pay, Pay>chunk(chunkSize)
                  .reader(reader(null,null))
                  .writer(writer())
                  .build();
      }
    
      @Bean(BEAN_PREFIX + "reader")
      @StepScope
      public JpaPagingItemReader<Pay> reader(
          @Value("#{stepExecutionContext[minId]}") Long minId,
          @Value("#{stepExecutionContext[maxId]}") Long maxId
      ) {
          Map<String, Object> params = new HashMap<>();
          params.put("minId", minId);
          params.put("maxId", maxId);
          return new JpaPagingItemReaderBuilder<Pay>()
                  .name(BEAN_PREFIX + "reader")
                  .entityManagerFactory(entityManagerFactory)
                  .pageSize(chunkSize)
                  .queryString("select p from Pay p where p.id BETWEEN :minId AND :maxId")
                  .parameterValues(params)
                  .build();
      }
    
      @Bean(BEAN_PREFIX + "writer")
      public ItemWriter<Pay> writer() {
          return list -> {
              for(Pay pay : list) {
                  dataShareBean.addData("totalAmount",pay.getAmount());
              }
          };
      }
    
      @Bean(BEAN_PREFIX + "step2")
      @JobScope
      public Step step2(@Value("#{jobParameters[requestDate]}") String requestDate) {
          return stepBuilderFactory.get(BEAN_PREFIX + "step2")
                  .tasklet((contribution, chunkContext) -> {
                  TotalPay(dataShareBean.getTotal(),requestDate);
                      TotalPay totalPay = new TotalPay(dataShareBean.getData("totalAmount"),requestDate);
    
                      totalPayRepository.save(totalPay);
    
                      return RepeatStatus.FINISHED;
                  }).build();
      }
    
    

}

#### 4.1 코드별 소개
- PartionHandler
```java
    @Bean(name = JOB_NAME+"_partitionHandler")
    public TaskExecutorPartitionHandler partitionHandler() {
        TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler(); // (1)
        partitionHandler.setStep(step1()); // (2)
        partitionHandler.setTaskExecutor(executor()); // (3)
        partitionHandler.setGridSize(poolSize); // (4)
        return partitionHandler;
    }
  1. TaskExecutorPartitionHandler 을 사용하여 멀티쓰레드로 활용할 수 있게 합니다.
  2. 사용할 step을 설정해줍니다.
  3. 멀티쓰레드를 실행하기 위해 executor를 설정합니다.
  4. gridSize와 멀티쓰레드를 같은 사이즈로 쓰기 위해 poolSize로 동일하게 사용합니다.

  • executor : TaskExcutor
      @Bean(name = JOB_NAME+"_taskPool")
      public TaskExecutor executor() {
          ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
          executor.setCorePoolSize(poolSize);
          executor.setMaxPoolSize(poolSize);
          executor.setThreadNamePrefix("partition-thread");
          executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
          executor.initialize();
          return executor;
      }
  1. partition을 멀티쓰레드로 돌릴수있게 poolsize등을 설정합니다.

  • StepManager()
      @Bean(name = JOB_NAME +"_step1Manager")
      public Step step1Manager() {
          return stepBuilderFactory.get("step1.manager") // (1)
                  .partitioner("step1", partitioner(null)) // (2)
                  .step(step1()) // (3)
                  .partitionHandler(partitionHandler()) // (4)
                  .build();
      }
  1. manager의 이름을 정합니다. 지금상황은 step1의 매니저이기때문에 "step1.manager"로 지정합니다.
  2. step1에 사용될 Partitioner 구현체를 등록합니다.
  3. 파니셔닝 될 step을 등록한다
  4. 파티셔닝을 관리한 handler를 등록합니다.

  • partitioner

      @Bean(name = JOB_NAME +"_partitioner")
      @StepScope
      public PayIdRangePartitioner partitioner(
              @Value("#{jobParameters[requestDate]}") String requestDate) {
    
          return new PayIdRangePartitioner(payRepository,requestDate); //(1)
      }
  1. 저는 요청날짜를 기반으로 그 날의 데이터들을 총합하는 배치를 하기 때문에 요청날짜를 넣어줍니다.

  • PayIdRangePartitioner

    @Slf4j
    @RequiredArgsConstructor
    public class PayIdRangePartitioner implements Partitioner {
    
      private final PayRepository payRepository;
      private final String requestDate;
    
      @Override
      public Map<String, ExecutionContext> partition(int gridSize) {
          Long max = payRepository.findMaxId(requestDate);
          Long min = payRepository.findMinId(requestDate);
    
          Long targetSize = (max - min) / gridSize + 1;
          log.info("max,min = {} {}", max,min);
          Map<String, ExecutionContext> result = new HashMap<>();
    
          long number = 0;
          long start = min;
          long end = start + targetSize - 1;
    
          while (start <= max) {
              ExecutionContext value = new ExecutionContext();
              result.put("partition" + number, value);
    
              if (end >= max) {
                  end = max;
              }
    
              value.putLong("minId", start);
              value.putLong("maxId", end);
              log.info("partion min max = {} {}",start,end);
              start += targetSize;
              end += targetSize;
              number++;
          }
    
          return result;
      }
    }
  • 요청 날짜의 모든 Pay 의 max Id 와 Min Id를 가져 온뒤 gridSize(poolSize)에 맞게 쪼개어서 Map<String, ExecutionContext>을 반환합니다.

  • 생성된 ExecutionContext에 맞춰 Worker Step들이 생성되어 그들의 Step Executions이 됩니다.


  • itemReader
      @Bean(BEAN_PREFIX + "reader")
      @StepScope
      public JpaPagingItemReader<Pay> reader(
          @Value("#{stepExecutionContext[minId]}") Long minId,
          @Value("#{stepExecutionContext[maxId]}") Long maxId
      ) {
          Map<String, Object> params = new HashMap<>();
          params.put("minId", minId);
          params.put("maxId", maxId);
          return new JpaPagingItemReaderBuilder<Pay>()
                  .name(BEAN_PREFIX + "reader")
                  .entityManagerFactory(entityManagerFactory)
                  .pageSize(chunkSize)
                  .queryString("select p from Pay p where p.id BETWEEN :minId AND :maxId")
                  .parameterValues(params)
                  .build();
      }
  • itemReader에서는 위에서 생성해준 Step Executions에 있는 max id와 min id를 받아옵니다.

  • TotalJobListener

    @RequiredArgsConstructor
    @Slf4j
    public class TotalJobListener extends JobExecutionListenerSupport {
    
      private final DataShareBean dataShareBean;
    
      @Override
      public void beforeJob(JobExecution jobExecution) {
          if(jobExecution.getStatus() == BatchStatus.STARTED) {
              log.info("start Job & initial dataShareBean.totalAmount");
              dataShareBean.putData("totalAmount",0L);
          }
      }
    }
  • JobListener를 생성해 Job 시작전에 DataSourceBean의 map에 값을 초기화해줍니다.

  • 특정 배치잡에만 필요한 listener로 사용하기 위해서 따로 빈으로 등록하지 않고 configuration class에서 빈으로 등록합니다. 필요한 dataShareBean은 등록할때 DI 해줍니다.


5.결과

  • 테스트 한 결과입니다.
  • 데이터 10만개
  • single : 323053831084 ns (약 320초)
    • gridSize 5 : 8366703292 ns (약 8초)
  • 데이터 100만개
  • single : x ns (약 20분) -> 기록을 하지 않고 돌려서 약 20분 넘게 걸렸습니다.
    • gridSize 5 : 18277477625 ns (약 18초)
    • gridSize 20 : 12726939792 na (약 12초)
  • 데이터 백만개에서 엄청난 차이가 발생합니다.

6. 주의사항

  • gridSize, poolSize 를 10개 이상으로 가져갈때 주의사항입니다.
  • SpringBoot hikari pool의 max pool size는 default가 10입니다.
  • 설정 변경없이 10개이상으로 실행할시 sql 관련된 에러가 발생합니다.
  • 해결방법
  • properties 설정 변경
    spring:
    datasource:
      hikari:
        maximumPoolSize : 20

7. 의문점

  1. step을 두개로 나누어서 TotalPay를 저장하는게 좋은 방법인가?
  2. 차라리 step을 한개로 두고 Listener의 afterJob에서 처리하는게 좋을지?
  • 좋은 방법이 생긴다면 수정하겠습니다.

'Spring' 카테고리의 다른 글

Spring Batch 삽질기 2탄  (0) 2021.06.22
Spring Batch 삽질기 1탄  (0) 2021.06.22
SpringBatch Jpa Test 설정  (0) 2021.06.22
Spring Collection @Valid  (0) 2021.06.22
Spring Bean 생명주기  (0) 2021.06.22
  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기