Spring Batch 架构介绍
-
从数据库,文件或队列中读取大量记录。
-
以某种方式处理数据。
-
以修改之后的形式写回数据。
Spring Batch 核心概念介绍
什么是 Job
/**
* Batch domain object representing a job. Job is an explicit abstraction
* representing the configuration of a job specified by a developer. It should
* be noted that restart policy is applied to the job as a whole and not to a
* step.
*/
public interface Job {
String getName();
boolean isRestartable();
void execute(JobExecution execution);
JobParametersIncrementer getJobParametersIncrementer();
JobParametersValidator getJobParametersValidator();
}
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.end()
.build();
}
什么是 JobInstance
public interface JobInstance {
/**
* Get unique id for this JobInstance.
* @return instance id
*/
public long getInstanceId();
/**
* Get job name.
* @return value of 'id' attribute from <job>
*/
public String getJobName();
}
什么是 JobParameters
什么是 JobExecution
public interface JobExecution {
/**
* Get unique id for this JobExecution.
* @return execution id
*/
public long getExecutionId();
/**
* Get job name.
* @return value of 'id' attribute from <job>
*/
public String getJobName();
/**
* Get batch status of this execution.
* @return batch status value.
*/
public BatchStatus getBatchStatus();
/**
* Get time execution entered STARTED status.
* @return date (time)
*/
public Date getStartTime();
/**
* Get time execution entered end status: COMPLETED, STOPPED, FAILED
* @return date (time)
*/
public Date getEndTime();
/**
* Get execution exit status.
* @return exit status.
*/
public String getExitStatus();
/**
* Get time execution was created.
* @return date (time)
*/
public Date getCreateTime();
/**
* Get time execution was last updated updated.
* @return date (time)
*/
public Date getLastUpdatedTime();
/**
* Get job parameters for this execution.
* @return job parameters
*/
public Properties getJobParameters();
}
public enum BatchStatus {STARTING, STARTED, STOPPING,
STOPPED, FAILED, COMPLETED, ABANDONED }
batch_job_execution
, 下面是一个从数据库当中截图的实例:
什么是 Step
什么是 StepExecution
什么是 ExecutionContext
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
什么是 JobRepository
@EnableBatchProcessing
注解可以为 JobRepository 提供自动配置。
什么是 JobLauncher
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
什么是 Item Reader
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
private JdbcCursorItemReader<Map<String, Object>> buildItemReader(final DataSource dataSource, String tableName,
String tenant) {
JdbcCursorItemReader<Map<String, Object>> itemReader = new JdbcCursorItemReader<>();
itemReader.setDataSource(dataSource);
itemReader.setSql("sql here");
itemReader.setRowMapper(new RowMapper());
return itemReader;
}
什么是 Item Writer
什么是 Item Processor
chunk 处理流程
skip 策略和失败处理
批处理操作指南
批处理原则
理解决方案时,应考虑以下关键原则和注意事项。
-
批处理体系结构通常会影响体系结构
-
尽可能简化并避免在单批应用程序中构建复杂的逻辑结构
-
保持数据的处理和存储在物理上靠得很近(换句话说,将数据保存在处理过程中)。
-
最大限度地减少系统资源的使用,尤其是 I / O. 在 internal memory 中执行尽可能多的操作。
-
当数据可以被读取一次并缓存或保存在工作存储中时,读取每个事务的数据。
-
重新读取先前在同一事务中读取数据的事务的数据。
-
导致不必要的表或索引扫描。
-
未在 SQL 语句的 WHERE 子句中指定键值。
在批处理运行中不要做两次一样的事情。例如,如果需要数据汇总以用于报告目的,则应该(如果可能)在最初处理数据时递增存储的总计,因此您的报告应用程序不必重新处理相同的数据。
总是假设数据完整性最差。插入适当的检查和记录验证以维护数据完整性。
尽可能实施校验和以进行内部验证。例如,对于一个文件里的数据应该有一个数据条数纪录,告诉文件中的记录总数以及关键字段的汇总。
在具有真实数据量的类似生产环境中尽早计划和执行压力测试。
如何默认不启动 job
spring.batch.job.enabled=false
在读数据时内存不够
Resource exhaustion event:the JVM was unable to allocate memory from the heap.
-
调整 reader 读数据逻辑,按分页读取,但实现上会麻烦一些,且运行效率会下降
-
增大 service 内存
推荐阅读
1. GitHub 上有什么好玩的项目?
2. Linux 运维必备 150 个命令汇总
3. SpringSecurity + JWT 实现单点登录
本文分享自微信公众号 - Java后端(web_resource)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。
相关文章
暂无评论...