Spring Batch с помощта на Partitioner

1. Общ преглед

В предишното ни въведение в Spring Batch, ние представихме рамката като инструмент за групова обработка. Също така проучихме подробностите за конфигурацията и внедряването за изпълнение на задача с един поток, един процес.

За да се приложи задача с някаква паралелна обработка, се предлагат редица опции. На по-високо ниво има два режима на паралелна обработка:

  1. Единичен процес, многонишков
  2. Многопроцесорен

В тази бърза статия ще обсъдим разделянето на Step , което може да бъде приложено както за еднопроцесни, така и за многопроцесорни задачи.

2. Разделяне на стъпка

Spring Batch с разделяне ни предоставя възможност да разделяме изпълнението на стъпка :

Преглед на разделянето

Горната снимка показва изпълнение на работа с разделена стъпка .

Има стъпка, наречена "Master", чието изпълнение е разделено на някои "Slave" стъпки. Тези роби могат да заемат мястото на господар и резултатът ще остане непроменен. И master, и slave са екземпляри на Step . Подчинените могат да бъдат отдалечени услуги или просто локално изпълняващи нишки.

Ако е необходимо, можем да предадем данни от главния на подчинения. Метаданните (т.е. JobRepository ) гарантират, че всеки slave се изпълнява само веднъж в едно изпълнение на Job.

Ето схемата на последователността, показваща как работи всичко:

Стъпка на разделяне

Както е показано, PartitionStep управлява изпълнението. В PartitionHandler е отговорен за разделяне на работата на "магистър" в "роби". Най-дясната стъпка е робът.

3. Maven POM

Зависимостите на Maven са същите, както е споменато в предишната ни статия. Тоест Spring Core, Spring Batch и зависимостта за базата данни (в нашия случай SQLite ).

4. Конфигурация

В нашата уводна статия видяхме пример за конвертиране на някои финансови данни от CSV в XML файл. Нека разширим същия пример.

Тук ще преобразуваме финансовата информация от 5 CSV файла в съответни XML файлове, като използваме многонишкова реализация.

Можем да постигнем това, като използваме разделяне на задание и стъпка . Ще имаме пет нишки, по една за всеки от CSV файловете.

Първо, нека създадем работа:

@Bean(name = "partitionerJob") public Job partitionerJob() throws UnexpectedInputException, MalformedURLException, ParseException { return jobs.get("partitioningJob") .start(partitionStep()) .build(); }

Както виждаме, тази работа започва с PartitioningStep . Това е нашата основна стъпка, която ще бъде разделена на различни подчинени стъпки:

@Bean public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("partitionStep") .partitioner("slaveStep", partitioner()) .step(slaveStep()) .taskExecutor(taskExecutor()) .build(); }

Тук ще създадем PartitioningStep, използвайки StepBuilderFactory . За това трябва да дадем информацията за SlaveSteps и Partitioner .

В Partitioner е интерфейс, който осигурява на съоръжението, за да се определи набор от входни стойности за всеки един от робите. С други думи, логиката за разделяне на задачите на съответните нишки отива тук.

Нека създадем негова реализация, наречена CustomMultiResourcePartitioner , където ще поставим имената на входните и изходните файлове в ExecutionContext, за да ги предадем на всяка подчинена стъпка:

public class CustomMultiResourcePartitioner implements Partitioner { @Override public Map partition(int gridSize) { Map map = new HashMap(gridSize); int i = 0, k = 1; for (Resource resource : resources) { ExecutionContext context = new ExecutionContext(); Assert.state(resource.exists(), "Resource does not exist: " + resource); context.putString(keyName, resource.getFilename()); context.putString("opFileName", "output"+k+++".xml"); map.put(PARTITION_KEY + i, context); i++; } return map; } }

Също така ще създадем bean за този клас, където ще дадем директорията на източника за входни файлове:

@Bean public CustomMultiResourcePartitioner partitioner() { CustomMultiResourcePartitioner partitioner = new CustomMultiResourcePartitioner(); Resource[] resources; try { resources = resoursePatternResolver .getResources("file:src/main/resources/input/*.csv"); } catch (IOException e) { throw new RuntimeException("I/O problems when resolving" + " the input file pattern.", e); } partitioner.setResources(resources); return partitioner; }

Ще дефинираме подчинената стъпка, както всяка друга стъпка с четеца и писателя. Четецът и писателят ще бъдат същите, както видяхме в нашия уводен пример, с изключение на това, че ще получат параметъра на името на файла от StepExecutionContext.

Имайте предвид, че тези зърна трябва да бъдат обхванати стъпка, така че да могат да получават параметрите stepExecutionContext на всяка стъпка. Ако те не биха могли да бъдат обхванати стъпка, техните компоненти ще бъдат създадени първоначално и няма да приемат имената на файлове на ниво стъпка:

@StepScope @Bean public FlatFileItemReader itemReader( @Value("#{stepExecutionContext[fileName]}") String filename) throws UnexpectedInputException, ParseException { FlatFileItemReader reader = new FlatFileItemReader(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); String[] tokens = {"username", "userid", "transactiondate", "amount"}; tokenizer.setNames(tokens); reader.setResource(new ClassPathResource("input/" + filename)); DefaultLineMapper lineMapper = new DefaultLineMapper(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new RecordFieldSetMapper()); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper); return reader; } 
@Bean @StepScope public ItemWriter itemWriter(Marshaller marshaller, @Value("#{stepExecutionContext[opFileName]}") String filename) throws MalformedURLException { StaxEventItemWriter itemWriter = new StaxEventItemWriter(); itemWriter.setMarshaller(marshaller); itemWriter.setRootTagName("transactionRecord"); itemWriter.setResource(new ClassPathResource("xml/" + filename)); return itemWriter; }

Докато споменаваме четеца и писателя в подчинената стъпка, можем да предадем аргументите като null, тъй като тези имена на файлове няма да бъдат използвани, тъй като те ще получат имената на файлове от stepExecutionContext :

@Bean public Step slaveStep() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("slaveStep").chunk(1) .reader(itemReader(null)) .writer(itemWriter(marshaller(), null)) .build(); }

5. Заключение

В този урок обсъдихме как да приложим задача с паралелна обработка с помощта на Spring Batch.

Както винаги, пълното внедряване за този пример е достъпно в GitHub.