Java Batch Tutorial

În lumea de astăzi, internetul a schimbat modul în care ne trăim viețile și unul dintre motivele principale este utilizarea internetului pentru majoritatea treburilor zilnice. Acest lucru a dus la o cantitate uriașă de date disponibile pentru procesare.

Câteva dintre exemplele în care sunt implicate date uriașe sunt procesarea statelor de plată, a extraselor bancare, calcularea dobânzilor etc. Deci, imaginați-vă că dacă toate aceste lucrări ar trebui să fie făcute manual, va dura o veșnicie pentru a termina aceste lucrări.

Cum se procedează în epoca actuală? Răspunsul este procesarea pe loturi.

Introducere

Procesarea pe loturi se realizează pe date în masă, fără intervenție manuală și de lungă durată. Ea poate fi intensivă din punct de vedere al datelor sau al calculului. Lucrările pe loturi pot fi executate după un program predefinit sau pot fi inițiate la cerere. De asemenea, deoarece lucrările pe loturi sunt, de obicei, lucrări de lungă durată, verificările constante și repornirea de la un anumit eșec sunt caracteristici comune întâlnite în lucrările pe loturi.

1.1 Istoricul procesării pe loturi Java

Prelucrarea pe loturi pentru platforma Java a fost introdusă ca parte a specificației JSR 352, parte a platformei Java EE 7, definește modelul de programare pentru aplicațiile pe loturi plus un timp de execuție pentru a rula și gestiona lucrările pe loturi.

1.2 Arhitectura Java Batch

Diagrama de mai jos prezintă componentele de bază pentru procesarea pe loturi.

Arhitectura pentru procesarea pe loturi Java

Arhitectura pentru aplicațiile de procesare pe loturi rezolvă problemele de procesare pe loturi, cum ar fi lucrările, etapele, depozitele, modelele de cititori procesatori scriitori, bucăți, puncte de control, puncte de control, procesare paralelă, flux, reluări, secvențiere, partiționare etc.

Să înțelegem fluxul arhitecturii.

  • Rezervația de lucrări conține lucrările care trebuie să fie executate.
  • JobLauncher extrage o lucrare din Rezervația de lucrări.
  • Care lucrare conține pași. Etapele sunt ItemReader, ItemProcessor și ItemWriter.
  • Item Reader este cel care citește datele.
  • Item Process este cel care va procesa datele pe baza logicii de afaceri.
  • Procesorul de elemente va scrie datele înapoi la sursa definită.

1.3 Componente de procesare pe loturi.

Acum vom încerca să înțelegem în detaliu componentele de procesare pe loturi.

  • Job: O sarcină cuprinde întregul proces de procesare pe loturi. Acesta conține una sau mai multe etape. Un job este alcătuit cu ajutorul unui limbaj de specificare a jobului (Job Specification Language – JSL) care specifică ordinea în care trebuie executate etapele. În JSR 352, JSL este specificat într-un fișier XML cunoscut sub numele de fișier XML al lucrării. Un job este practic un container care conține pași.
  • Pas: Un pas este un obiect de domeniu care conține o fază independentă, secvențială a sarcinii. Un pas conține toată logica și datele necesare pentru a efectua procesarea efectivă. Definiția unui pas este menținută vagă conform specificației lotului deoarece conținutul unui pas este pur și simplu specific aplicației și poate fi atât de complex sau simplu pe cât dorește dezvoltatorul. Există două tipuri de pași: orientați pe bucăți și orientați pe sarcini.
  • Job Operator: Oferă o interfață pentru gestionarea tuturor aspectelor de procesare a lucrărilor, care include comenzi operaționale, cum ar fi start, restart și stop, precum și comenzi de arhivare a lucrărilor, cum ar fi recuperarea execuțiilor lucrărilor și etapelor.
  • Job Repository: Acesta conține informații despre lucrările care se execută în prezent și date istorice despre lucrări. JobOperator oferă API-uri pentru a accesa acest depozit. Un JobRepository ar putea fi implementat folosind, o bază de date sau un sistem de fișiere.

Secțiunea următoare va ajuta la înțelegerea unor caractere comune ale unei arhitecturi de tip batch.

1.3 Pași în lucrare

Un pas este o fază independentă a unei lucrări. După cum s-a discutat mai sus, există două tipuri de pași într-un Job. Vom încerca să înțelegem ambele tipuri în detaliu mai jos.

1.3.1 Pași orientați pe bucăți

Pasii pe bucăți vor citi și procesa câte un element la un moment dat și vor grupa rezultatele într-un chunk. Rezultatele sunt apoi stocate atunci când chunk-ul atinge o dimensiune predefinită. Procesarea orientată pe bucăți face ca stocarea rezultatelor să fie mai eficientă atunci când setul de date este imens. Ea conține trei părți.

  • Lectorul de elemente citește datele de intrare una după alta dintr-o sursă de date care poate fi o bază de date, un fișier plat, un fișier jurnal etc.
  • Procesorul va procesa datele una câte una pe baza logicii de afaceri definite.
  • Un scriitor scrie datele în bucăți. Dimensiunea chunk-ului este predefinită și este configurabilă

Ca parte a pașilor chunk, există puncte de control care furnizează informații cadrului pentru finalizarea chunk-urilor. Dacă există o eroare în timpul procesării unui chunk, procesul poate reporni pe baza ultimului punct de control.

1.3.2 Etape orientate pe sarcini

Execută alte sarcini decât procesarea elementelor dintr-o sursă de date. Care include crearea sau eliminarea de directoare, mutarea fișierelor, crearea sau ștergerea tabelelor din baza de date etc. Etapele de tip task nu sunt, de obicei, de lungă durată în comparație cu etapele de tip chunk.

Într-un scenariu normal, etapele orientate spre task sunt utilizate după etapele orientate spre chunk atunci când este necesară o curățare. De exemplu, primim fișiere jurnal ca ieșire a unei aplicații. Etapele de tip chunk sunt utilizate pentru procesarea datelor și pentru a obține informații semnificative din fișierele jurnal.

Etapa de tip task este apoi utilizată pentru a curăța fișierele jurnal mai vechi care nu mai sunt necesare.

1.3.3 Procesare paralelă

Locurile de lucru pe loturi efectuează adesea operații de calcul costisitoare și procesează cantități mari de date. Aplicațiile pe loturi pot beneficia de procesarea paralelă în două scenarii.

  • Etapele care sunt independente prin natura lor pot rula pe diferite fire de execuție.
  • Etapele orientate pe bucăți în care procesarea fiecărui element este independentă de rezultatele procesării elementelor anterioare pot rula pe mai multe fire de execuție.

Procesarea pe loturi ajută la terminarea sarcinilor și la efectuarea mai rapidă a operațiilor pentru date uriașe.

Unelte și tehnologii

Să ne uităm la tehnologiile și unealta folosite pentru construirea programului.

  • Eclipse Oxygen.2 Release (4.7.2)
  • Java – versiunea 9.0.4
  • Gradle- 4.3
  • Spring boot – 2.0.1-Release
  • Bază de date HSQL

Structura proiectului

Structura proiectului va arăta așa cum se arată în imaginea de mai jos.

Structura proiectului pentru Java Batch

Structura proiectului de mai sus folosește Gradle. Acest proiect poate fi creat, de asemenea, utilizând maven, iar build.gralde va fi înlocuit cu fișierul pom.xml. Structura proiectului va fi ușor amânată odată cu utilizarea Maven pentru compilare.

Un obiectiv al programului

Ca parte a programului, vom încerca să creăm o aplicație java batch simplă folosind spring boot. Această aplicație va efectua următoarele sarcini.

  1. Citire: – Citirea datelor angajaților dintr-un fișier CSV.
  2. Prelucrarea datelor: – Convertește datele angajaților în majuscule.
  3. Scrie: – Scrieți datele prelucrate ale angajaților înapoi în baza de date.

4.1 Gradle build

Utilizăm Gradle pentru build ca parte a programului. Fișierul build.gradle va arăta așa cum se arată mai jos.

build.gradle

buildscript { repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:2.0.1.RELEASE") }}apply plugin: 'java'apply plugin: 'eclipse'apply plugin: 'idea'apply plugin: 'org.springframework.boot'apply plugin: 'io.spring.dependency-management'bootJar { baseName = 'java-batch' version = '1.0'}repositories { mavenCentral()}sourceCompatibility = 1.8targetCompatibility = 1.8dependencies { compile("org.springframework.boot:spring-boot-starter-batch") compile("org.hsqldb:hsqldb") testCompile("junit:junit")}

În fișierul build.gradle de mai sus, apply plugin: 'java' ne spune plugin-ul care trebuie setat. Pentru noi, este vorba de pluginul Java.
repositories{} ne permite să cunoaștem depozitul din care trebuie să fie extrasă dependența. Noi am ales mavenCentral pentru a extrage jar-urile de dependență. Putem folosi jcenter, de asemenea, pentru a extrage borcanele de dependență respective.

dependencies {} tagul

dependencies {} este utilizat pentru a furniza detaliile necesare ale fișierelor jar care trebuie extrase pentru proiect. apply plugin: 'org.springframework.boot' acest plugin este utilizat pentru specificarea unui proiect spring-boot. boot jar{} va specifica proprietățile jar-ului care va fi generat în urma compilării.

4.2 Fișier de date de probă

Pentru a furniza date pentru faza de citire, vom folosi un fișier CSV care conține datele angajaților.

Fisierul va arăta așa cum se arată mai jos.

Fisier CSV de probă

John,FosterJoe,ToyJustin,TaylorJane,ClarkJohn,Steve

Fisierul de date de probă conține numele și prenumele angajatului. Vom folosi aceleași date pentru procesare și apoi pentru inserare în baza de date.

4.3 Scripturi SQL

Utilizăm baza de date HSQL, care este o bază de date bazată pe memorie. Scriptul va arăta așa cum se arată mai jos.

Scriptul SQL

DROP TABLE employee IF EXISTS;CREATE TABLE employee ( person_id BIGINT IDENTITY NOT NULL PRIMARY KEY, first_name VARCHAR(20), last_name VARCHAR(20));

Spring Boot rulează schema-@@platform@@.sql automat la pornire. -all este implicit pentru toate platformele. Așadar, crearea tabelului se va întâmpla de la sine atunci când aplicația pornește și va fi disponibilă până când aplicația este în funcțiune.

4.4 Clasa model

Vom crea o clasă Employee.java ca clasă model. Clasa va arăta așa cum se arată mai jos.

Clasa model pentru program

package com.batch;public class Employee { private String lastName; private String firstName; public Employee() { } public Employee(String firstName, String lastName) { this.firstName = firstName; this.lastName = lastName; } public void setFirstName(String firstName) { this.firstName = firstName; } public String getFirstName() { return firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { this.lastName = lastName; } @Override public String toString() { return "firstName: " + firstName + ", lastName: " + lastName; }}

@Override este utilizată pentru a suprascrie implementarea implicită a metodei toString().

4.5 Clasa de configurare

Vom crea o clasă BatchConfiguration.java care va fi clasa de configurare pentru procesarea pe loturi. Fișierul java va arăta așa cum se arată mai jos.

BatchConfiguration.java

package com.batch.config;import javax.sql.DataSource;import org.springframework.batch.core.Job;import org.springframework.batch.core.JobExecutionListener;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.launch.support.RunIdIncrementer;import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;import org.springframework.batch.item.database.JdbcBatchItemWriter;import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;import org.springframework.batch.item.file.FlatFileItemReader;import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;import org.springframework.batch.item.file.mapping.DefaultLineMapper;import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.ClassPathResource;import org.springframework.jdbc.core.JdbcTemplate;import com.batch.Employee;import com.batch.processor.EmployeeItemProcessor;@Configuration@EnableBatchProcessingpublic class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; // tag::readerwriterprocessor @Bean public FlatFileItemReader reader() { return new FlatFileItemReaderBuilder() .name("EmployeeItemReader") .resource(new ClassPathResource("sample-data.csv")) .delimited() .names(new String{"firstName", "lastName"}) .fieldSetMapper(new BeanWrapperFieldSetMapper() {{ setTargetType(Employee.class); }}) .build(); } @Bean public EmployeeItemProcessor processor() { return new EmployeeItemProcessor(); } @Bean public JdbcBatchItemWriter writer(DataSource dataSource) { return new JdbcBatchItemWriterBuilder() .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO employee (first_name, last_name) VALUES (:firstName, :lastName)") .dataSource(dataSource) .build(); } // end::readerwriterprocessor // tag::jobstep @Bean public Job importUserJob(JobCompletionNotificationListener listener, Step step1) { return jobBuilderFactory.get("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1) .end() .build(); } @Bean public Step step1(JdbcBatchItemWriter writer) { return stepBuilderFactory.get("step1") .<Employee, Employee> chunk(10) .reader(reader()) .processor(processor()) .writer(writer) .build(); } // end::jobstep}

@EnableBatchProcessing adnotarea este utilizată pentru activarea procesării pe loturi.
JobBuilderFactory este fabrica care este utilizată pentru construirea unui job.
StepBuilderFactory este utilizată pentru crearea pașilor.
Metoda step1() are o proprietate chunk(). Aceasta este proprietatea utilizată pentru fragmentarea intrării într-o dimensiune definită. Pentru noi, dimensiunea este 10.

4.6 Procesorul de elemente

Procesorul de elemente este o interfață care va fi responsabilă pentru procesarea datelor. Vom implementa interfața în EmployeeItemProcessor.java. Clasa java va arăta așa cum se arată mai jos.

EmployeeItemProcessor.java

package com.batch.processor;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.batch.item.ItemProcessor;import com.batch.Employee;public class EmployeeItemProcessor implements ItemProcessor<Employee, Employee> { private static final Logger log = LoggerFactory.getLogger(EmployeeItemProcessor.class); @Override public Employee process(Employee emp) throws Exception { final String firstName = emp.getFirstName().toUpperCase(); final String lastName = emp.getLastName().toUpperCase(); final Employee transformedEmployee = new Employee(firstName, lastName); log.info("Converting (" + emp + ") into (" + transformedEmployee + ")"); return transformedEmployee; }}

În metoda process() vom obține datele și le vom transforma în nume cu majuscule.

4.7 Clasa JobExecutionSupportListener

JobExecutionListenerSupport este interfața care va notifica atunci când sarcina este finalizată. Ca parte a interfeței, avem metoda afterJob. Această metodă este utilizată pentru a afișa finalizarea lucrării.

JobCompletionNotificationListener.java

package com.batch.config;import java.util.List;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.batch.core.BatchStatus;import org.springframework.batch.core.JobExecution;import org.springframework.batch.core.listener.JobExecutionListenerSupport;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.jdbc.core.RowMapper;import org.springframework.stereotype.Component;import com.batch.Employee;@Componentpublic class JobCompletionNotificationListener extends JobExecutionListenerSupport {private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);private final JdbcTemplate jdbcTemplate;@Autowiredpublic JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {this.jdbcTemplate = jdbcTemplate;}@Overridepublic void afterJob(JobExecution jobExecution) {RowMapper rowMapper = (rs, rowNum) -> {Employee e = new Employee();e.setFirstName(rs.getString(1));e.setLastName(rs.getString(2));return e;};if(jobExecution.getStatus() == BatchStatus.COMPLETED) {log.info("!!! JOB FINISHED! Time to verify the results");List empList= jdbcTemplate.query("SELECT first_name, last_name FROM employee",rowMapper);log.info("Size of List "+empList.size());for (Employee emp: empList) {log.info("Found: "+emp.getFirstName()+" "+emp.getLastName());}}}}

În această metodă, obținem datele din baza de date după finalizarea lucrării și imprimăm rezultatul pe consolă pentru a verifica procesarea care a fost efectuată asupra datelor.

4.8 Clasa de aplicație

Vom crea o clasă de aplicație care va conține metoda principală responsabilă pentru declanșarea programului java batch. Clasa va arăta așa cum se arată mai jos.

Application.java

package com.batch;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Application { public static void main(String args) throws Exception { SpringApplication.run(Application.class, args); }}

@SpringBootApplication este adnotarea folosită pentru a specifica un program ca fiind un program spring boot.

Output

Să executăm aplicația ca o aplicație Java. Vom obține următoarea ieșire pe consolă.

Lovitura programului JavaBatch

Fluxul de lucru al programului batch este foarte clar disponibil în ieșire. Job-ul începe cu importUserJob, apoi începe execuția pasului 1 în care se convertește datele citite în majuscule.

Post-procesarea pasului, putem vedea rezultatul cu majuscule pe consolă.

Summary

În acest tutorial, am învățat următoarele lucruri:

  1. Java batch conține Jobs care pot conține mai mulți pași.
  2. Care pas este o combinație de citire, procesare, scriere.
  3. Putem fragmenta datele în diferite dimensiuni pentru procesare.

7. Descărcați proiectul Eclipse

Acesta a fost un tutorial pentru JavaBatch cu SpringBoot.

.