ilot
ilot의 블로그
ilot
전체 방문자
오늘
어제
  • 분류 전체보기 (17)
    • Algorithm (0)
    • Data Structure (0)
    • Database (0)
    • Operating System (0)
    • Network (0)
    • OOP (1)
    • Design Pattern (5)
    • Java (2)
    • Spring (5)
    • Mybatis (1)
    • JavaScript & TypeScript (0)
    • React (0)
    • Coding Test (2)
    • 독후감 (1)
    • 일상 (0)

블로그 메뉴

  • 홈
  • Github

공지사항

인기 글

태그

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
ilot

ilot의 블로그

Spring

[Spring Batch] Tasklet & Chunk 예시

2022. 7. 13. 23:16

1. 소개

  • Spring Batch는 tasklet 방식과 chunk 방식을 제공한다.

 

2. Dependencies

  • spring-batch-core와 spring-batch-test 의존성이 필요하다.
  • maven
<!-- ... -->

<dependency>
  <groupId>org.springframework.batch</groupId>
  <artifactId>spring-batch-core</artifactId>
  <version>4.3.0</version>
</dependency>
<dependency>
  <groupId>org.springframework.batch</groupId>
  <artifactId>spring-batch-test</artifactId>
  <version>4.3.0</version>
  <scope>test</scope>
</dependency>

<!-- csv 읽기 위한 의존성 -->
<dependency>
  <groupId>com.opencsv</groupId>
  <artifactId>opencsv</artifactId>
  <version>4.1</version>
</dependency>

<!-- ... -->

 

3. 요구사항

  • 아래와 같이 이름과 생년월일이 작성된 CSV 파일을
Mae Hodges,10/22/1972
Gary Potter,02/22/1953
Betty Wise,02/17/1968
Wayne Rose,04/06/1977
Adam Caldwell,09/27/1995
Lucille Phillips,05/14/1992
  • 나이를 계산한 CSV파일로 변환하는 batch job을 만들고자 한다.
Mae Hodges,45
Gary Potter,64
Betty Wise,49
Wayne Rose,40
Adam Caldwell,22
Lucille Phillips,25
  • 위 요구사항을 Tasklet 방식과 Chunk 방식으로 만들어보자.

 

4. Tasklet 방식

4.1 Tasklet 소개

  • Tasklet 이란 한 step 내에서 단일 task를 수행하는 것이다. Tasklet Job은 여러 step으로 구성된다. 긱 step은 정의된 task 단 하나만 수행해야 한다.
  • 요구사항대로 step을 정의하면 크게 3 step으로 나눌 수 있다.
    1. input CSV 파일로부터 라인들을 읽어온다. → LinesReader
    2. input CSV 파일 내의 모든 사람들의 나이를 계산한다. → LinesProcessor
    3. output CSV 파일에 사람들의 이름과 나이를 작성한다. → LinesWriter
  • 위와 같이, 모든 step에는 Tasklet 인터페이스를 implements 해야 한다. execute 함수를 오버라이드 하여 각 step에서 수행할 일을 구현한다.
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
	// ...
}

 

4.2 Configuration

  • Tasklet Job을 정의하는 config 파일을 만든다.
  • 4.1 과정에서 생성한 class를 @Bean으로 설정한 뒤, job 순서 및 내용을 정의하면 된다.
package com.glory.springbatch.batch.tasklet;

// import 생략

@Configuration
@EnableBatchProcessing
public class TaskletsConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    // JUnit 테스트를 위한 파일
    @Bean
    public JobLauncherTestUtils jobLauncherTestUtils() {
        return new JobLauncherTestUtils();
    }

    @Bean
    public LinesReader linesReader() {
        return new LinesReader();
    }

    @Bean
    public LinesProcessor linesProcessor() {
        return new LinesProcessor();
    }

    @Bean
    public LinesWriter linesWriter() {
        return new LinesWriter();
    }

    // 일반 클래스를 Step 으로 구현
    @Bean
    protected Step readLines() {
        return steps
                .get("readLines")
                .tasklet(linesReader())
                .build();
    }

    @Bean
    protected Step processLines() {
        return steps
                .get("processLines")
                .tasklet(linesProcessor())
                .build();
    }

    @Bean
    protected Step writeLines() {
        return steps
                .get("writeLines")
                .tasklet(linesWriter())
                .build();
    }

    // Step의 순서 및 실제 Job을 구현한 Job
    @Bean
    public Job job() {
        return jobs
                .get("taskletsJob")
                .start(readLines())
                .next(processLines())
                .next(writeLines())
                .build();
    }

}

 

4.3 Model & Utils

4.3.1 클래스 Line

  • CSV 파일을 조작하기 위한 class Line을 생성한다.
  • Line은 반드시 Serializable을 인터페이스 받아야 한다. Line은 Tasklet의 step 사이에서 DTO의 역할을 하기 때문이다. Spring Batch의 step 간 전송되는 객체는 serializable 가능해야 한다.
package com.glory.springbatch.model;

// import 생략

@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class Line implements Serializable {

    private String name;
    private LocalDate dob;
    private Long age;

    public Line(String name, LocalDate dob) {
        this.name = name;
        this.dob = dob;
    }
}

 

4.3.2 Utils FileUtils

  • CSV 파일로부터 Line을 읽거나, CSV 파일에 Line을 쓸 수 있는 클래스 FileUtils을 생성한다.
package com.glory.springbatch.util;

// import 생략

public class FileUtils {

    private final Logger logger = LoggerFactory.getLogger(FileUtils.class);

    private String fileName;
    private CSVReader CSVReader;
    private CSVWriter CSVWriter;
    private FileReader fileReader;
    private FileWriter fileWriter;
    private File file;

    public FileUtils(String fileName) {
        this.fileName = fileName;
    }

    // readNext의 전후처리를 위해, wrapper method로 구현되었음.
    public Line readLine() {
        try {
            if (CSVReader == null) {
                initReader();
            }

            String[] line = CSVReader.readNext();

            if (line == null) return null;

            return new Line(line[0], LocalDate.parse(line[1], DateTimeFormatter.ofPattern("MM/dd/yyyy")));
        } catch (Exception e) {
            logger.error("Error while reading line in file: " + this.fileName);
            return null;
        }
    }

    // writeNext의 전후처리를 위해, wrapper method로 구현되었음.
    public void writeLine(Line line) {
        try {
            if (CSVWriter == null) {
                initWriter();
            }

            String[] lineStr = new String[2];
            lineStr[0] = line.getName();
            lineStr[1] = line.getAge().toString();

            CSVWriter.writeNext(lineStr);
        } catch (Exception e) {
            logger.error("Error while writing line in file: " + this.fileName);
        }
    }

    private void initReader() throws Exception {
        ClassLoader classLoader = this
                .getClass()
                .getClassLoader();

        if (file == null) {
            file = new File(classLoader
                    .getResource(fileName)
                    .getFile());
        }

        if (fileReader == null) fileReader = new FileReader(file);
        if (CSVReader == null) CSVReader = new CSVReader(fileReader);
    }

    private void initWriter() throws Exception {
        if (file == null) {
            file = new File(fileName);
            file.createNewFile();
        }

        if (fileWriter == null) fileWriter = new FileWriter(file, true);
        if (CSVWriter == null) CSVWriter = new CSVWriter(fileWriter);
    }

    public void closeWriter() {
        try {
            CSVWriter.close();
            fileWriter.close();
        } catch (IOException e) {
            logger.error("Error while closing writer.");
        }
    }

    public void closeReader() {
        try {
            CSVReader.close();
            fileReader.close();
        } catch (IOException e) {
            logger.error("Error while closing reader.");
        }
    }

}

 

4.4 LinesReader

  • Tasklet에 의해 오버라이드된 메서드 : execute
    • execute : step에서 행해야 할 작업을 구현한다.
  • StepExecutionListener에 의해 오버라이드된 메서드 : beforeStep, afterStep
    • beforeStep : execute가 실행되기 전 실행되는 메서드
    • afterStep : execute가 실행된 후 실행되는 메서드
package com.glory.springbatch.batch.tasklet.step;

// import 생략

public class LinesReader implements Tasklet, StepExecutionListener {

    private final Logger logger = LoggerFactory.getLogger(LinesReader.class);

    private List<Line> lines;
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        lines = new ArrayList<>();
        fu = new FileUtils("input/test.csv"); // resource 디렉토리 하위의 파일을 읽는다.
        logger.debug("Lines Reader initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        Line line = fu.readLine();
        while (line != null) {
            lines.add(line);
            logger.debug("Read line: " + line.toString());
            line = fu.readLine();
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeReader();
        stepExecution
                .getJobExecution()
                .getExecutionContext()
                .put("lines", this.lines);
        logger.debug("Lines Reader ended.");

        return ExitStatus.COMPLETED;
    }

}
  • step 간 데이터를 주고받기 위해서, key-value 형태로 값을 저장하고, 다음 step에서 key를 통해 접근한다
stepExecution
    .getJobExecution()
    .getExecutionContext()
    .put("lines", this.lines); // key = lines, value = this.lines

 

4.5 LinesProcessor

package com.glory.springbatch.batch.tasklet.step;

// import 생략

public class LinesProcessor implements Tasklet, StepExecutionListener {

    private final Logger logger = LoggerFactory.getLogger(LinesReader.class);

    private List<Line> lines;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        ExecutionContext executionContext = stepExecution
                .getJobExecution()
                .getExecutionContext();
        lines = (List<Line>) executionContext.get("lines");
        logger.debug("Lines Processor initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

        lines.forEach(line -> {
            long age = ChronoUnit.YEARS.between(line.getDob(), LocalDate.now());
            logger.debug("Calculated age " + age + " for line " + line.toString());
            line.setAge(age);
        });

        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        logger.debug("Lines Processor ended.");
        return ExitStatus.COMPLETED;
    }

}
  • 이전 step에서 데이터를 꺼내는 코드에 주목하자.
ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext();
lines = (List<Line>) executionContext.get("lines");

 

4.6 LinesWriter

package com.glory.springbatch.batch.tasklet.step;

// import 생략

public class LinesWriter implements Tasklet, StepExecutionListener {

    private final Logger logger = LoggerFactory.getLogger(LinesReader.class);

    private List<Line> lines;
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext();
        this.lines = (List<Line>) executionContext.get("lines");
        fu = new FileUtils("output.csv");
        logger.debug("Lines Writer initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        lines.forEach(line -> {
            fu.writeLine(line);
            logger.debug("Wrote line " + line.toString());
        });

        return RepeatStatus.FINISHED;
    }


    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeWriter();
        logger.debug("Lines Writer ended.");
        return ExitStatus.COMPLETED;
    }

}

 

4.7 테스트 코드 작성

  • @ContextConfiguration 은 job 정의가 존재하는 Spring Context Configuration 클래스를 가리키고 있다.
package com.glory.springbatch.batch.tasklet;

// import 생략

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = TaskletsConfig.class)
public class TaskletsConfigTest {

    @Autowired
    JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void testTaskletsJob() throws Exception {
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        Assert.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
    }

}

 

5. Chunk 방식

5.1 Chunk 소개

  • Chunk는 job의 단위를 step으로 나누는 것이 아니라, job이 수행될 데이터를 고정된 크기로 나누어 Batch를 실행하는 방식이다.
  • 고정된 크기로 나누어진 데이터를 Chunk라고 불린다.
  • Chunk가 없어질 때까지 job을 수행한다.
  • Tasklet 방식과는 수도코드가 약간 바뀌게 된다.
Chunk의 크기를 X로 지정

line을 모두 소모할 때 까지:
  X개의 line만큼 아래 작업 수행:
    1개의 line을 읽는다.
    1개의 line에 대해 작업을 수행한다. (위 예시에서는, age로 변환하는 작업이 되겠다.)
  X개의 line을 output 파일에 작성한다.

 

5.2 Configuration

  • 아래 tasklet은 오로지 1 step으로 이루어져 있다. 하지만, 기존 tasklet과는 달리 데이터 chunk들을 오가며 작동할 reader, writer, processor를 정의한다.
  • commit 간격은 1개의 chunk 단위로 지정된다. 아래 예시에는 2로 설정하였다.
package com.glory.springbatch.batch.chunk;

// import 생략

@Configuration
@EnableBatchProcessing
public class ChunksConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public JobLauncherTestUtils jobLauncherTestUtils() {
        return new JobLauncherTestUtils();
    }

    @Bean
    public ItemReader<Line> itemReader() {
        return new LinesReader();
    }

    @Bean
    public ItemProcessor<Line, Line> itemProcessor() {
        return new LinesProcessor();
    }

    @Bean
    public ItemWriter<Line> itemWriter() {
        return new LinesWriter();
    }

    @Bean
    protected Step processLines(ItemReader<Line> reader, ItemProcessor<Line, Line> processor, ItemWriter<Line> writer) {
        return steps.get("processLines").<Line, Line> chunk(2) // chunk 단위 설정
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public Job job() {
        return jobs
                .get("chunksJob")
                .start(processLines(itemReader(), itemProcessor(), itemWriter()))
                .build();
    }
}

 

5.3 LineReader

  • LineReader는 CSV 파일로부터 1 줄을 읽고, Line으로 변환하는 작업을 한다. 파일을 읽기 위해 ItemReader interface를 구현하였고, read 메서드를 오버라이드 하였다.
package com.glory.springbatch.batch.chunk.step;

// import 생략

public class LinesReader implements ItemReader<Line>, StepExecutionListener {

    private final Logger logger = LoggerFactory.getLogger(LinesReader.class);
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        fu = new FileUtils("input/test.csv");
        logger.debug("Line Reader initialized.");
    }

    @Override
    public Line read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        Line line = fu.readLine();
        if (line != null) {
            logger.debug("Read line : " + line.toString());
        }
        return line;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeReader();
        logger.debug("Line Reader ended.");
        return ExitStatus.COMPLETED;
    }
}

 

5.4 LineProcessor

  • LineProcessor는 ItemProcessor를 상속받아 process 메서드를 오버라이드 한다.
  • tasklet 방식과는 달리, step 간 데이터를 주고받기 위해 stepExecution.getJobExecution().getExecutionContext()를 수행하여 key-value 형태로 가져왔었어야 했다. itemReader(), itemProcessor(), itemWriter() 끼리 아규먼트로 값을 주고받기 때문에, chunk 방식에서는 해당 코드는 생략될 수 있었고, 더욱 간편히 코드를 작성할 수 있다.
package com.glory.springbatch.batch.chunk.step;

// import 생략

public class LinesProcessor implements ItemProcessor<Line, Line>, StepExecutionListener {

    private final Logger logger = LoggerFactory.getLogger(LinesReader.class);

    @Override
    public void beforeStep(StepExecution stepExecution) {
        logger.debug("Lines Processor initialized.");
    }

    @Override
    public Line process(Line line) throws Exception {
        long age = ChronoUnit.YEARS.between(line.getDob(), LocalDate.now());
        logger.debug("Calculated age " + age + " for line " + line.toString());
        line.setAge(age);
        return line;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        logger.debug("Lines Processor ended.");
        return ExitStatus.COMPLETED;
    }

}

 

5.5 LinesWriter

  • 1 Line씩 처리하던 reader와 processor와는 달리, LinesWriter는 청크 단위의 라인의 List를 받는다.
package com.glory.springbatch.batch.chunk.step;

// import 생략

import java.util.List;

public class LinesWriter implements ItemWriter<Line>, StepExecutionListener {

    private final Logger logger = LoggerFactory.getLogger(LinesReader.class);

    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        fu = new FileUtils("output.csv");
        logger.debug("Lines Writer initialized.");
    }

    @Override
    public void write(List<? extends Line> list) throws Exception {
        list.forEach(line -> {
            fu.writeLine(line);
            logger.debug("Wrote line " + line.toString());
        });
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeWriter();
        logger.debug("Lines Writer ended.");
        return ExitStatus.COMPLETED;
    }

}

 

5.6 테스트 코드 작성

package com.glory.springbatch.batch.chunk;

// import 생략

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = ChunksConfig.class)
public class ChunksConfigTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void testChunksJob() throws Exception {
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
    }

}

 

6. 결론

  • Tasklet 방식은 작업의 순서가 바뀔 수 있는 Job에 사용하면 유지보수가 편하다.
  • Chunk 방식은 특정 테이블을 페이지 방식으로 읽는 Job이나, 한번에 데이터가 너무 많이 처리되어 메모리에 무리가 갈 수 있어 Chunk 단위로 나눠 메모리의 과부화를 방지하기 위해 사용한다.

 

참고 및 번역 : https://www.baeldung.com/spring-batch-tasklet-chunk
작성한 코드 : https://github.com/96glory/spring-batch-test

'Spring' 카테고리의 다른 글

[Spring] 서비스는 트랜잭션, 도메인의 순서 보장 역할만 제공한다  (0) 2022.11.09
[Spring] Async  (0) 2022.10.28
[Spring] 트랜잭션  (0) 2022.10.19
[Spring] 왜 Spring 인가  (0) 2022.08.14
    'Spring' 카테고리의 다른 글
    • [Spring] 서비스는 트랜잭션, 도메인의 순서 보장 역할만 제공한다
    • [Spring] Async
    • [Spring] 트랜잭션
    • [Spring] 왜 Spring 인가
    ilot
    ilot
    _

    티스토리툴바