房产公司网站建设,wordpress主题修改ftp,上网用哪家公司的比较好,陕西煤业化工建设集团有限公司网站来源#xff1a;blog.csdn.net/topdeveloperr/article/details/84337956spring batch简介Spring Batch架构介绍Spring Batch核心概念介绍chunk 处理流程批处理操作指南spring batch简介 spring batch是spring提供的一个数据处理框架。企业域中的许多应用程序需要批量处理才能在… 来源blog.csdn.net/topdeveloperr/article/details/84337956spring batch简介Spring Batch架构介绍Spring Batch核心概念介绍chunk 处理流程批处理操作指南 spring batch简介 spring batch是spring提供的一个数据处理框架。企业域中的许多应用程序需要批量处理才能在关键任务环境中执行业务操作。这些业务运营包括无需用户交互即可最有效地处理大量信息的自动化复杂处理。这些操作通常包括基于时间的事件例如月末计算通知或通信。在非常大的数据集中重复处理复杂业务规则的定期应用例如保险利益确定或费率调整。集成从内部和外部系统接收的信息这些信息通常需要以事务方式格式化验证和处理到记录系统中。批处理用于每天为企业处理数十亿的交易。Spring Batch是一个轻量级全面的批处理框架旨在开发对企业系统日常运营至关重要的强大批处理应用程序。Spring Batch构建了人们期望的Spring Framework特性生产力基于POJO的开发方法和一般易用性同时使开发人员可以在必要时轻松访问和利用更高级的企业服务。Spring Batch不是一个schuedling的框架。Spring Batch提供了可重用的功能这些功能对于处理大量的数据至关重要包括记录/跟踪事务管理作业处理统计作业重启跳过和资源管理。它还提供更高级的技术服务和功能通过优化和分区技术实现极高容量和高性能的批处理作业。Spring Batch可用于两种简单的用例例如将文件读入数据库或运行存储过程以及复杂的大量用例例如在数据库之间移动大量数据转换它等等 上。大批量批处理作业可以高度可扩展的方式利用该框架来处理大量信息。 Spring Batch架构介绍 一个典型的批处理应用程序大致如下从数据库文件或队列中读取大量记录。以某种方式处理数据。以修改之后的形式写回数据。其对应的示意图如下spring batch的一个总体的架构如下Figure 2.1: Batch Stereotypes在spring batch中一个job可以定义很多的步骤step在每一个step里面可以定义其专属的ItemReader用于读取数据ItemProcesseor用于处理数据ItemWriter用于写数据而每一个定义的job则都在JobRepository里面我们可以通过JobLauncher来启动某一个job。Spring Batch核心概念介绍下面是一些概念是Spring batch框架中的核心概念。什么是JobJob和Step是spring batch执行批处理任务最为核心的两个概念。其中Job是一个封装整个批处理过程的一个概念。Job在spring batch的体系当中只是一个最顶层的一个抽象概念体现在代码当中则它只是一个最上层的接口其代码如下:
/*** Batch domain object representing a job. Job is an explicit abstraction* representing the configuration of a job specified by a developer. It should* be noted that restart policy is applied to the job as a whole and not to a* step.*/
public interface Job {String getName();boolean isRestartable();void execute(JobExecution execution);JobParametersIncrementer getJobParametersIncrementer();JobParametersValidator getJobParametersValidator();}
在Job这个接口当中定义了五个方法它的实现类主要有两种类型的job一个是simplejob另一个是flowjob。在spring batch当中job是最顶层的抽象除job之外我们还有JobInstance以及JobExecution这两个更加底层的抽象。一个job是我们运行的基本单位它内部由step组成。job本质上可以看成step的一个容器。一个job可以按照指定的逻辑顺序组合step并提供了我们给所有step设置相同属性的方法例如一些事件监听跳过策略。Spring Batch以SimpleJob类的形式提供了Job接口的默认简单实现它在Job之上创建了一些标准功能。一个使用java config的例子代码如下Bean
public Job footballJob() {return this.jobBuilderFactory.get(footballJob).start(playerLoad()).next(gameLoad()).next(playerSummarization()).end().build();
}
这个配置的意思是首先给这个job起了一个名字叫footballJob接着指定了这个job的三个step他们分别由方法playerLoad,gameLoad, playerSummarization实现。什么是JobInstance我们在上文已经提到了JobInstance他是Job的更加底层的一个抽象他的定义如下public interface JobInstance {/*** Get unique id for this JobInstance.* return instance id*/public long getInstanceId();/*** Get job name.* return value of id attribute from job*/public String getJobName();
}
他的方法很简单一个是返回Job的id另一个是返回Job的名字。JobInstance指的是job运行当中作业执行过程当中的概念。Instance本就是实例的意思。比如说现在有一个批处理的job它的功能是在一天结束时执行行一次。我们假定这个批处理job的名字为EndOfDay。在这个情况下那么每天就会有一个逻辑意义上的JobInstance, 而我们必须记录job的每次运行的情况。什么是JobParameters在上文当中我们提到了同一个job每天运行一次的话那么每天都有一个jobIntsance但他们的job定义都是一样的那么我们怎么来区别一个job的不同jobinstance了。不妨先做个猜想虽然jobinstance的job定义一样但是他们有的东西就不一样例如运行时间。spring batch中提供的用来标识一个jobinstance的东西是JobParameters。JobParameters对象包含一组用于启动批处理作业的参数它可以在运行期间用于识别或甚至用作参考数据。我们假设的运行时间就可以作为一个JobParameters。例如, 我们前面的EndOfDay的job现在已经有了两个实例一个产生于1月1日另一个产生于1月2日那么我们就可以定义两个JobParameter对象一个的参数是01-01, 另一个的参数是01-02。因此识别一个JobInstance的方法可以定义为因此我么可以通过Jobparameter来操作正确的JobInstance什么是JobExecutionJobExecution指的是单次尝试运行一个我们定义好的Job的代码层面的概念。job的一次执行可能以失败也可能成功。只有当执行成功完成时给定的与执行相对应的JobInstance才也被视为完成。还是以前面描述的EndOfDay的job作为示例假设第一次运行01-01-2019的JobInstance结果是失败。那么此时如果使用与第一次运行相同的Jobparameter参数即01-01-2019作业参数再次运行那么就会创建一个对应于之前jobInstance的一个新的JobExecution实例,JobInstance仍然只有一个。JobExecution的接口定义如下public interface JobExecution {/*** Get unique id for this JobExecution.* return execution id*/public long getExecutionId();/*** Get job name.* return value of id attribute from job*/public String getJobName();/*** Get batch status of this execution.* return batch status value.*/public BatchStatus getBatchStatus();/*** Get time execution entered STARTED status.* return date (time)*/public Date getStartTime();/*** Get time execution entered end status: COMPLETED, STOPPED, FAILED* return date (time)*/public Date getEndTime();/*** Get execution exit status.* return exit status.*/public String getExitStatus();/*** Get time execution was created.* return date (time)*/public Date getCreateTime();/*** Get time execution was last updated updated.* return date (time)*/public Date getLastUpdatedTime();/*** Get job parameters for this execution.* return job parameters*/public Properties getJobParameters();}
每一个方法的注释已经解释的很清楚这里不再多做解释。只提一下BatchStatusJobExecution当中提供了一个方法getBatchStatus用于获取一个job某一次特地执行的一个状态。BatchStatus是一个代表job状态的枚举类其定义如下public enum BatchStatus {STARTING, STARTED, STOPPING,STOPPED, FAILED, COMPLETED, ABANDONED }
这些属性对于一个job的执行来说是非常关键的信息并且spring batch会将他们持久到数据库当中. 在使用Spring batch的过程当中spring batch会自动创建一些表用于存储一些job相关的信息用于存储JobExecution的表为batch_job_execution,下面是一个从数据库当中截图的实例什么是Step每一个Step对象都封装了批处理作业的一个独立的阶段。事实上每一个Job本质上都是由一个或多个步骤组成。每一个step包含定义和控制实际批处理所需的所有信息。任何特定的内容都由编写Job的开发人员自行决定。一个step可以非常简单也可以非常复杂。例如一个step的功能是将文件中的数据加载到数据库中那么基于现在spring batch的支持则几乎不需要写代码。更复杂的step可能具有复杂的业务逻辑这些逻辑作为处理的一部分。与Job一样Step具有与JobExecution类似的StepExecution如下图所示什么是StepExecutionStepExecution表示一次执行Step, 每次运行一个Step时都会创建一个新的StepExecution类似于JobExecution。但是某个步骤可能由于其之前的步骤失败而无法执行。且仅当Step实际启动时才会创建StepExecution。一次step执行的实例由StepExecution类的对象表示。每个StepExecution都包含对其相应步骤的引用以及JobExecution和事务相关的数据例如提交和回滚计数以及开始和结束时间。此外每个步骤执行都包含一个ExecutionContext其中包含开发人员需要在批处理运行中保留的任何数据例如重新启动所需的统计信息或状态信息。下面是一个从数据库当中截图的实例什么是ExecutionContextExecutionContext即每一个StepExecution 的执行环境。它包含一系列的键值对。我们可以用如下代码获取ExecutionContextExecutionContext ecStep stepExecution.getExecutionContext();
ExecutionContext ecJob jobExecution.getExecutionContext();
什么是JobRepositoryJobRepository是一个用于将上述jobstep等概念进行持久化的一个类。它同时给Job和Step以及下文会提到的JobLauncher实现提供CRUD操作。首次启动Job时将从repository中获取JobExecution并且在执行批处理的过程中StepExecution和JobExecution将被存储到repository当中。EnableBatchProcessing注解可以为JobRepository提供自动配置。什么是JobLauncherJobLauncher这个接口的功能非常简单它是用于启动指定了JobParameters的Job为什么这里要强调指定了JobParameter原因其实我们在前面已经提到了jobparameter和job一起才能组成一次job的执行。下面是代码实例public interface JobLauncher {public JobExecution run(Job job, JobParameters jobParameters)throws JobExecutionAlreadyRunningException, JobRestartException,JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
上面run方法实现的功能是根据传入的job以及jobparamaters从JobRepository获取一个JobExecution并执行Job。什么是Item ReaderItemReader是一个读数据的抽象它的功能是为每一个Step提供数据输入。当ItemReader以及读完所有数据时它会返回null来告诉后续操作数据已经读完。Spring Batch为ItemReader提供了非常多的有用的实现类比如JdbcPagingItemReaderJdbcCursorItemReader等等。ItemReader支持的读入的数据源也是非常丰富的包括各种类型的数据库文件数据流等等。几乎涵盖了我们的所有场景。下面是一个JdbcPagingItemReader的例子代码Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {MapString, Object parameterValues new HashMap();parameterValues.put(status, NEW);return new JdbcPagingItemReaderBuilderCustomerCredit().name(creditReader).dataSource(dataSource).queryProvider(queryProvider).parameterValues(parameterValues).rowMapper(customerCreditMapper()).pageSize(1000).build();
}Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {SqlPagingQueryProviderFactoryBean provider new SqlPagingQueryProviderFactoryBean();provider.setSelectClause(select id, name, credit);provider.setFromClause(from customer);provider.setWhereClause(where status:status);provider.setSortKey(id);return provider;
}
JdbcPagingItemReader必须指定一个PagingQueryProvider负责提供SQL查询语句来按分页返回数据。下面是一个JdbcCursorItemReader的例子代码: private JdbcCursorItemReaderMapString, Object buildItemReader(final DataSource dataSource, String tableName,String tenant) {JdbcCursorItemReaderMapString, Object itemReader new JdbcCursorItemReader();itemReader.setDataSource(dataSource);itemReader.setSql(sql here);itemReader.setRowMapper(new RowMapper());return itemReader;}
什么是Item Writer既然ItemReader是读数据的一个抽象那么ItemWriter自然就是一个写数据的抽象它是为每一个step提供数据写出的功能。写的单位是可以配置的我们可以一次写一条数据也可以一次写一个chunk的数据关于chunk下文会有专门的介绍。ItemWriter对于读入的数据是不能做任何操作的。Spring Batch为ItemWriter也提供了非常多的有用的实现类当然我们也可以去实现自己的writer功能。什么是Item ProcessorItemProcessor对项目的业务逻辑处理的一个抽象, 当ItemReader读取到一条记录之后ItemWriter还未写入这条记录之前I我们可以借助temProcessor提供一个处理业务逻辑的功能并对数据进行相应操作。如果我们在ItemProcessor发现一条数据不应该被写入可以通过返回null来表示。ItemProcessor和ItemReader以及ItemWriter可以非常好的结合在一起工作他们之间的数据传输也非常方便。我们直接使用即可。chunk 处理流程spring batch提供了让我们按照chunk处理数据的能力一个chunk的示意图如下它的意思就和图示的一样由于我们一次batch的任务可能会有很多的数据读写操作因此一条一条的处理并向数据库提交的话效率不会很高因此spring batch提供了chunk这个概念我们可以设定一个chunk sizespring batch 将一条一条处理数据但不提交到数据库只有当处理的数据数量达到chunk size设定的值得时候才一起去commit.java的实例定义代码如下在上面这个step里面chunk size被设为了10当ItemReader读的数据数量达到10的时候这一批次的数据就一起被传到itemWriter同时transaction被提交。skip策略和失败处理一个batch的job的step可能会处理非常大数量的数据难免会遇到出错的情况出错的情况虽出现的概率较小但是我们不得不考虑这些情况因为我们做数据迁移最重要的是要保证数据的最终一致性。spring batch当然也考虑到了这种情况并且为我们提供了相关的技术支持请看如下bean的配置我们需要留意这三个方法分别是skipLimit(),skip(),noSkip(),skipLimit方法的意思是我们可以设定一个我们允许的这个step可以跳过的异常数量假如我们设定为10则当这个step运行时只要出现的异常数目不超过10整个step都不会fail。注意若不设定skipLimit则其默认值是0.skip方法我们可以指定我们可以跳过的异常因为有些异常的出现我们是可以忽略的。noSkip方法的意思则是指出现这个异常我们不想跳过也就是从skip的所以exception当中排除这个exception从上面的例子来说也就是跳过所有除FileNotFoundException的exception。那么对于这个step来说FileNotFoundException就是一个fatal的exception抛出这个exception的时候step就会直接fail批处理操作指南本部分是一些使用spring batch时的值得注意的点批处理原则在构建批处理解决方案时应考虑以下关键原则和注意事项。批处理体系结构通常会影响体系结构尽可能简化并避免在单批应用程序中构建复杂的逻辑结构保持数据的处理和存储在物理上靠得很近换句话说将数据保存在处理过程中。最大限度地减少系统资源的使用尤其是I / O. 在internal memory中执行尽可能多的操作。查看应用程序I / O分析SQL语句以确保避免不必要的物理I / O. 特别是需要寻找以下四个常见缺陷当数据可以被读取一次并缓存或保存在工作存储中时读取每个事务的数据。重新读取先前在同一事务中读取数据的事务的数据。导致不必要的表或索引扫描。未在SQL语句的WHERE子句中指定键值。在批处理运行中不要做两次一样的事情。例如如果需要数据汇总以用于报告目的则应该如果可能在最初处理数据时递增存储的总计因此您的报告应用程序不必重新处理相同的数据。在批处理应用程序开始时分配足够的内存以避免在此过程中进行耗时的重新分配。总是假设数据完整性最差。插入适当的检查和记录验证以维护数据完整性。尽可能实施校验和以进行内部验证。例如对于一个文件里的数据应该有一个数据条数纪录告诉文件中的记录总数以及关键字段的汇总。在具有真实数据量的类似生产环境中尽早计划和执行压力测试。在大批量系统中数据备份可能具有挑战性特别是如果系统以24-7在线的情况运行。数据库备份通常在在线设计中得到很好的处理但文件备份应该被视为同样重要。如果系统依赖于文件则文件备份过程不仅应该到位并记录在案还应定期进行测试。如何默认不启动job在使用java config使用spring batch的job时如果不做任何配置项目在启动时就会默认去跑我们定义好的批处理job。那么如何让项目在启动时不自动去跑job呢spring batch的job会在项目启动时自动run如果我们不想让他在启动时run的话可以在application.properties中添加如下属性spring.batch.job.enabledfalse
在读数据时内存不够在使用spring batch做数据迁移时发现在job启动后执行到一定时间点时就卡在一个地方不动了且log也不再打印等待一段时间之后得到如下错误红字的信息为Resource exhaustion eventthe JVM was unable to allocate memory from the heap.翻译过来的意思就是项目发出了一个资源耗尽的事件告诉我们java虚拟机无法再为堆分配内存。造成这个错误的原因是: 这个项目里的batch job的reader是一次性拿回了数据库里的所有数据并没有进行分页当这个数据量太大时就会导致内存不够用。解决的办法有两个:调整reader读数据逻辑按分页读取但实现上会麻烦一些且运行效率会下降增大service内存- END -Autowired报错的4种解决方案和原因分析厉害了Spring中bean的12种定义方法Spring Cache 实战兼容所有缓存中间件