아래 글은 한국 스프링 사용자 모임(KSUG)에서 진행된 스프링 배치 스터디 내용을 정리한 게시글입니다.
DEVOCEAN에 연재 중인 KIDO님의 글을 참고하여 실습한 내용을 기록했습니다.
원본: [SpringBatch 연재 06] JpaPagingItemReader로 DB내용을 읽고, JpaItemWriter로 DB에 쓰기
지난 시간에는 JdbcPagingItemReader/Writer에 대해 배웠습니다.
본격적으로 공부하기 전, 두 클래스가 조금 헷갈려서 정리를 먼저 하고 진행하였습니다.
JdbcPagingItemReader<T>와 JpaPagingItemReader<T>는 둘 다 AbstractPagingItemReader<T>를 상속받고, InitializingBean 인터페이스를 구현하고 있습니다.
JdbcPagingItemReader와 JpaPagingItemReader는 모두 페이징 방식으로 대량의 데이터를 읽기 위한 기능을 제공하지만, JdbcPagingItemReader가 좀 더 로우 레벨에 가까운 방식으로 동작합니다.
둘의 차이는 Jdbc와 Jpa를 사용한다는 메서드 명에서부터 알 수 있습니다.
JdbcPagingItemReader와 JpaPagingItemReader의 주요 차이점
- 데이터 접근 방식
- JdbcPagingItemReader: JDBC(Java Database Connectivity)를 사용하여 SQL 쿼리를 직접 실행하므로, SQL을 직접 작성하고 실행해야 합니다.
- JpaPagingItemReader: JPA(Java Persistence API)를 통해 데이터베이스에 접근하며, 엔터티 매핑을 활용하여 객체와 테이블 간의 관계를 자동으로 관리합니다.
- 사용 목적
- JdbcPagingItemReader는 SQL 쿼리 최적화가 필요하거나, JPA로 처리하기 어려운 복잡한 쿼리를 실행할 때 적합합니다. 데이터베이스의 특성을 최대한 활용해 성능을 세밀하게 조정하고자 할 때 유리합니다.
- JpaPagingItemReader는 객체 지향적인 데이터 접근 방식이 필요한 경우에 적합하며, 엔터티 매핑과 관계 관리를 통해 개발자가 직접 SQL을 작성하지 않아도 되는 편리함을 제공합니다.
JpaItemReader/Writer는 다음과 같은 장단점을 가지고 있습니다.
장점
- ORM 연동: JPA를 사용하여 다양한 데이터베이스에 데이터를 저장할 수 있습니다.
- 객체 매핑: 엔터티 객체를 직접 저장함으로써 코드가 간결해집니다.
- 유연성: 다양한 설정을 통해 원하는 방식으로 데이터를 저장할 수 있습니다. 또한 데이터베이스 종류 선택에 자유롭습니다.
단점
- 설정 복잡성: JPA 설정 및 쿼리 작성이 복잡할 수 있습니다.
- 오류 가능성: 설정 오류가 발생할 경우 데이터 손상의 위험이 있습니다.
JpaPagingItemReader 개요
- JPA를 사용하여 데이터베이스로부터 데이터를 Page 단위로 읽습니다.
- JPA 기능 활용: JPA 엔티티와 자동 객체 매핑을 통해 효율적으로 데이터를 처리할 수 있습니다.
- 쿼리 최적화: JPA 쿼리 기능을 사용해 성능을 최적화하여 데이터를 읽어올 수 있습니다.
- 커서 제어: JPA Criteria API로 데이터 순회 방식을 제어할 수 있습니다.
JpaPagingItemReader 주요 구성 요소
- EntityManagerFactory: JPA 엔티티 매니저 팩토리를 설정합니다.
- JpaQueryProvider: 데이터를 읽기 위한 JPA 쿼리를 제공합니다.
- PageSize: 페이지 크기를 설정합니다.
- SkippableItemReader: 오류 발생 시 해당 아이템을 건너뛸 수 있도록 합니다.
- ReadListener: 읽기 시작, 종료, 오류 발생 시 이벤트를 처리할 수 있도록 합니다.
- SaveStateCallback: 잡 중단 시 현재 상태를 저장하여 재시작 시 이어서 처리할 수 있도록 합니다.
실습 하기
DB에 저장 되어있는 customer 중 20살 이상인 사람들을 추출 하는 예제를 실습해보겠습니다.
사용한 테이블 내용은 다음을 참고해주세요.
mysql> select * from customer;
+----+---------+------+--------+
| id | name | age | gender |
+----+---------+------+--------+
| 11 | Alice | 30 | F |
| 12 | Bob | 45 | M |
| 13 | Charlie | 25 | M |
| 14 | Diana | 29 | F |
| 15 | Evan | 35 | M |
| 16 | Fiona | 40 | F |
| 17 | George | 55 | M |
| 18 | Hannah | 32 | F |
+----+---------+------+--------+
8 rows in set (0.01 sec)
Cusomer 엔티티 클래스를 생성합니다.
@Entity
@Table(name = "customer")
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Customer {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY) // MySQL 실습 환경에서 진행하고 있습니다.
private int id;
private String name;
private Integer age;
private String gender;
}
작동 순서를 보기 위해 로그만 출력하는 프로세서를 만들었습니다.
/**
* 작동 순서를 보기 위해 전달 받은 Customer 객체에 대한 로그만 출력한다.
*/
@Slf4j
public class CustomerItemProcessor implements ItemProcessor<Customer, Customer> {
@Override
public Customer process(Customer item) throws Exception {
log.info("Item Processor ------------------- {}", item);
return item;
}
}
전체 코드
@Slf4j
@Configuration
public class JpaPagingItemConfig {
public static final int CHUNK_SIZE = 2;
public static final String ENCODING = "UTF-8";
public static final String JPA_PAGING_CHUNK_JOB = "JPA_PAGING_CHUNK_JOB";
@Autowired
DataSource dataSource;
@Autowired
EntityManagerFactory entityManagerFactory;
@Bean
public JpaPagingItemReader<Customer> task06CustomerJpaPagingItemReader() throws Exception {
JpaPagingItemReader<Customer> jpaPagingItemReader = new JpaPagingItemReader<>();
// JPQL 쿼리를 이용하여 20살 이상을 가져오도록 한다.
jpaPagingItemReader.setQueryString(
"SELECT c FROM Customer c WHERE c.age > :age order by id desc"
);
// 엔티티 매니저 지정
jpaPagingItemReader.setEntityManagerFactory(entityManagerFactory);
// 한 번에 읽어올 Page 크기 지정. 보통 청크 크기와 맞춰준다.
jpaPagingItemReader.setPageSize(CHUNK_SIZE);
// JPQL 쿼리에 전달할 파라미터 지정
jpaPagingItemReader.setParameterValues(Collections.singletonMap("age", 20));
return jpaPagingItemReader;
}
/**
* Builder를 이용하는 방법
* 일반 방식과 동일하다.
*/
@Bean
public JpaPagingItemReader<Customer> task06CustomerJpaPagingItemReaderBuilder() throws Exception {
return new JpaPagingItemReaderBuilder<Customer>()
.name("customerJpaPagingItemReader")
.queryString("SELECT c FROM Customer c WHERE c.age > :age order by id desc")
.pageSize(CHUNK_SIZE)
.entityManagerFactory(entityManagerFactory)
.parameterValues(Collections.singletonMap("age", 20))
.build();
}
@Bean
public FlatFileItemWriter<Customer> task06CustomerJpaFlatFileItemWriter() {
log.info("------------------ task06CustomerJpaFlatFileItemWriter -----------------");
return new FlatFileItemWriterBuilder<Customer>()
.name("task06CustomerJpaFlatFileItemWriter")
.resource(new FileSystemResource("./output/task_06_customer_with_jpa.csv"))
.encoding(ENCODING)
.delimited().delimiter("\\t")
.names("Name", "Age", "Gender")
.build();
}
@Bean
public Step customerJpaPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
log.info("------------------ Init customerJpaPagingStep -----------------");
return new StepBuilder("customerJpaPagingStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(task06CustomerJpaPagingItemReaderBuilder())
.processor(new CustomerItemProcessor())
.writer(task06CustomerJpaFlatFileItemWriter())
.build();
}
@Bean
public Job customerJpaPagingJob(Step customerJpaPagingStep, JobRepository jobRepository) {
log.info("------------------ Init customerJpaPagingJob -----------------");
return new JobBuilder(JPA_PAGING_CHUNK_JOB, jobRepository)
.incrementer(new RunIdIncrementer())
.start(customerJpaPagingStep)
.build();
}
}
실행 결과
2024-11-08T16:38:59.942+09:00 INFO 78872 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1371 ms
2024-11-08T16:38:59.966+09:00 INFO 78872 --- [ main] c.e.b.jobs.task01.GreetingTasklet : ----------------- After Properites Sets() --------------
2024-11-08T16:38:59.977+09:00 INFO 78872 --- [ main] c.e.b.j.t.BasicTaskJobConfiguration : ------------------ Init myStep -----------------
2024-11-08T16:38:59.987+09:00 INFO 78872 --- [ main] c.e.b.j.t.BasicTaskJobConfiguration : ------------------ Init myJob -----------------
2024-11-08T16:39:00.002+09:00 INFO 78872 --- [ main] c.e.b.j.task06.JpaPagingReaderJobConfig : ------------------ task06CustomerJpaFlatFileItemWriter -----------------
2024-11-08T16:39:00.002+09:00 INFO 78872 --- [ main] c.e.b.j.task06.JpaPagingReaderJobConfig : ------------------ Init customerJpaPagingStep -----------------
2024-11-08T16:39:00.009+09:00 INFO 78872 --- [ main] c.e.b.j.task06.JpaPagingReaderJobConfig : ------------------ Init customerJpaPagingJob -----------------
2024-11-08T16:39:00.023+09:00 WARN 78872 --- [ main] JpaBaseConfiguration$JpaWebConfiguration : spring.jpa.open-in-view is enabled by default. Therefore, database queries may be performed during view rendering. Explicitly configure spring.jpa.open-in-view to disable this warning
2024-11-08T16:39:00.207+09:00 INFO 78872 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8080 (http) with context path '/'
2024-11-08T16:39:00.212+09:00 INFO 78872 --- [ main] c.e.batch_sample.BatchSampleApplication : Started BatchSampleApplication in 1.817 seconds (process running for 2.041)
2024-11-08T16:39:00.213+09:00 INFO 78872 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: []
2024-11-08T16:39:00.293+09:00 INFO 78872 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=JPA_PAGING_CHUNK_JOB]] launched with the following parameters: [{'run.id':'{value=8, type=class java.lang.Long, identifying=true}'}]
2024-11-08T16:39:00.317+09:00 INFO 78872 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [customerJpaPagingStep]
2024-11-08T16:39:00.468+09:00 INFO 78872 --- [ main] c.e.b.jobs.task06.CustomerItemProcessor : Item Processor ------------------- Customer(id=18, name=Hannah, age=32, gender=F)
2024-11-08T16:39:00.469+09:00 INFO 78872 --- [ main] c.e.b.jobs.task06.CustomerItemProcessor : Item Processor ------------------- Customer(id=17, name=George, age=55, gender=M)
2024-11-08T16:39:00.481+09:00 INFO 78872 --- [ main] c.e.b.jobs.task06.CustomerItemProcessor : Item Processor ------------------- Customer(id=16, name=Fiona, age=40, gender=F)
2024-11-08T16:39:00.482+09:00 INFO 78872 --- [ main] c.e.b.jobs.task06.CustomerItemProcessor : Item Processor ------------------- Customer(id=15, name=Evan, age=35, gender=M)
2024-11-08T16:39:00.492+09:00 INFO 78872 --- [ main] c.e.b.jobs.task06.CustomerItemProcessor : Item Processor ------------------- Customer(id=14, name=Diana, age=29, gender=F)
2024-11-08T16:39:00.492+09:00 INFO 78872 --- [ main] c.e.b.jobs.task06.CustomerItemProcessor : Item Processor ------------------- Customer(id=13, name=Charlie, age=25, gender=M)
2024-11-08T16:39:00.502+09:00 INFO 78872 --- [ main] c.e.b.jobs.task06.CustomerItemProcessor : Item Processor ------------------- Customer(id=12, name=Bob, age=45, gender=M)
2024-11-08T16:39:00.502+09:00 INFO 78872 --- [ main] c.e.b.jobs.task06.CustomerItemProcessor : Item Processor ------------------- Customer(id=11, name=Alice, age=30, gender=F)
2024-11-08T16:39:00.518+09:00 INFO 78872 --- [ main] o.s.batch.core.step.AbstractStep : Step: [customerJpaPagingStep] executed in 200ms
2024-11-08T16:39:00.530+09:00 INFO 78872 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=JPA_PAGING_CHUNK_JOB]] completed with the following parameters: [{'run.id':'{value=8, type=class java.lang.Long, identifying=true}'}] and the following status: [COMPLETED] in 226ms
생성된 파일(task_06_customer_with_jpa.csv)
Hannah 32 F
George 55 M
Fiona 40 F
Evan 35 M
Diana 29 F
Charlie 25 M
Bob 45 M
Alice 30 F
JpaItemItemReader WrapUp
페이징 처리 시, 보통 페이지 번호(오프셋)를 증가시키며 데이터를 순차적으로 불러옵니다.
페이징 방식은 단순하고 대용량 데이터를 청크 단위로 처리하는 데 유용하지만, 페이지 오프셋이 증가할 때 데이터가 변경되면 누락이 발생할 수 있는 문제가 있습니다.
JpaItemWriter
JpaItemWriter 구성 요소
- EntityManagerFactory: JPA EntityManager 생성을 위한 팩토리 객체
- JpaQueryProvider: 저장할 엔터티에 대한 JPA 쿼리를 생성하는 역할
실습 하기
이번에는 csv 파일(FlatFile)로 저장되어있는 파일을 가져와 배치 작업을 한 뒤, JPA를 이용해 DB 테이블에 저장 해보겠습니다.
전체 코드
@Slf4j
@Configuration
public class JpaItemWriterJobConfig {
public static final int CHUNK_SIZE = 100;
public static final String ENCODING = "UTF-8";
public static final String JPA_ITEM_WRITER_JOB = "JPA_ITEM_WRITER_JOB";
@Autowired
EntityManagerFactory entityManagerFactory;
@Bean
public FlatFileItemReader<Customer> task06FlatFileItemReader() {
return new FlatFileItemReaderBuilder<Customer>()
.name("task06FlatFileItemReader")
.resource(new ClassPathResource("./customer.csv"))
.encoding(ENCODING)
.delimited().delimiter(",")
.names("name", "age", "gender")
.targetType(Customer.class)
.build();
}
/**
* Customer 엔티티를 데이터베이스에 저장하기 위한 JpaItemWriter
* @return Customer 엔티티의 영속화를 위한 JpaItemWriter 인스턴스를 반환
* JpaItemWriter<Customer> 인스턴스를 반환하면 배치 프로세스가 flush를 통해 Customer 데이터를 데이터베이스에 저장하게 되는 것
*/
@Bean
public JpaItemWriter<Customer> jpaItemWriter() {
return new JpaItemWriterBuilder<Customer>()
.entityManagerFactory(entityManagerFactory)
.usePersist(true) // 데이터베이스에 엔티티가 이미 존재하지 않는다고 가정하고 항상 새로운 데이터를 추가하는 방식
.build();
}
@Bean
public Step task06FlatFileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
log.info("------------------ Init flatFileStep -----------------");
return new StepBuilder("task06FlatFileStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(task06FlatFileItemReader())
.writer(jpaItemWriter())
.build();
}
@Bean
public Job task06FlatFileJob(Step task06FlatFileStep, JobRepository jobRepository) {
log.info("------------------ Init flatFileJob -----------------");
return new JobBuilder(JPA_ITEM_WRITER_JOB, jobRepository)
.incrementer(new RunIdIncrementer())
.start(task06FlatFileStep)
.build();
}
}
실행 전, 데이터베이스에 테이블을 자동 생성하고 삭제할 수 있도록 application.yaml 파일에 해당 설정을 해주었습니다.
jpa:
hibernate:
ddl-auto: update // 자유롭게 설정
실행 결과
mysql> select * from customer2;
+------+----+--------+---------+
| age | id | gender | name |
+------+----+--------+---------+
| 30 | 1 | F | Alice |
| 45 | 2 | M | Bob |
| 25 | 3 | M | Charlie |
| 29 | 4 | F | Diana |
| 35 | 5 | M | Evan |
| 40 | 6 | F | Fiona |
| 55 | 7 | M | George |
| 32 | 8 | F | Hannah |
+------+----+--------+---------+
8 rows in set (0.00 sec)
usePersist(boolean) 설정
usePersist(true)가 없을 경우, 즉 기본 설정인 usePersist(false) 상태에서는 엔티티가 단순히 무시되지 않고 merge가 사용됩니다.
- 엔티티가 이미 존재하면: merge는 동일한 ID(@Id 애너테이션이 붙은 필드 기준)를 가진 엔티티가 영속성 컨텍스트나 데이터베이스에 존재하는 경우, 기존 데이터를 업데이트합니다. 따라서, 내용이 같다면 아무 변화가 없지만, 차이가 있을 경우 기존 데이터를 수정합니다.
- 엔티티가 존재하지 않으면: merge는 새 엔티티를 데이터베이스에 저장하는 대신, 객체를 복제하여 새로운 엔티티로 저장합니다.
현재는 ID 생성 방법을 IDENTITY 로 사용하고 있어서 해당 설정과 무관하게 새로운 엔티티로 저장됩니다.
mysql> select * from customer2;
+----+------+--------+---------+
| id | age | gender | name |
+----+------+--------+---------+
| 1 | 30 | F | Alice |
| 2 | 45 | M | Bob |
| 3 | 25 | M | Charlie |
| 4 | 29 | F | Diana |
| 5 | 35 | M | Evan |
| 6 | 40 | F | Fiona |
| 7 | 55 | M | George |
| 8 | 32 | F | Hannah |
| 9 | 30 | F | Alice |
| 10 | 45 | M | Bob |
| 11 | 25 | M | Charlie |
| 12 | 29 | F | Diana |
| 13 | 35 | M | Evan |
| 14 | 40 | F | Fiona |
| 15 | 55 | M | George |
| 16 | 32 | F | Hannah |
+----+------+--------+---------+
16 rows in set (0.00 sec)
그 외 참고
- https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/database/JdbcPagingItemReader.html
- https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/database/JpaPagingItemReader.html
- https://docs.spring.io/spring-batch/reference/readers-and-writers/database.html#JpaPagingItemReader
'알아두면 좋은 개발 지식 > Spring Batch 스터디' 카테고리의 다른 글
[7회차] MyBatisPagingItemReader로 DB내용을 읽고, MyBatisItemWriter로 DB에 쓰기 (3) | 2024.11.16 |
---|---|
[6회차] Spring Batch 스터디: 후기 및 추가 학습 내용 (2) | 2024.11.16 |
[5회차] Spring Batch 스터디: 후기 및 추가 학습 내용 (1) | 2024.11.05 |
[5회차] JdbcPagingItemReader로 DB내용을 읽고, JdbcBatchItemWriter로 DB에 쓰기 (1) | 2024.11.02 |
[4회차] Spring Batch 스터디: 후기 및 추가 학습 내용 (2) | 2024.10.30 |