[9회차] 입맛에 맞는 배치 처리를 위한 Custom ItemReader/ItemWriter 구현방법 알아보기

아래 글은 한국 스프링 사용자 모임(KSUG)에서 진행된 스프링 배치 스터디 내용을 정리한 게시글입니다.
DEVOCEAN에 연재 중인 KIDO님의 글을 참고하여 실습한 내용을 기록했습니다.

 

원본: [SpringBatch 연재 09] 입맛에 맞는 배치 처리를 위한 Custom ItemReader/ItemWriter 구현방법 알아보기

 

[SpringBatch 연재 09] 입맛에 맞는 배치 처리를 위한 Custom ItemReader/ItemWriter 구현방법 알아보기

 

devocean.sk.com

 

스프링 배치를 사용할 때 비즈니스 로직에 맞는 배치 수행을 위해 커스텀이 필요합니다.

커스텀을 위해 두 가지의 실습으로 나누어 진행 해보았습니다.

  1. QueryDSL을 활용해 QuerydslPagingItemReader 클래스 구현하기
  2. CustomItemWriter 클래스를 구현하여 다른 서비스 클래스를 호출하는 샘플 만들기

 

1. QuerydslPagingItemReader 클래스 구현하기

QueryDSL을 사용하려면 의존성 추가가 필요합니다.

implementation 'com.querydsl:querydsl-jpa:5.0.0:jakarta'
annotationProcessor "com.querydsl:querydsl-apt:${dependencyManagement.importedProperties['querydsl.version']}:jakarta"
// querydsl QClass 파일 생성 위치를 지정
def generated = 'src/main/generated'
tasks.withType(JavaCompile) {
	options.getGeneratedSourceOutputDirectory().set(file(generated))
}

// java source set 에 querydsl QClass 위치 추가
sourceSets {
	main.java.srcDirs += [ generated ]
}

// gradle clean 시에 QClass 디렉토리 삭제
clean {
	delete file(generated)
}

빌드시 /src/main/generated 폴더에 생성된 QCustomer 클래스를 볼 수 있습니다.

 

1.1. QuerydslPagingItemReader 클래스 만들기

public class QuerydslPagingItemReader<T> extends AbstractPagingItemReader<T> {
    /**
     * AbstractPagingItemReader은 어댑터 패턴으로, 상속받는 쪽은 doReadPage만 구현하면 된다.
     */

    private EntityManager em;
    private final Function<JPAQueryFactory, JPAQuery<T>> querySupplier;

    private final Boolean alwaysReadFromZero;

    public QuerydslPagingItemReader(EntityManagerFactory entityManagerFactory, Function<JPAQueryFactory, JPAQuery<T>> querySupplier, int chunkSize) {
        this(ClassUtils.getShortName(QuerydslPagingItemReader.class), entityManagerFactory, querySupplier, chunkSize, false);
    }

    /**
     * 생성자
     * @param name ItemReader를 구분하기 위한 이름
     * @param entityManagerFactory JPA를 이용하기 위해 entityManagerFactory를 전달함
     * @param querySupplier JpaQeury를 생성하기 위한 Functional Interface
     * @param chunkSize 한 번에 페이징 처리할 페이지 크기
     * @param alwaysReadFromZero 항상 0부터 페이징을 읽을지 여부를 지정
     *                           만약 paging 처리된 데이터 자체를 수정하는 경우 배치처리 누락이 발생할 수 있으므로 이를 해결하기 위한 방안으로 사용됨
     */
    public QuerydslPagingItemReader(String name, EntityManagerFactory entityManagerFactory, Function<JPAQueryFactory, JPAQuery<T>> querySupplier, int chunkSize, Boolean alwaysReadFromZero) {
        super.setPageSize(chunkSize);
        setName(name);
        this.querySupplier = querySupplier;
        this.em = entityManagerFactory.createEntityManager();
        this.alwaysReadFromZero = alwaysReadFromZero;

    }

    /**
     * 기본적으로 AbstractPagingItemReader에 자체 구현되어 있지만 EntityManager 자원을 해제하기 위해 em.close()를 수행한다.
     */
    @Override
    protected void doClose() throws Exception {
        if (em != null)
            em.close();
        super.doClose();
    }

    /**
     * 실제로 구현해야 할 추상 메서드
     */
    @Override
    protected void doReadPage() {
        initQueryResult();

        JPAQueryFactory jpaQueryFactory = new JPAQueryFactory(em); // 함수형 인터페이스로 지정된 queryDSL에 적용할 QueryFactory
        long offset = 0;
        // alwaysReadFromZero가 false라면 offset과 limit을 계속 이동하면서 조회하도록 offset을 계산한다.
        if (!alwaysReadFromZero) {
            offset = (long) getPage() * getPageSize();
        }

        JPAQuery<T> query = querySupplier.apply(jpaQueryFactory).offset(offset).limit(getPageSize());
        // 우리가 제공한 querySupplier에 JPAQueryFactory를 적용하여 JPAQuery를 생성하도록 한다.
        // 페이징을 위해 offset, limit을 계산된 offset 과 pageSize(청크 크기)를 지정하여 페이징 처리를 하도록 한다.

        List<T> queryResult = query.fetch();
        // 결과를 fetch 후 fetch된 내역을 result에 담는다.

        for (T entity: queryResult) {
            em.detach(entity); // 변경이 실제 DB에 반영되지 않도록 영속성 객체에서 제외 시킨다.
            results.add(entity);
        }
    }

    private void initQueryResult() {
        if (CollectionUtils.isEmpty(results)) {
            results = new CopyOnWriteArrayList<>();
        } else {
            results.clear();
        }
    }
}

 

이렇게 생성할 수도 있고, Builder 패턴으로 생성도 가능한데,

Builder 패턴으로 구현한 방법은 원 게시글의 코드를 참고 해주시면 됩니다!

 

1.2. Config 클래스 생성

@Slf4j
@Configuration
public class QueryDSLPagingReaderJobConfig {

    /**
     * CHUNK 크기를 지정한다.
     */
    public static final int CHUNK_SIZE = 2;
    public static final String ENCODING = "UTF-8";
    public static final String QUERYDSL_PAGING_CHUNK_JOB = "QUERYDSL_PAGING_CHUNK_JOB";

    @Autowired
    DataSource dataSource;

    @Autowired
    EntityManagerFactory entityManagerFactory;

//    @Bean
//    public QuerydslPagingItemReader<Customer> customerQuerydslPagingItemReader() throws Exception {
//
//        Function<JPAQueryFactory, JPAQuery<Customer>> query = jpaQueryFactory -> jpaQueryFactory.select(QCustomer.customer).from(QCustomer.customer);
//
//        return new QuerydslPagingItemReader<>("customerQuerydslPagingItemReader", entityManagerFactory, query, CHUNK_SIZE, false);
//    }

    /**
     * QuerydslPagingItemReaderBuilder 를 사용하여 Querydsl 기반의 페이징 가능한 ItemReader
     */
    @Bean
    public QuerydslPagingItemReader<Customer> customerQuerydslPagingItemReader() {
        return new QuerydslPagingItemReaderBuilder<Customer>()
                .name("customerQuerydslPagingItemReader")
                .entityManagerFactory(entityManagerFactory)
                // 페이징 처리 시 한 번에 처리할 데이터의 크기 설정 (2개씩 처리)
                .chunkSize(2)
                // Querydsl을 사용하여 동적 쿼리 정의
                .querySupplier(jpaQueryFactory ->
                        // Querydsl의 JPAQueryFactory를 통해 쿼리를 정의
                        jpaQueryFactory.select(QCustomer.customer)
                                .from(QCustomer.customer)
                                .where(QCustomer.customer.age.gt(20)))
                .build();
    }

    @Bean
    public FlatFileItemWriter<Customer> customerQuerydslFlatFileItemWriter() {

        return new FlatFileItemWriterBuilder<Customer>()
                .name("customerQuerydslFlatFileItemWriter")
                .resource(new FileSystemResource("./output/task09_customer_new_v2.csv"))
                .encoding(ENCODING)
                .delimited().delimiter("\\t")
                .names("Name", "Age", "Gender")
                .build();
    }

    @Bean
    public Step customerQuerydslPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
        log.info("------------------ Init customerQuerydslPagingStep -----------------");

        return new StepBuilder("customerJpaPagingStep", jobRepository)
                .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
                .reader(customerQuerydslPagingItemReader())
                .processor(new CustomerItemProcessor())
                .writer(customerQuerydslFlatFileItemWriter())
                .build();
    }

    @Bean
    public Job customerJpaPagingJob(Step customerJdbcPagingStep, JobRepository jobRepository) {
        log.info("------------------ Init customerJpaPagingJob -----------------");
        return new JobBuilder(QUERYDSL_PAGING_CHUNK_JOB, jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(customerJdbcPagingStep)
                .build();
    }
}

 

1.3. 결과

실행 결과는 이전 실습들과 동일합니다.

Alice	30	F
Bob	45	M
Charlie	25	M
Diana	29	F
Evan	35	M
Fiona	40	F
George	55	M
Hannah	32	F
Alice	30	F
Bob	45	M
Charlie	25	M
Diana	29	F
Evan	35	M
Fiona	40	F
George	55	M
Hannah	32	F

2. CustomItemWriter 만들기

간단하게 만들기 위해 log만 찍을 수 있도록 구현하였습니다.

@Slf4j
@Service
public class CustomService {

    public Map<String, String> processToOtherService(Customer item) {

        log.info("Call API to OtherService....");

        return Map.of("code", "200", "message", "OK");
    }
}
@Slf4j
@Component
public class CustomItemWriter implements ItemWriter<Customer> {

    private final CustomService customService;

    public CustomItemWriter(CustomService customService) {
        this.customService = customService;
    }

    @Override
    public void write(Chunk<? extends Customer> chunk) throws Exception {
        for (Customer customer: chunk) {
            log.info("Call Porcess in CustomItemWriter...");
            Map<String, String> outputMap = customService.processToOtherService(customer);
            log.info("Customer OutputMap: " + outputMap);
        }
    }
}

 

Config 클래스 생성

@Slf4j
@Configuration
public class MybatisItemWriterJobConfig {

    /**
     * CHUNK 크기를 지정한다.
     */
    public static final int CHUNK_SIZE = 100;
    public static final String ENCODING = "UTF-8";
    public static final String MY_BATIS_ITEM_WRITER = "MY_BATIS_ITEM_WRITER";

    @Autowired
    DataSource dataSource;

    @Autowired
    SqlSessionFactory sqlSessionFactory;

    @Autowired
    CustomItemWriter customItemWriter;

    @Bean
    public FlatFileItemReader<Customer> flatFileItemReader() {

        return new FlatFileItemReaderBuilder<Customer>()
                .name("FlatFileItemReader")
                .resource(new ClassPathResource("./customer.csv"))
                .encoding(ENCODING)
                .delimited().delimiter(",")
                .names("name", "age", "gender")
                .targetType(Customer.class)
                .build();
    }

    @Bean
    public Step flatFileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        log.info("------------------ Init flatFileStep -----------------");

        return new StepBuilder("flatFileStep", jobRepository)
                .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
                .reader(flatFileItemReader())
                .writer(customItemWriter)
                .build();
    }

    @Bean
    public Job flatFileJob(Step flatFileStep, JobRepository jobRepository) {
        log.info("------------------ Init flatFileJob -----------------");
        return new JobBuilder(MY_BATIS_ITEM_WRITER, jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(flatFileStep)
                .build();
    }
}

실습 결과

  • 실습 파일(customer.csv)
Alice,30,F
Bob,45,M
2024-12-14T11:13:23.744+09:00  INFO 90154 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=MY_BATIS_ITEM_WRITER]] launched with the following parameters: [{'run.id':'{value=3, type=class java.lang.Long, identifying=true}'}]
2024-12-14T11:13:23.767+09:00  INFO 90154 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [flatFileStep]
2024-12-14T11:13:23.782+09:00  INFO 90154 --- [           main] c.e.b.jobs.task10.CustomItemWriter       : Call Porcess in CustomItemWriter...
2024-12-14T11:13:23.782+09:00  INFO 90154 --- [           main] c.e.b.jobs.task10.CustomService          : Call API to OtherService....
2024-12-14T11:13:23.783+09:00  INFO 90154 --- [           main] c.e.b.jobs.task10.CustomItemWriter       : Customer OutputMap: {message=OK, code=200}
2024-12-14T11:13:23.783+09:00  INFO 90154 --- [           main] c.e.b.jobs.task10.CustomItemWriter       : Call Porcess in CustomItemWriter...
2024-12-14T11:13:23.783+09:00  INFO 90154 --- [           main] c.e.b.jobs.task10.CustomService          : Call API to OtherService....
2024-12-14T11:13:23.783+09:00  INFO 90154 --- [           main] c.e.b.jobs.task10.CustomItemWriter       : Customer OutputMap: {message=OK, code=200}
2024-12-14T11:13:23.788+09:00  INFO 90154 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [flatFileStep] executed in 21ms
2024-12-14T11:13:23.798+09:00  INFO 90154 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=MY_BATIS_ITEM_WRITER]] completed with the following parameters: [{'run.id':'{value=3, type=class java.lang.Long, identifying=true}'}] and the following status: [COMPLETED] in 44ms

 

customer.csv 파일에 저장되어있는 인원의 수만큼 로그가 남겨지는 것을 볼 수 있습니다.