아래 글은 한국 스프링 사용자 모임(KSUG)에서 진행된 스프링 배치 스터디 내용을 정리한 게시글입니다.
DEVOCEAN에 연재 중인 KIDO님의 글을 참고하여 실습한 내용을 기록했습니다.
원본: [SpringBatch 연재 09] 입맛에 맞는 배치 처리를 위한 Custom ItemReader/ItemWriter 구현방법 알아보기
스프링 배치를 사용할 때 비즈니스 로직에 맞는 배치 수행을 위해 커스텀이 필요합니다.
커스텀을 위해 두 가지의 실습으로 나누어 진행 해보았습니다.
- QueryDSL을 활용해 QuerydslPagingItemReader 클래스 구현하기
- CustomItemWriter 클래스를 구현하여 다른 서비스 클래스를 호출하는 샘플 만들기
1. QuerydslPagingItemReader 클래스 구현하기
QueryDSL을 사용하려면 의존성 추가가 필요합니다.
implementation 'com.querydsl:querydsl-jpa:5.0.0:jakarta'
annotationProcessor "com.querydsl:querydsl-apt:${dependencyManagement.importedProperties['querydsl.version']}:jakarta"
// querydsl QClass 파일 생성 위치를 지정
def generated = 'src/main/generated'
tasks.withType(JavaCompile) {
options.getGeneratedSourceOutputDirectory().set(file(generated))
}
// java source set 에 querydsl QClass 위치 추가
sourceSets {
main.java.srcDirs += [ generated ]
}
// gradle clean 시에 QClass 디렉토리 삭제
clean {
delete file(generated)
}
빌드시 /src/main/generated 폴더에 생성된 QCustomer 클래스를 볼 수 있습니다.
1.1. QuerydslPagingItemReader 클래스 만들기
public class QuerydslPagingItemReader<T> extends AbstractPagingItemReader<T> {
/**
* AbstractPagingItemReader은 어댑터 패턴으로, 상속받는 쪽은 doReadPage만 구현하면 된다.
*/
private EntityManager em;
private final Function<JPAQueryFactory, JPAQuery<T>> querySupplier;
private final Boolean alwaysReadFromZero;
public QuerydslPagingItemReader(EntityManagerFactory entityManagerFactory, Function<JPAQueryFactory, JPAQuery<T>> querySupplier, int chunkSize) {
this(ClassUtils.getShortName(QuerydslPagingItemReader.class), entityManagerFactory, querySupplier, chunkSize, false);
}
/**
* 생성자
* @param name ItemReader를 구분하기 위한 이름
* @param entityManagerFactory JPA를 이용하기 위해 entityManagerFactory를 전달함
* @param querySupplier JpaQeury를 생성하기 위한 Functional Interface
* @param chunkSize 한 번에 페이징 처리할 페이지 크기
* @param alwaysReadFromZero 항상 0부터 페이징을 읽을지 여부를 지정
* 만약 paging 처리된 데이터 자체를 수정하는 경우 배치처리 누락이 발생할 수 있으므로 이를 해결하기 위한 방안으로 사용됨
*/
public QuerydslPagingItemReader(String name, EntityManagerFactory entityManagerFactory, Function<JPAQueryFactory, JPAQuery<T>> querySupplier, int chunkSize, Boolean alwaysReadFromZero) {
super.setPageSize(chunkSize);
setName(name);
this.querySupplier = querySupplier;
this.em = entityManagerFactory.createEntityManager();
this.alwaysReadFromZero = alwaysReadFromZero;
}
/**
* 기본적으로 AbstractPagingItemReader에 자체 구현되어 있지만 EntityManager 자원을 해제하기 위해 em.close()를 수행한다.
*/
@Override
protected void doClose() throws Exception {
if (em != null)
em.close();
super.doClose();
}
/**
* 실제로 구현해야 할 추상 메서드
*/
@Override
protected void doReadPage() {
initQueryResult();
JPAQueryFactory jpaQueryFactory = new JPAQueryFactory(em); // 함수형 인터페이스로 지정된 queryDSL에 적용할 QueryFactory
long offset = 0;
// alwaysReadFromZero가 false라면 offset과 limit을 계속 이동하면서 조회하도록 offset을 계산한다.
if (!alwaysReadFromZero) {
offset = (long) getPage() * getPageSize();
}
JPAQuery<T> query = querySupplier.apply(jpaQueryFactory).offset(offset).limit(getPageSize());
// 우리가 제공한 querySupplier에 JPAQueryFactory를 적용하여 JPAQuery를 생성하도록 한다.
// 페이징을 위해 offset, limit을 계산된 offset 과 pageSize(청크 크기)를 지정하여 페이징 처리를 하도록 한다.
List<T> queryResult = query.fetch();
// 결과를 fetch 후 fetch된 내역을 result에 담는다.
for (T entity: queryResult) {
em.detach(entity); // 변경이 실제 DB에 반영되지 않도록 영속성 객체에서 제외 시킨다.
results.add(entity);
}
}
private void initQueryResult() {
if (CollectionUtils.isEmpty(results)) {
results = new CopyOnWriteArrayList<>();
} else {
results.clear();
}
}
}
이렇게 생성할 수도 있고, Builder 패턴으로 생성도 가능한데,
Builder 패턴으로 구현한 방법은 원 게시글의 코드를 참고 해주시면 됩니다!
1.2. Config 클래스 생성
@Slf4j
@Configuration
public class QueryDSLPagingReaderJobConfig {
/**
* CHUNK 크기를 지정한다.
*/
public static final int CHUNK_SIZE = 2;
public static final String ENCODING = "UTF-8";
public static final String QUERYDSL_PAGING_CHUNK_JOB = "QUERYDSL_PAGING_CHUNK_JOB";
@Autowired
DataSource dataSource;
@Autowired
EntityManagerFactory entityManagerFactory;
// @Bean
// public QuerydslPagingItemReader<Customer> customerQuerydslPagingItemReader() throws Exception {
//
// Function<JPAQueryFactory, JPAQuery<Customer>> query = jpaQueryFactory -> jpaQueryFactory.select(QCustomer.customer).from(QCustomer.customer);
//
// return new QuerydslPagingItemReader<>("customerQuerydslPagingItemReader", entityManagerFactory, query, CHUNK_SIZE, false);
// }
/**
* QuerydslPagingItemReaderBuilder 를 사용하여 Querydsl 기반의 페이징 가능한 ItemReader
*/
@Bean
public QuerydslPagingItemReader<Customer> customerQuerydslPagingItemReader() {
return new QuerydslPagingItemReaderBuilder<Customer>()
.name("customerQuerydslPagingItemReader")
.entityManagerFactory(entityManagerFactory)
// 페이징 처리 시 한 번에 처리할 데이터의 크기 설정 (2개씩 처리)
.chunkSize(2)
// Querydsl을 사용하여 동적 쿼리 정의
.querySupplier(jpaQueryFactory ->
// Querydsl의 JPAQueryFactory를 통해 쿼리를 정의
jpaQueryFactory.select(QCustomer.customer)
.from(QCustomer.customer)
.where(QCustomer.customer.age.gt(20)))
.build();
}
@Bean
public FlatFileItemWriter<Customer> customerQuerydslFlatFileItemWriter() {
return new FlatFileItemWriterBuilder<Customer>()
.name("customerQuerydslFlatFileItemWriter")
.resource(new FileSystemResource("./output/task09_customer_new_v2.csv"))
.encoding(ENCODING)
.delimited().delimiter("\\t")
.names("Name", "Age", "Gender")
.build();
}
@Bean
public Step customerQuerydslPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
log.info("------------------ Init customerQuerydslPagingStep -----------------");
return new StepBuilder("customerJpaPagingStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(customerQuerydslPagingItemReader())
.processor(new CustomerItemProcessor())
.writer(customerQuerydslFlatFileItemWriter())
.build();
}
@Bean
public Job customerJpaPagingJob(Step customerJdbcPagingStep, JobRepository jobRepository) {
log.info("------------------ Init customerJpaPagingJob -----------------");
return new JobBuilder(QUERYDSL_PAGING_CHUNK_JOB, jobRepository)
.incrementer(new RunIdIncrementer())
.start(customerJdbcPagingStep)
.build();
}
}
1.3. 결과
실행 결과는 이전 실습들과 동일합니다.
Alice 30 F
Bob 45 M
Charlie 25 M
Diana 29 F
Evan 35 M
Fiona 40 F
George 55 M
Hannah 32 F
Alice 30 F
Bob 45 M
Charlie 25 M
Diana 29 F
Evan 35 M
Fiona 40 F
George 55 M
Hannah 32 F
2. CustomItemWriter 만들기
간단하게 만들기 위해 log만 찍을 수 있도록 구현하였습니다.
@Slf4j
@Service
public class CustomService {
public Map<String, String> processToOtherService(Customer item) {
log.info("Call API to OtherService....");
return Map.of("code", "200", "message", "OK");
}
}
@Slf4j
@Component
public class CustomItemWriter implements ItemWriter<Customer> {
private final CustomService customService;
public CustomItemWriter(CustomService customService) {
this.customService = customService;
}
@Override
public void write(Chunk<? extends Customer> chunk) throws Exception {
for (Customer customer: chunk) {
log.info("Call Porcess in CustomItemWriter...");
Map<String, String> outputMap = customService.processToOtherService(customer);
log.info("Customer OutputMap: " + outputMap);
}
}
}
Config 클래스 생성
@Slf4j
@Configuration
public class MybatisItemWriterJobConfig {
/**
* CHUNK 크기를 지정한다.
*/
public static final int CHUNK_SIZE = 100;
public static final String ENCODING = "UTF-8";
public static final String MY_BATIS_ITEM_WRITER = "MY_BATIS_ITEM_WRITER";
@Autowired
DataSource dataSource;
@Autowired
SqlSessionFactory sqlSessionFactory;
@Autowired
CustomItemWriter customItemWriter;
@Bean
public FlatFileItemReader<Customer> flatFileItemReader() {
return new FlatFileItemReaderBuilder<Customer>()
.name("FlatFileItemReader")
.resource(new ClassPathResource("./customer.csv"))
.encoding(ENCODING)
.delimited().delimiter(",")
.names("name", "age", "gender")
.targetType(Customer.class)
.build();
}
@Bean
public Step flatFileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
log.info("------------------ Init flatFileStep -----------------");
return new StepBuilder("flatFileStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(flatFileItemReader())
.writer(customItemWriter)
.build();
}
@Bean
public Job flatFileJob(Step flatFileStep, JobRepository jobRepository) {
log.info("------------------ Init flatFileJob -----------------");
return new JobBuilder(MY_BATIS_ITEM_WRITER, jobRepository)
.incrementer(new RunIdIncrementer())
.start(flatFileStep)
.build();
}
}
실습 결과
- 실습 파일(customer.csv)
Alice,30,F
Bob,45,M
2024-12-14T11:13:23.744+09:00 INFO 90154 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=MY_BATIS_ITEM_WRITER]] launched with the following parameters: [{'run.id':'{value=3, type=class java.lang.Long, identifying=true}'}]
2024-12-14T11:13:23.767+09:00 INFO 90154 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [flatFileStep]
2024-12-14T11:13:23.782+09:00 INFO 90154 --- [ main] c.e.b.jobs.task10.CustomItemWriter : Call Porcess in CustomItemWriter...
2024-12-14T11:13:23.782+09:00 INFO 90154 --- [ main] c.e.b.jobs.task10.CustomService : Call API to OtherService....
2024-12-14T11:13:23.783+09:00 INFO 90154 --- [ main] c.e.b.jobs.task10.CustomItemWriter : Customer OutputMap: {message=OK, code=200}
2024-12-14T11:13:23.783+09:00 INFO 90154 --- [ main] c.e.b.jobs.task10.CustomItemWriter : Call Porcess in CustomItemWriter...
2024-12-14T11:13:23.783+09:00 INFO 90154 --- [ main] c.e.b.jobs.task10.CustomService : Call API to OtherService....
2024-12-14T11:13:23.783+09:00 INFO 90154 --- [ main] c.e.b.jobs.task10.CustomItemWriter : Customer OutputMap: {message=OK, code=200}
2024-12-14T11:13:23.788+09:00 INFO 90154 --- [ main] o.s.batch.core.step.AbstractStep : Step: [flatFileStep] executed in 21ms
2024-12-14T11:13:23.798+09:00 INFO 90154 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=MY_BATIS_ITEM_WRITER]] completed with the following parameters: [{'run.id':'{value=3, type=class java.lang.Long, identifying=true}'}] and the following status: [COMPLETED] in 44ms
customer.csv 파일에 저장되어있는 인원의 수만큼 로그가 남겨지는 것을 볼 수 있습니다.
'알아두면 좋은 개발 지식 > Spring Batch 스터디' 카테고리의 다른 글
[10회차] 스프링배치 플로우 컨트롤 하기 (3) | 2024.12.16 |
---|---|
[8회차] CompositeItemProcessor 으로 여러단계에 걸쳐 데이터 Transform하기 (1) | 2024.11.28 |
[7회차] Spring Batch 스터디: 후기 및 추가 학습 내용 (4) | 2024.11.20 |
[7회차] MyBatisPagingItemReader로 DB내용을 읽고, MyBatisItemWriter로 DB에 쓰기 (3) | 2024.11.16 |
[6회차] Spring Batch 스터디: 후기 및 추가 학습 내용 (2) | 2024.11.16 |