En el mundo actual Internet ha cambiado la forma en que vivimos nuestras vidas y una de las principales razones es el uso de Internet para la mayoría de las tareas diarias. Esto conduce a una enorme cantidad de datos disponibles para su procesamiento.
Algunos de los ejemplos en los que se involucran enormes datos son el procesamiento de nóminas, extractos bancarios, cálculo de intereses, etc. Imagínese que todos estos trabajos se tuvieran que hacer manualmente, y que se tardaran años en terminar estos trabajos.
¿Cómo se hace en la época actual? La respuesta es el Procesamiento por Lotes.
- Introducción
- 1.1 Historia del procesamiento por lotes en Java
- 1.2 Arquitectura de Java Batch
- 1.3 Componentes de procesamiento por lotes.
- 1.3 Pasos en el Trabajo
- 1.3.1 Pasos orientados a trozos
- 1.3.2 Pasos orientados a tareas
- 1.3.3 Procesamiento paralelo
- Herramientas y tecnologías
- Estructura del proyecto
- Objetivo del Programa
- 4.1 Gradle build
- 4.2 Archivo de datos de muestra
- 4.3 Scripts SQL
- 4.4 Clase Modelo
- 4.5 Clase de configuración
- 4.6 Procesador de elementos
- 4.7 Clase JobExecutionSupportListener
- 4.8 Clase de aplicación
- Output
- Resumen
- 7. Descargar el proyecto Eclipse
Introducción
El procesamiento por lotes se realiza sobre datos a granel, sin intervención manual, y de larga duración. Puede ser intensivo en datos o en computación. Los trabajos por lotes pueden ejecutarse según una programación predefinida o pueden iniciarse bajo demanda. Además, como los trabajos por lotes suelen ser trabajos de larga duración, las comprobaciones constantes y el reinicio a partir de un determinado fallo son características comunes que se encuentran en los trabajos por lotes.
1.1 Historia del procesamiento por lotes en Java
El procesamiento por lotes para la plataforma Java se introdujo como parte de la especificación JSR 352, parte de la plataforma Java EE 7, define el modelo de programación para aplicaciones por lotes más un tiempo de ejecución para ejecutar y gestionar los trabajos por lotes.
1.2 Arquitectura de Java Batch
El siguiente diagrama muestra los componentes básicos para el procesamiento por lotes.
Arquitectura para el procesamiento por lotes en Java
La arquitectura para aplicaciones por lotes resuelve problemas de procesamiento por lotes como trabajos, pasos, repositorios, patrones de lector procesador escritor, chunks, checkpoints, procesamiento paralelo, flujo, reintentos, secuenciación, partición, etc.
Entendamos el flujo de la arquitectura.
- El repositorio de trabajos contiene los trabajos que necesitan ser ejecutados.
-
JobLauncher
Saca un trabajo del repositorio de trabajos. - Cada trabajo contiene pasos. Los pasos son
ItemReader
,ItemProcessor
yItemWriter
. - Item Reader es el que lee los datos.
- Item Process es el que procesará los datos basados en la lógica de negocio.
- El Escritor de artículos escribirá los datos de vuelta a la fuente definida.
1.3 Componentes de procesamiento por lotes.
Ahora trataremos de entender los componentes de procesamiento por lotes en detalle.
- Trabajo: Un trabajo comprende todo el proceso por lotes. Contiene uno o más pasos. Un trabajo se compone de un Lenguaje de Especificación de Trabajo (JSL) que especifica el orden en que deben ejecutarse los pasos. En JSR 352, el JSL se especifica en un archivo XML conocido como archivo XML del trabajo. Un trabajo es básicamente un contenedor que contiene pasos.
- Paso: Un paso es un objeto de dominio que contiene una fase independiente y secuencial del trabajo. Un paso contiene toda la lógica y los datos necesarios para realizar el procesamiento real. La definición de un paso se mantiene vaga según la especificación del lote porque el contenido de un paso es puramente específico de la aplicación y puede ser tan complejo o simple como el desarrollador quiera. Hay dos tipos de pasos: los orientados a trozos y los orientados a tareas.
- Operador de trabajos: Proporciona una interfaz para gestionar todos los aspectos del procesamiento de trabajos, que incluye comandos operativos, como iniciar, reiniciar y detener, así como comandos del repositorio de trabajos, como la recuperación de ejecuciones de trabajos y pasos.
- Repositorio de trabajos: Contiene información sobre los trabajos que se están ejecutando actualmente y datos históricos sobre el trabajo.
JobOperator
proporciona APIs para acceder a este repositorio. UnJobRepository
podría ser implementado usando, una base de datos o un sistema de archivos.
La siguiente sección ayudará a entender algunos caracteres comunes de una arquitectura de lotes.
1.3 Pasos en el Trabajo
Un Paso es una fase independiente de un Trabajo. Como se ha comentado anteriormente, hay dos tipos de pasos en un Job. Trataremos de entender ambos tipos en detalle a continuación.
1.3.1 Pasos orientados a trozos
Los pasos a trozos leerán y procesarán un elemento a la vez y agruparán los resultados en un trozo. Los resultados se almacenan cuando el chunk alcanza un tamaño predefinido. El procesamiento orientado a trozos hace que el almacenamiento de los resultados sea más eficiente cuando el conjunto de datos es enorme. Contiene tres partes.
- El lector de elementos lee la entrada uno tras otro desde una fuente de datos que puede ser una base de datos, un archivo plano, un archivo de registro, etc.
- El procesador procesará los datos uno por uno basándose en la lógica de negocio definida.
- Un escritor escribe los datos en trozos. El tamaño del chunk está predefinido y es configurable
Como parte de los pasos del chunk, hay puntos de control que proporcionan información al framework para la finalización de los chunks. Si hay un error durante el procesamiento de un chunk, el proceso puede reiniciarse basándose en el último checkpoint.
1.3.2 Pasos orientados a tareas
Ejecuta tareas distintas al procesamiento de elementos de una fuente de datos. Lo que incluye la creación o eliminación de directorios, mover archivos, crear o eliminar tablas de la base de datos, etc. Los pasos de tareas no suelen ser de larga duración en comparación con los pasos de trozos.
En un escenario normal, los pasos orientados a tareas se utilizan después de los pasos orientados a trozos cuando se necesita una limpieza. Por ejemplo, obtenemos archivos de registro como salida de una aplicación. Los pasos de trozos se utilizan para procesar los datos y obtener información significativa de los archivos de registro.
El paso de tarea se utiliza entonces para limpiar los archivos de registro más antiguos que ya no son necesarios.
1.3.3 Procesamiento paralelo
Los trabajos por lotes suelen realizar operaciones de cálculo costosas y procesan grandes cantidades de datos. Las aplicaciones por lotes pueden beneficiarse del procesamiento paralelo en dos escenarios.
- Los pasos que son independientes por naturaleza pueden ejecutarse en diferentes hilos.
- Los pasos orientados a trozos en los que el procesamiento de cada elemento es independiente de los resultados del procesamiento de los elementos anteriores pueden ejecutarse en más de un hilo.
El procesamiento por lotes ayuda a terminar las tareas y realizar las operaciones más rápidamente para datos enormes.
Herramientas y tecnologías
Veamos las tecnologías y la herramienta utilizada para construir el programa.
- Eclipse Oxygen.2 Release (4.7.2)
- Java – versión 9.0.4
- Gradle- 4.3
- Spring boot – 2.0.1-Release
- Base de datos HSQL
Estructura del proyecto
La estructura del proyecto quedará como se muestra en la siguiente imagen.
Estructura del proyecto para Java Batch
La estructura del proyecto anterior está utilizando Gradle. Este proyecto también puede ser creado usando maven y el build.gralde será reemplazado por el archivo pom.xml. La estructura del proyecto se diferirá ligeramente con el uso de Maven para la construcción.
Objetivo del Programa
Como parte del programa, trataremos de crear una simple aplicación java por lotes usando spring boot. Esta aplicación realizará las siguientes tareas.
- Leer: – Leer los datos de los empleados de un archivo CSV.
- Procesar los datos: – Convertir los datos de los empleados en todas las mayúsculas.
- Escribir: – Escribir los datos de los empleados procesados de nuevo en la base de datos.
4.1 Gradle build
Estamos utilizando Gradle para la construcción como parte del programa. El archivo build.gradle
tendrá el aspecto que se muestra a continuación.
build.gradle
buildscript { repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:2.0.1.RELEASE") }}apply plugin: 'java'apply plugin: 'eclipse'apply plugin: 'idea'apply plugin: 'org.springframework.boot'apply plugin: 'io.spring.dependency-management'bootJar { baseName = 'java-batch' version = '1.0'}repositories { mavenCentral()}sourceCompatibility = 1.8targetCompatibility = 1.8dependencies { compile("org.springframework.boot:spring-boot-starter-batch") compile("org.hsqldb:hsqldb") testCompile("junit:junit")}
En el archivo build.gradle
anterior apply plugin: 'java'
nos indica el plugin que hay que configurar. Para nosotros, es el plugin de Java.repositories{}
nos permite conocer el repositorio del que se debe extraer la dependencia. Hemos elegido mavenCentral
para extraer los jars de la dependencia. Podemos usar jcenter
también para extraer los respectivos jars de dependencia.
dependencies {}
etiqueta se utiliza para proporcionar los detalles necesarios del archivo jar que debe ser tirado para el proyecto. apply plugin: 'org.springframework.boot'
este plugin se utiliza para especificar un proyecto spring-boot. boot jar{}
especificará las propiedades del jar que se generará a partir de la compilación.
4.2 Archivo de datos de muestra
Con el fin de proporcionar datos para la fase de lectura, utilizaremos un archivo CSV que contiene datos de los empleados.
El archivo tendrá el aspecto que se muestra a continuación.
Archivo CSV de muestra
John,FosterJoe,ToyJustin,TaylorJane,ClarkJohn,Steve
El archivo de datos de muestra contiene el nombre y el apellido del empleado. Utilizaremos los mismos datos para procesarlos y luego insertarlos en la base de datos.
4.3 Scripts SQL
Estamos utilizando la base de datos HSQL que es una base de datos basada en memoria. El script se verá como se muestra a continuación.
Script SQL
DROP TABLE employee IF EXISTS;CREATE TABLE employee ( person_id BIGINT IDENTITY NOT NULL PRIMARY KEY, first_name VARCHAR(20), last_name VARCHAR(20));
Spring Boot ejecuta schema-@@platform@@.sql
automáticamente cuando se inicia. -all
es el predeterminado para todas las plataformas. Así que la creación de la tabla ocurrirá por sí sola cuando la aplicación se inicie y estará disponible hasta que la aplicación esté en funcionamiento.
4.4 Clase Modelo
Vamos a crear una clase Employee.java
como clase modelo. La clase tendrá el aspecto que se muestra a continuación.
Clase modelo para el programa
package com.batch;public class Employee { private String lastName; private String firstName; public Employee() { } public Employee(String firstName, String lastName) { this.firstName = firstName; this.lastName = lastName; } public void setFirstName(String firstName) { this.firstName = firstName; } public String getFirstName() { return firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { this.lastName = lastName; } @Override public String toString() { return "firstName: " + firstName + ", lastName: " + lastName; }}
@Override
se utiliza para anular la implementación por defecto del método toString()
.
4.5 Clase de configuración
Vamos a crear una clase BatchConfiguration.java
que será la clase de configuración para el procesamiento por lotes. El archivo java se verá como se muestra a continuación.
BatchConfiguration.java
package com.batch.config;import javax.sql.DataSource;import org.springframework.batch.core.Job;import org.springframework.batch.core.JobExecutionListener;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.launch.support.RunIdIncrementer;import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;import org.springframework.batch.item.database.JdbcBatchItemWriter;import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;import org.springframework.batch.item.file.FlatFileItemReader;import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;import org.springframework.batch.item.file.mapping.DefaultLineMapper;import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.ClassPathResource;import org.springframework.jdbc.core.JdbcTemplate;import com.batch.Employee;import com.batch.processor.EmployeeItemProcessor;@Configuration@EnableBatchProcessingpublic class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; // tag::readerwriterprocessor @Bean public FlatFileItemReader reader() { return new FlatFileItemReaderBuilder() .name("EmployeeItemReader") .resource(new ClassPathResource("sample-data.csv")) .delimited() .names(new String{"firstName", "lastName"}) .fieldSetMapper(new BeanWrapperFieldSetMapper() {{ setTargetType(Employee.class); }}) .build(); } @Bean public EmployeeItemProcessor processor() { return new EmployeeItemProcessor(); } @Bean public JdbcBatchItemWriter writer(DataSource dataSource) { return new JdbcBatchItemWriterBuilder() .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO employee (first_name, last_name) VALUES (:firstName, :lastName)") .dataSource(dataSource) .build(); } // end::readerwriterprocessor // tag::jobstep @Bean public Job importUserJob(JobCompletionNotificationListener listener, Step step1) { return jobBuilderFactory.get("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1) .end() .build(); } @Bean public Step step1(JdbcBatchItemWriter writer) { return stepBuilderFactory.get("step1") .<Employee, Employee> chunk(10) .reader(reader()) .processor(processor()) .writer(writer) .build(); } // end::jobstep}
@EnableBatchProcessing
la anotación se utiliza para habilitar el procesamiento por lotes.JobBuilderFactory
es la fábrica que se utiliza para construir un trabajo.StepBuilderFactory
se utiliza para la creación de pasos.
El método step1()
tiene una propiedad chunk()
. Esta es la propiedad utilizada para fragmentar la entrada en un tamaño definido. Para nosotros, el tamaño es 10.
4.6 Procesador de elementos
El procesador de elementos es una interfaz que se encargará de procesar los datos. Implementaremos la interfaz en EmployeeItemProcessor.java
. La clase java tendrá el aspecto que se muestra a continuación.
EmployeeItemProcessor.java
package com.batch.processor;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.batch.item.ItemProcessor;import com.batch.Employee;public class EmployeeItemProcessor implements ItemProcessor<Employee, Employee> { private static final Logger log = LoggerFactory.getLogger(EmployeeItemProcessor.class); @Override public Employee process(Employee emp) throws Exception { final String firstName = emp.getFirstName().toUpperCase(); final String lastName = emp.getLastName().toUpperCase(); final Employee transformedEmployee = new Employee(firstName, lastName); log.info("Converting (" + emp + ") into (" + transformedEmployee + ")"); return transformedEmployee; }}
En el método process()
obtendremos los datos y los transformaremos en el nombre en mayúsculas.
4.7 Clase JobExecutionSupportListener
JobExecutionListenerSupport
es la interfaz que notificará cuando se complete el trabajo. Como parte de la interfaz, tenemos el método afterJob
. Este método se utiliza para publicar la finalización del trabajo.
JobCompletionNotificationListener.java
package com.batch.config;import java.util.List;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.batch.core.BatchStatus;import org.springframework.batch.core.JobExecution;import org.springframework.batch.core.listener.JobExecutionListenerSupport;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.jdbc.core.RowMapper;import org.springframework.stereotype.Component;import com.batch.Employee;@Componentpublic class JobCompletionNotificationListener extends JobExecutionListenerSupport {private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);private final JdbcTemplate jdbcTemplate;@Autowiredpublic JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {this.jdbcTemplate = jdbcTemplate;}@Overridepublic void afterJob(JobExecution jobExecution) {RowMapper rowMapper = (rs, rowNum) -> {Employee e = new Employee();e.setFirstName(rs.getString(1));e.setLastName(rs.getString(2));return e;};if(jobExecution.getStatus() == BatchStatus.COMPLETED) {log.info("!!! JOB FINISHED! Time to verify the results");List empList= jdbcTemplate.query("SELECT first_name, last_name FROM employee",rowMapper);log.info("Size of List "+empList.size());for (Employee emp: empList) {log.info("Found: "+emp.getFirstName()+" "+emp.getLastName());}}}}
En este método, estamos obteniendo los datos de la base de datos después de la finalización del trabajo y estamos imprimiendo el resultado en la consola para verificar el procesamiento que se realizó en los datos.
4.8 Clase de aplicación
Crearemos una clase de aplicación que contendrá el método principal responsable de desencadenar el programa por lotes de java. La clase tendrá el aspecto que se muestra a continuación.
Application.java
package com.batch;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Application { public static void main(String args) throws Exception { SpringApplication.run(Application.class, args); }}
@SpringBootApplication
es la anotación que se utiliza para especificar un programa como programa de Spring Boot.
Output
Ejecutaremos la aplicación como una aplicación Java. Obtendremos la siguiente salida en la consola.
La salida del programa JavaBatch
El flujo de trabajo del programa batch está muy claramente disponible en la salida. El Job comienza con importUserJob
, luego comienza la ejecución del paso 1 donde convierte los datos leídos en mayúsculas.
Post-procesamiento del paso, podemos ver el resultado en mayúsculas en la consola.
Resumen
En este tutorial, hemos aprendido las siguientes cosas:
- Java batch contiene Jobs que pueden contener múltiples pasos.
- Cada paso es una combinación de lectura, proceso, escritura.
- Podemos trocear los datos en diferentes tamaños para su procesamiento.
7. Descargar el proyecto Eclipse
Este fue un tutorial para JavaBatch con SpringBoot.