Spring Batch - Tasklets vs Chunks

1. Въведение

Spring Batch предоставя два различни начина за изпълнение на работа: използване на тасклети и парчета .

В тази статия ще научим как да конфигурираме и внедрим двата метода, като използваме прост пример от реалния живот.

2. Зависимости

Нека започнем, като добавим необходимите зависимости :

 org.springframework.batch spring-batch-core 4.2.0.RELEASE   org.springframework.batch spring-batch-test 4.2.0.RELEASE test 

За да получите най-новата версия на spring-batch-core и spring-batch-test, моля обърнете се към Maven Central.

3. Нашият случай на употреба

Нека разгледаме CSV файл със следното съдържание:

Mae Hodges,10/22/1972 Gary Potter,02/22/1953 Betty Wise,02/17/1968 Wayne Rose,04/06/1977 Adam Caldwell,09/27/1995 Lucille Phillips,05/14/1992

В първата позиция на всеки ред представлява име на човек, а втората позиция представлява неговата / нейната дата на раждане .

Нашият случай е да генерираме друг CSV файл, който съдържа името и възрастта на всеки човек :

Mae Hodges,45 Gary Potter,64 Betty Wise,49 Wayne Rose,40 Adam Caldwell,22 Lucille Phillips,25

Сега, когато нашият домейн е ясен, нека да продължим и да изградим решение, използвайки и двата подхода. Ще започнем с тасклети.

4. Подход на Tasklets

4.1. Въведение и дизайн

Тасклетите са предназначени да изпълняват една задача в рамките на стъпка. Нашата работа ще се състои от няколко стъпки, които се изпълняват една след друга. Всяка стъпка трябва да изпълнява само една определена задача .

Нашата работа ще се състои от три стъпки:

  1. Прочетете редове от входния CSV файл.
  2. Изчислете възрастта за всеки човек във входния CSV файл.
  3. Напишете име и възраст на всеки човек в нов изходен CSV файл.

Сега, когато голямата картина е готова, нека създадем по един клас на стъпка.

LinesReader ще отговаря за четенето на данни от входния файл:

public class LinesReader implements Tasklet { // ... }

LinesProcessor ще изчисли възрастта за всеки човек във файла:

public class LinesProcessor implements Tasklet { // ... }

И накрая, LinesWriter ще носи отговорността да пише имена и възрасти в изходен файл:

public class LinesWriter implements Tasklet { // ... }

На този етап всички наши стъпки изпълняват интерфейса на Tasklet . Това ще ни принуди да приложим неговия метод за изпълнение :

@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { // ... }

Този метод е мястото, където ще добавим логиката за всяка стъпка. Преди да започнем с този код, нека конфигурираме нашата работа.

4.2. Конфигурация

Трябва да добавим някаква конфигурация към контекста на приложението на Spring . След добавяне на стандартна декларация на боб за класовете, създадени в предишния раздел, ние сме готови да създадем нашата дефиниция на работа:

@Configuration @EnableBatchProcessing public class TaskletsConfig { @Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory steps; @Bean protected Step readLines() { return steps .get("readLines") .tasklet(linesReader()) .build(); } @Bean protected Step processLines() { return steps .get("processLines") .tasklet(linesProcessor()) .build(); } @Bean protected Step writeLines() { return steps .get("writeLines") .tasklet(linesWriter()) .build(); } @Bean public Job job() { return jobs .get("taskletsJob") .start(readLines()) .next(processLines()) .next(writeLines()) .build(); } // ... }

Това означава, че нашата “taskletsJob” ще се състои от три стъпки. Първият ( readLines ) ще изпълни tasklet, дефиниран в bean linesReader и ще премине към следващата стъпка: processLines. ProcessLines ще изпълни tasklet, дефиниран в bean linesProcessor и ще премине към последната стъпка: writeLines .

Нашият работен поток е дефиниран и сме готови да добавим малко логика!

4.3. Модел и полезни инструменти

Тъй като ще манипулираме редове в CSV файл, ще създадем клас Line:

public class Line implements Serializable { private String name; private LocalDate dob; private Long age; // standard constructor, getters, setters and toString implementation }

Моля, обърнете внимание, че Line прилага Serializable. Това е така, защото Line ще действа като DTO за прехвърляне на данни между стъпките. Според Spring Batch обектите, които се прехвърлят между стъпки, трябва да могат да се сериализират .

От друга страна, можем да започнем да мислим за четене и писане на редове.

За това ще използваме OpenCSV:

 com.opencsv opencsv 4.1 

Потърсете най-новата версия на OpenCSV в Maven Central.

След като OpenCSV бъде включен, ние също ще създадем клас FileUtils . Той ще осигури методи за четене и писане на CSV редове:

public class FileUtils { public Line readLine() throws Exception { if (CSVReader == null) initReader(); String[] line = CSVReader.readNext(); if (line == null) return null; return new Line( line[0], LocalDate.parse( line[1], DateTimeFormatter.ofPattern("MM/dd/yyyy"))); } public void writeLine(Line line) throws Exception { if (CSVWriter == null) initWriter(); String[] lineStr = new String[2]; lineStr[0] = line.getName(); lineStr[1] = line .getAge() .toString(); CSVWriter.writeNext(lineStr); } // ... }

Забележете, че readLine действа като обвивка над метода readNext на OpenCSV и връща обект Line .

По същия начин, writeLine обвива OpenCSV's writeNext, получавайки Line обект. Пълното изпълнение на този клас може да бъде намерено в проекта GitHub.

На този етап всички сме готови да започнем с всяка стъпка на изпълнение.

4.4. LinesReader

Нека да продължим и да завършим нашия клас LinesReader :

public class LinesReader implements Tasklet, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LinesReader.class); private List lines; private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { lines = new ArrayList(); fu = new FileUtils( "taskletsvschunks/input/tasklets-vs-chunks.csv"); logger.debug("Lines Reader initialized."); } @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { Line line = fu.readLine(); while (line != null) { lines.add(line); logger.debug("Read line: " + line.toString()); line = fu.readLine(); } return RepeatStatus.FINISHED; } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeReader(); stepExecution .getJobExecution() .getExecutionContext() .put("lines", this.lines); logger.debug("Lines Reader ended."); return ExitStatus.COMPLETED; } }

Изпълнителният метод на LinesReader създава екземпляр FileUtils над пътя на входния файл. След това добавя редове към списък, докато няма повече редове за четене .

Нашият клас също така изпълнява StepExecutionListener, който предоставя два допълнителни метода: beforeStep и afterStep . Ще използваме тези методи за инициализиране и затваряне на нещата преди и след изпълнението .

Ако разгледаме кода afterStep , ще забележим реда, където списъкът с резултати ( редове) се поставя в контекста на заданието, за да го направи достъпен за следващата стъпка:

stepExecution .getJobExecution() .getExecutionContext() .put("lines", this.lines);

На този етап първата ни стъпка вече е изпълнила своята отговорност: зареждаме CSV линии в списък в паметта. Нека да преминем към втората стъпка и да ги обработим.

4.5. LinesProcessor

LinesProcessor също ще внедри StepExecutionListener и, разбира се, Tasklet . Това означава, че той ще приложииметодите beforeStep , изпълнение и afterStep :

public class LinesProcessor implements Tasklet, StepExecutionListener { private Logger logger = LoggerFactory.getLogger( LinesProcessor.class); private List lines; @Override public void beforeStep(StepExecution stepExecution) { ExecutionContext executionContext = stepExecution .getJobExecution() .getExecutionContext(); this.lines = (List) executionContext.get("lines"); logger.debug("Lines Processor initialized."); } @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { for (Line line : lines) { long age = ChronoUnit.YEARS.between( line.getDob(), LocalDate.now()); logger.debug("Calculated age " + age + " for line " + line.toString()); line.setAge(age); } return RepeatStatus.FINISHED; } @Override public ExitStatus afterStep(StepExecution stepExecution) { logger.debug("Lines Processor ended."); return ExitStatus.COMPLETED; } }

Лесно е да се разбере, че зарежда списък с редове от контекста на работата и изчислява възрастта на всеки човек .

Няма нужда да поставяте друг списък с резултати в контекста, тъй като модификациите се случват на същия обект, който идва от предишната стъпка.

И ние сме готови за последната ни стъпка.

4.6. LinesWriter

Задачата на LinesWriter е да прегледа списъка с редове и да напише име и възраст в изходния файл :

public class LinesWriter implements Tasklet, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LinesWriter.class); private List lines; private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { ExecutionContext executionContext = stepExecution .getJobExecution() .getExecutionContext(); this.lines = (List) executionContext.get("lines"); fu = new FileUtils("output.csv"); logger.debug("Lines Writer initialized."); } @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { for (Line line : lines) { fu.writeLine(line); logger.debug("Wrote line " + line.toString()); } return RepeatStatus.FINISHED; } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeWriter(); logger.debug("Lines Writer ended."); return ExitStatus.COMPLETED; } }

We're done with our job's implementation! Let's create a test to run it and see the results.

4.7. Running the Job

To run the job, we'll create a test:

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = TaskletsConfig.class) public class TaskletsTest { @Autowired private JobLauncherTestUtils jobLauncherTestUtils; @Test public void givenTaskletsJob_whenJobEnds_thenStatusCompleted() throws Exception { JobExecution jobExecution = jobLauncherTestUtils.launchJob(); assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); } }

ContextConfiguration annotation is pointing to the Spring context configuration class, that has our job definition.

We'll need to add a couple of extra beans before running the test:

@Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } @Bean public JobRepository jobRepository() throws Exception { MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean(); factory.setTransactionManager(transactionManager()); return (JobRepository) factory.getObject(); } @Bean public PlatformTransactionManager transactionManager() { return new ResourcelessTransactionManager(); } @Bean public JobLauncher jobLauncher() throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository()); return jobLauncher; }

Everything is ready! Go ahead and run the test!

After the job has finished, output.csv has the expected content and logs show the execution flow:

[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized. [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Gary Potter,02/22/1953] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Betty Wise,02/17/1968] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended. [main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized. [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line [Gary Potter,02/22/1953] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line [Betty Wise,02/17/1968] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended. [main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized. [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Gary Potter,02/22/1953,64] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Betty Wise,02/17/1968,49] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25] [main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.

That's it for Tasklets. Now we can move on to the Chunks approach.

5. Chunks Approach

5.1. Introduction and Design

As the name suggests, this approach performs actions over chunks of data. That is, instead of reading, processing and writing all the lines at once, it'll read, process and write a fixed amount of records (chunk) at a time.

Then, it'll repeat the cycle until there's no more data in the file.

As a result, the flow will be slightly different:

  1. While there're lines:
    • Do for X amount of lines:
      • Read one line
      • Process one line
    • Write X amount of lines.

So, we also need to create three beans for chunk oriented approach:

public class LineReader { // ... }
public class LineProcessor { // ... }
public class LinesWriter { // ... }

Before moving to implementation, let's configure our job.

5.2. Configuration

The job definition will also look different:

@Configuration @EnableBatchProcessing public class ChunksConfig { @Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory steps; @Bean public ItemReader itemReader() { return new LineReader(); } @Bean public ItemProcessor itemProcessor() { return new LineProcessor(); } @Bean public ItemWriter itemWriter() { return new LinesWriter(); } @Bean protected Step processLines(ItemReader reader, ItemProcessor processor, ItemWriter writer) { return steps.get("processLines"). chunk(2) .reader(reader) .processor(processor) .writer(writer) .build(); } @Bean public Job job() { return jobs .get("chunksJob") .start(processLines(itemReader(), itemProcessor(), itemWriter())) .build(); } }

In this case, there's only one step performing only one tasklet.

However, that tasklet defines a reader, a writer and a processor that will act over chunks of data.

Note that the commit interval indicates the amount of data to be processed in one chunk. Our job will read, process and write two lines at a time.

Now we're ready to add our chunk logic!

5.3. LineReader

LineReader will be in charge of reading one record and returning a Line instance with its content.

To become a reader, our class has to implement ItemReader interface:

public class LineReader implements ItemReader { @Override public Line read() throws Exception { Line line = fu.readLine(); if (line != null) logger.debug("Read line: " + line.toString()); return line; } }

The code is straightforward, it just reads one line and returns it. We'll also implement StepExecutionListener for the final version of this class:

public class LineReader implements ItemReader, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LineReader.class); private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv"); logger.debug("Line Reader initialized."); } @Override public Line read() throws Exception { Line line = fu.readLine(); if (line != null) logger.debug("Read line: " + line.toString()); return line; } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeReader(); logger.debug("Line Reader ended."); return ExitStatus.COMPLETED; } }

It should be noticed that beforeStep and afterStep execute before and after the whole step respectively.

5.4. LineProcessor

LineProcessor follows pretty much the same logic than LineReader.

However, in this case, we'll implement ItemProcessor and its method process():

public class LineProcessor implements ItemProcessor { private Logger logger = LoggerFactory.getLogger(LineProcessor.class); @Override public Line process(Line line) throws Exception { long age = ChronoUnit.YEARS .between(line.getDob(), LocalDate.now()); logger.debug("Calculated age " + age + " for line " + line.toString()); line.setAge(age); return line; } }

The process() method takes an input line, processes it and returns an output line. Again, we'll also implement StepExecutionListener:

public class LineProcessor implements ItemProcessor, StepExecutionListener { private Logger logger = LoggerFactory.getLogger(LineProcessor.class); @Override public void beforeStep(StepExecution stepExecution) { logger.debug("Line Processor initialized."); } @Override public Line process(Line line) throws Exception { long age = ChronoUnit.YEARS .between(line.getDob(), LocalDate.now()); logger.debug( "Calculated age " + age + " for line " + line.toString()); line.setAge(age); return line; } @Override public ExitStatus afterStep(StepExecution stepExecution) { logger.debug("Line Processor ended."); return ExitStatus.COMPLETED; } }

5.5. LinesWriter

Unlike reader and processor, LinesWriter will write an entire chunk of lines so that it receives a List of Lines:

public class LinesWriter implements ItemWriter, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LinesWriter.class); private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { fu = new FileUtils("output.csv"); logger.debug("Line Writer initialized."); } @Override public void write(List lines) throws Exception { for (Line line : lines) { fu.writeLine(line); logger.debug("Wrote line " + line.toString()); } } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeWriter(); logger.debug("Line Writer ended."); return ExitStatus.COMPLETED; } }

LinesWriter code speaks for itself. And again, we're ready to test our job.

5.6. Running the Job

We'll create a new test, same as the one we created for the tasklets approach:

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = ChunksConfig.class) public class ChunksTest { @Autowired private JobLauncherTestUtils jobLauncherTestUtils; @Test public void givenChunksJob_whenJobEnds_thenStatusCompleted() throws Exception { JobExecution jobExecution = jobLauncherTestUtils.launchJob(); assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); } }

After configuring ChunksConfig as explained above for TaskletsConfig, we're all set to run the test!

Once the job is done, we can see that output.csv contains the expected result again, and the logs describe the flow:

[main] DEBUG o.b.t.chunks.LineReader - Line Reader initialized. [main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized. [main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized. [main] DEBUG o.b.t.chunks.LineReader - Read line: [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Gary Potter,02/22/1953] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line [Gary Potter,02/22/1953] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Gary Potter,02/22/1953,64] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Betty Wise,02/17/1968] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line [Betty Wise,02/17/1968] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Betty Wise,02/17/1968,49] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25] [main] DEBUG o.b.t.chunks.LineProcessor - Line Processor ended. [main] DEBUG o.b.t.chunks.LinesWriter - Line Writer ended. [main] DEBUG o.b.t.chunks.LineReader - Line Reader ended.

We have the same result and a different flow. Logs make evident how the job executes following this approach.

6. Conclusion

Различните контексти ще покажат необходимостта от единия или другия подход. Докато Tasklets се чувстват по-естествено за сценариите „една задача след друга“, парчетата предоставят просто решение за справяне със странични четения или ситуации, при които не искаме да съхраняваме значително количество данни в паметта.

Пълното изпълнение на този пример може да бъде намерено в проекта GitHub .