Java Batch Tutorial

A mai világban az internet megváltoztatta az életünket, és ennek egyik fő oka az internet használata a legtöbb napi feladathoz. Ez hatalmas mennyiségű feldolgozható adathoz vezetett.

Azok a példák, ahol hatalmas mennyiségű adatról van szó, a fizetési számlák, bankszámlakivonatok, kamatszámítás stb. feldolgozása. Képzeljük el, ha mindezeket a feladatokat kézzel kellene elvégezni, akkor ezeknek a munkáknak a befejezése évszázadokig tartana.

Hogyan történik ez a mai korban? A válasz a kötegelt feldolgozás.

Bevezetés

A kötegelt feldolgozás tömeges adatokon történik, kézi beavatkozás nélkül, és hosszú ideig tart. Lehet adat- vagy számításigényes. A kötegelt feladatok előre meghatározott ütemezés szerint futtathatók vagy igény szerint indíthatók. Továbbá, mivel a kötegelt feladatok általában hosszú futásúak, a folyamatos ellenőrzések és a bizonyos hiba esetén történő újraindítás a kötegelt feladatok általános jellemzői.

1.1 A Java Batch feldolgozás története

A Java platformon történő kötegelt feldolgozás a Java EE 7 platform részét képező JSR 352 specifikáció részeként került bevezetésre, amely meghatározza a kötegelt alkalmazások programozási modelljét, valamint egy futási időt a kötegelt feladatok futtatásához és kezeléséhez.

1.2 A Java Batch felépítése

Az alábbi ábra a kötegelt feldolgozás alapvető összetevőit mutatja.

A Java kötegelt feldolgozás architektúrája

A kötegelt alkalmazások architektúrája olyan kötegelt feldolgozási problémákat old meg, mint a feladatok, lépések, tárolók, olvasó-feldolgozó-író minták, darabok, ellenőrző pontok, párhuzamos feldolgozás, áramlás, újrapróbálkozás, szekvenálás, particionálás stb.

Megértjük az architektúra áramlását.

  • A munkaadattár tartalmazza a futtatandó feladatokat.
  • JobLauncher kihúz egy feladatot a munkaadattárból.
  • Minden feladat tartalmaz lépéseket. A lépések a ItemReader, ItemProcessor és ItemWriter.
  • Item Reader az, amelyik beolvassa az adatokat.
  • Item Process az, amelyik az üzleti logika alapján feldolgozza az adatokat.
  • Az Item writer írja vissza az adatokat a meghatározott forrásba.

1.3 Kötegelt feldolgozási komponensek.

Most megpróbáljuk részletesen megérteni a kötegelt feldolgozási komponenseket.

  • Job: A feladat magában foglalja a teljes kötegelt feldolgozási folyamatot. Egy vagy több lépést tartalmaz. A feladatot egy JSL (Job Specification Language) segítségével állítjuk össze, amely meghatározza a lépések végrehajtási sorrendjét. A JSR 352-ben a JSL egy XML-fájlban, az úgynevezett feladat XML-fájlban van megadva. A feladat alapvetően egy lépéseket tartalmazó konténer.
  • Step: A lépés egy olyan tartományobjektum, amely a feladat egy független, szekvenciális fázisát tartalmazza. Egy lépés tartalmazza az összes szükséges logikát és adatot a tényleges feldolgozás elvégzéséhez. A lépés definíciója a kötegspecifikációnak megfelelően homályos marad, mivel a lépés tartalma tisztán alkalmazásspecifikus, és olyan összetett vagy egyszerű lehet, amilyet a fejlesztő akar. Kétféle lépés létezik: darabos és feladatorientált.
  • Job Operator: A feladatfeldolgozás minden aspektusának kezelésére szolgáló felületet biztosít, amely magában foglalja az operatív parancsokat, mint például az indítás, újraindítás és leállítás, valamint a feladattároló parancsokat, mint például a feladat- és lépésvégrehajtások lekérdezése.
  • Feladattároló: A jelenleg futó munkákról szóló információkat és a munkára vonatkozó múltbeli adatokat tartalmazza. JobOperator API-kat biztosít a tárolóhoz való hozzáféréshez. Egy JobRepository megvalósítható, egy adatbázis vagy egy fájlrendszer segítségével.

A következő szakasz segít a kötegelt architektúra néhány általános karakterének megértésében.

1.3 Lépések a munkában

A lépés a munkának egy független fázisa. A fentiekben tárgyaltak szerint kétféle lépéstípus létezik egy Jobban. Az alábbiakban mindkét típust megpróbáljuk részletesen megérteni.

1.3.1 Chunk-orientált lépések

A chunk-lépések egyszerre egy elemet olvasnak be és dolgoznak fel, és az eredményeket egy chunkba csoportosítják. Az eredmények akkor kerülnek tárolásra, amikor a fürt elér egy előre meghatározott méretet. A daraborientált feldolgozás hatékonyabbá teszi az eredmények tárolását, ha az adathalmaz hatalmas. Három részből áll.

  • A tételolvasó egymás után olvassa be a bemenetet egy adatforrásból, amely lehet adatbázis, síkfájl, naplófájl stb.
  • A feldolgozó a meghatározott üzleti logika alapján egyesével dolgozza fel az adatokat.
  • Az író az adatokat darabokban írja. A chunk mérete előre meghatározott és konfigurálható

A chunk lépések részeként vannak ellenőrző pontok, amelyek információt szolgáltatnak a keretrendszer számára a chunkok befejezéséről. Ha a chunk feldolgozása során hiba lép fel, a folyamat az utolsó ellenőrzőpont alapján újraindítható.

1.3.2 Feladatorientált lépések

Az adatforrásból származó elemek feldolgozásán kívül más feladatot is végrehajt. Ami magában foglalja a könyvtárak létrehozását vagy eltávolítását, fájlok mozgatását, adatbázis táblák létrehozását vagy törlését stb. A feladat-orientált lépések általában nem hosszú futamidejűek a darabos lépésekhez képest.

A feladat-orientált lépéseket normál esetben a darabos lépések után használják, amikor tisztításra van szükség. Például egy alkalmazás kimeneteként naplófájlokat kapunk. A darabos lépéseket az adatok feldolgozására és a naplófájlokból értelmes információk kinyerésére használjuk.

A feladatorientált lépést ezután a régebbi, már nem szükséges naplófájlok megtisztítására használjuk.

1.3.3 Párhuzamos feldolgozás

A kötegelt feladatok gyakran drága számítási műveleteket végeznek és nagy mennyiségű adatot dolgoznak fel. A kötegelt alkalmazások két esetben profitálhatnak a párhuzamos feldolgozásból.

  • A független jellegű lépések különböző szálakon futhatnak.
  • A daraborientált lépések, ahol az egyes elemek feldolgozása független az előző elemek feldolgozásának eredményétől, egynél több szálon futhatnak.

A kötegelt feldolgozás segít a feladatok befejezésében és a műveletek gyorsabb elvégzésében hatalmas adatok esetén.

Szerszámok és technológiák

Nézzük meg a program elkészítéséhez használt technológiákat és eszközöket.

  • Eclipse Oxygen.2 Release (4.7.2)
  • Java – 9.0.4 verzió
  • Gradle- 4.3
  • Spring boot – 2.0.1-Release
  • HSQL adatbázis

Projekt felépítése

A projekt felépítése az alábbi képen látható módon fog kinézni.

Projektstruktúra a Java Batch számára

A fenti projektstruktúra a Gradle-t használja. Ez a projekt a maven segítségével is létrehozható, és a build.gralde fájl helyébe a pom.xml fájl kerül. A projekt felépítése kissé eltolódik a Maven használatával a buildhez.

A program célja

A program részeként egy egyszerű java batch alkalmazást próbálunk létrehozni a spring boot segítségével. Ez az alkalmazás a következő feladatokat fogja elvégezni.

  1. Olvasás: – Alkalmazotti adatok beolvasása egy CSV fájlból.
  2. Az adatok feldolgozása: – Az alkalmazottak adatainak nagybetűvé alakítása.
  3. Írása: – A feldolgozott munkavállalói adatok visszaírása az adatbázisba.

4.1 Gradle build

A program részeként Gradle-t használunk a buildhez. A build.gradle fájl az alábbiakban látható módon fog kinézni.

build.gradle

buildscript { repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:2.0.1.RELEASE") }}apply plugin: 'java'apply plugin: 'eclipse'apply plugin: 'idea'apply plugin: 'org.springframework.boot'apply plugin: 'io.spring.dependency-management'bootJar { baseName = 'java-batch' version = '1.0'}repositories { mavenCentral()}sourceCompatibility = 1.8targetCompatibility = 1.8dependencies { compile("org.springframework.boot:spring-boot-starter-batch") compile("org.hsqldb:hsqldb") testCompile("junit:junit")}

A fenti build.gradle fájlban apply plugin: 'java' közli velünk a beállítandó plugint. Nálunk ez a Java plugin.
repositories{} közli velünk a tárolót, ahonnan a függőséget ki kell húzni. Mi a mavenCentral-t választottuk a függőségi jar-ek lehívásához. Használhatjuk a jcenter-t is a megfelelő függőségi jar-ek lehívásához.

dependencies {} tag a szükséges jar-fájlok adatainak megadására szolgál, amelyeket a projekthez kell lehívni. apply plugin: 'org.springframework.boot' ezt a bővítményt a spring-boot projekt megadására használjuk. boot jar{} megadja annak a jarnak a tulajdonságait, amely a build során generálásra kerül.

4.2 Minta adatfájl

Az olvasási fázis adatainak megadásához egy CSV fájlt fogunk használni, amely az alkalmazottak adatait tartalmazza.

A fájl az alábbiakban látható módon fog kinézni.

Minta CSV fájl

John,FosterJoe,ToyJustin,TaylorJane,ClarkJohn,Steve

A minta adatfájl tartalmazza az alkalmazottak kereszt- és vezetéknevét. Ugyanezeket az adatokat fogjuk felhasználni a feldolgozáshoz, majd az adatbázisba való beillesztéshez.

4.3 SQL szkriptek

A HSQL adatbázist használjuk, amely egy memória alapú adatbázis. A szkript az alábbiakban látható módon fog kinézni.

SQL szkript

DROP TABLE employee IF EXISTS;CREATE TABLE employee ( person_id BIGINT IDENTITY NOT NULL PRIMARY KEY, first_name VARCHAR(20), last_name VARCHAR(20));

A Spring Boot indításkor automatikusan lefut schema-@@platform@@.sql. A -all az alapértelmezett minden platformon. Tehát a tábla létrehozása magától megtörténik az alkalmazás indításakor, és addig lesz elérhető, amíg az alkalmazás fut.

4.4 Modell osztály

Egy Employee.java osztályt fogunk létrehozni modell osztályként. Az osztály az alábbiakban látható módon fog kinézni.

Modell osztály a programhoz

package com.batch;public class Employee { private String lastName; private String firstName; public Employee() { } public Employee(String firstName, String lastName) { this.firstName = firstName; this.lastName = lastName; } public void setFirstName(String firstName) { this.firstName = firstName; } public String getFirstName() { return firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { this.lastName = lastName; } @Override public String toString() { return "firstName: " + firstName + ", lastName: " + lastName; }}

@Override a toString() metódus alapértelmezett implementációjának felülírására szolgál.

4.5 Konfigurációs osztály

Elkészítünk egy BatchConfiguration.java osztályt, amely a kötegelt feldolgozás konfigurációs osztálya lesz. A java fájl az alábbi módon fog kinézni.

BatchConfiguration.java

package com.batch.config;import javax.sql.DataSource;import org.springframework.batch.core.Job;import org.springframework.batch.core.JobExecutionListener;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.launch.support.RunIdIncrementer;import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;import org.springframework.batch.item.database.JdbcBatchItemWriter;import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;import org.springframework.batch.item.file.FlatFileItemReader;import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;import org.springframework.batch.item.file.mapping.DefaultLineMapper;import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.ClassPathResource;import org.springframework.jdbc.core.JdbcTemplate;import com.batch.Employee;import com.batch.processor.EmployeeItemProcessor;@Configuration@EnableBatchProcessingpublic class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; // tag::readerwriterprocessor @Bean public FlatFileItemReader reader() { return new FlatFileItemReaderBuilder() .name("EmployeeItemReader") .resource(new ClassPathResource("sample-data.csv")) .delimited() .names(new String{"firstName", "lastName"}) .fieldSetMapper(new BeanWrapperFieldSetMapper() {{ setTargetType(Employee.class); }}) .build(); } @Bean public EmployeeItemProcessor processor() { return new EmployeeItemProcessor(); } @Bean public JdbcBatchItemWriter writer(DataSource dataSource) { return new JdbcBatchItemWriterBuilder() .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO employee (first_name, last_name) VALUES (:firstName, :lastName)") .dataSource(dataSource) .build(); } // end::readerwriterprocessor // tag::jobstep @Bean public Job importUserJob(JobCompletionNotificationListener listener, Step step1) { return jobBuilderFactory.get("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1) .end() .build(); } @Bean public Step step1(JdbcBatchItemWriter writer) { return stepBuilderFactory.get("step1") .<Employee, Employee> chunk(10) .reader(reader()) .processor(processor()) .writer(writer) .build(); } // end::jobstep}

@EnableBatchProcessing annotációval engedélyezzük a kötegelt feldolgozást.
JobBuilderFactory a gyár, amelyet a feladat létrehozásához használunk.
StepBuilderFactory a lépés létrehozásához.
A step1() metódusnak van egy chunk() tulajdonsága. Ezt a tulajdonságot a bemenet meghatározott méretűre darabolására használják. Nálunk ez a méret 10.

4.6 Item Processor

Az Item processor egy interfész, amely az adatok feldolgozásáért lesz felelős. Az interfészt a EmployeeItemProcessor.java-ban fogjuk implementálni. A java osztály az alábbi módon fog kinézni.

EmployeeItemProcessor.java

package com.batch.processor;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.batch.item.ItemProcessor;import com.batch.Employee;public class EmployeeItemProcessor implements ItemProcessor<Employee, Employee> { private static final Logger log = LoggerFactory.getLogger(EmployeeItemProcessor.class); @Override public Employee process(Employee emp) throws Exception { final String firstName = emp.getFirstName().toUpperCase(); final String lastName = emp.getLastName().toUpperCase(); final Employee transformedEmployee = new Employee(firstName, lastName); log.info("Converting (" + emp + ") into (" + transformedEmployee + ")"); return transformedEmployee; }}

A process() metódusban megkapjuk az adatokat, és nagybetűs névvé alakítjuk őket.

4.7 JobExecutionSupportListener osztály

JobExecutionListenerSupport az az interfész, amely értesíteni fog, ha a munka befejeződött. Az interfész részeként van afterJob metódusunk. Ezt a metódust használjuk a munka befejezéséről szóló üzenet küldésére.

JobCompletionNotificationListener.java

package com.batch.config;import java.util.List;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.batch.core.BatchStatus;import org.springframework.batch.core.JobExecution;import org.springframework.batch.core.listener.JobExecutionListenerSupport;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.jdbc.core.RowMapper;import org.springframework.stereotype.Component;import com.batch.Employee;@Componentpublic class JobCompletionNotificationListener extends JobExecutionListenerSupport {private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);private final JdbcTemplate jdbcTemplate;@Autowiredpublic JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {this.jdbcTemplate = jdbcTemplate;}@Overridepublic void afterJob(JobExecution jobExecution) {RowMapper rowMapper = (rs, rowNum) -> {Employee e = new Employee();e.setFirstName(rs.getString(1));e.setLastName(rs.getString(2));return e;};if(jobExecution.getStatus() == BatchStatus.COMPLETED) {log.info("!!! JOB FINISHED! Time to verify the results");List empList= jdbcTemplate.query("SELECT first_name, last_name FROM employee",rowMapper);log.info("Size of List "+empList.size());for (Employee emp: empList) {log.info("Found: "+emp.getFirstName()+" "+emp.getLastName());}}}}

Ezzel a metódussal a munka befejezése után megkapjuk az adatokat az adatbázisból, és az eredményt kiírjuk a konzolra, hogy ellenőrizzük az adatokon végzett feldolgozást.

4.8 Alkalmazási osztály

Elkészítünk egy alkalmazási osztályt, amely tartalmazza a java batch program indításáért felelős fő metódust. Az osztály az alábbiakban látható módon fog kinézni.

Application.java

package com.batch;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Application { public static void main(String args) throws Exception { SpringApplication.run(Application.class, args); }}

@SpringBootApplication ez az annotáció a program spring boot programként való megadására szolgál.

Output

Futtassuk az alkalmazást Java alkalmazásként. A konzolon a következő kimenetet kapjuk.

A JavaBatch program kimenete

A kimeneten nagyon jól látható a batch program munkafolyamata. A Job a importUserJob-vel indul, majd az 1. lépés végrehajtása kezdődik, ahol a beolvasott adatokat nagybetűvé alakítja.

A lépés feldolgozása után a konzolon láthatjuk a nagybetűs eredményt.

Összefoglaló

Az oktatóanyagban a következő dolgokat tanultuk meg:

  1. A Java batch olyan Jobokat tartalmaz, amelyek több lépést is tartalmazhatnak.
  2. Minden lépés az olvasás, feldolgozás, írás kombinációja.
  3. Az adatokat különböző méretűre darabolhatjuk a feldolgozáshoz.

7. Az Eclipse projekt letöltése

Ez volt a JavaBatch és a SpringBoot bemutatója.