2022-12-10
How to Write a Custom ItemReader in Spring Batch
Contents
Custom ItemReader in Spring Batch
  Creating a custom ItemReader
  Configuring the ItemReader bean
  Summary
Spring Batch ItemReaders
  JdbcPagingItemReader
  FlatFileItemReader
  StaxEventItemReader
  MultiResourceItemReader
  Exception handling and restart
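Custom ItemReader in Spring Batch
Spring Batch supports many input sources out of the box, such as files and databases. Sometimes, however, you need to read from a source that is not supported by default, and then you have to implement your own: a custom ItemReader. This article walks through an example of how to write one.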
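Creating a custom ItemReader
Creating a custom ItemReader takes two steps:
Create a class that implements the ItemReader interface, passing the type of the returned object, T, as the type parameter.
Implement the interface's T read() method according to the following rule:
read() returns the next object if there is one; otherwise it returns null.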
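To make the read() contract concrete, here is a minimal, framework-free sketch. It assumes a local stand-in interface that only mirrors the shape of Spring Batch's org.springframework.batch.item.ItemReader (a single read() that returns the next item or null); SketchListReader and Readers.drain are hypothetical names for illustration, not Spring Batch API. The drain loop imitates, in simplified form, how a step keeps calling read() until it sees null.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in that mirrors the shape of Spring Batch's ItemReader<T> (not the real interface):
// read() returns the next item, or null once the input is exhausted.
interface ItemReader<T> {
    T read() throws Exception;
}

// Hypothetical reader over an in-memory list, following the rule described above.
class SketchListReader<T> implements ItemReader<T> {
    private final List<T> items;
    private int nextIndex = 0; // index of the next item to hand out

    SketchListReader(List<T> items) {
        this.items = new ArrayList<>(items);
    }

    @Override
    public T read() {
        if (nextIndex < items.size()) {
            return items.get(nextIndex++); // return the current item, then advance
        }
        return null; // null signals "no more input"
    }
}

// Hypothetical helper: keeps calling read() until it returns null.
class Readers {
    static <T> List<T> drain(ItemReader<T> reader) throws Exception {
        List<T> out = new ArrayList<>();
        T item;
        while ((item = reader.read()) != null) {
            out.add(item);
        }
        return out;
    }
}
```

One consequence of using null as the end-of-input marker is that a reader cannot hand out a null item: the caller would treat it as the end of the data.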
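Below we write a custom ItemReader that returns information about the students of an online testing course as StuDto objects. To keep things simple, the data is stored in memory. StuDto is a simple data transfer object: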
import lombok.Data;

// Lombok's @Data generates getters, setters, equals/hashCode and toString
@Data
public class StuDto {
    private String emailAddress;
    private String name;
    private String purchasedPackage;
}
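Create the ItemReader with the following steps:
Create the InMemoryStudentReader class.
Implement the ItemReader interface and set the returned object type to StuDto.
Add a List<StuDto> studentData field that holds the students enrolled in the course.
Add a nextStudentIndex field that holds the index of the next StuDto object.
Add a private initialize() method that creates the student data and sets the index to 0.
Add a constructor that calls initialize().
Implement read() with this rule: if there is a next student, return its StuDto object and increment the index; otherwise return null.
The InMemoryStudentReader code: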
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.springframework.batch.item.ItemReader;

public class InMemoryStudentReader implements ItemReader<StuDto> {
    private int nextStudentIndex;
    private List<StuDto> studentData;

    public InMemoryStudentReader() {
        initialize();
    }

    private void initialize() {
        StuDto tony = new StuDto();
        tony.setEmailAddress("tony.tester@gmail.com");
        tony.setName("Tony Tester");
        tony.setPurchasedPackage("master");
        StuDto nick = new StuDto();
        nick.setEmailAddress("nick.newbie@gmail.com");
        nick.setName("Nick Newbie");
        nick.setPurchasedPackage("starter");
        StuDto ian = new StuDto();
        ian.setEmailAddress("ian.intermediate@gmail.com");
        ian.setName("Ian Intermediate");
        ian.setPurchasedPackage("intermediate");
        // Immutable in-memory test data
        studentData = Collections.unmodifiableList(Arrays.asList(tony, nick, ian));
        nextStudentIndex = 0;
    }

    @Override
    public StuDto read() throws Exception {
        StuDto nextStudent = null;
        // Return the next student and advance the index; null signals the end of input
        if (nextStudentIndex < studentData.size()) {
            nextStudent = studentData.get(nextStudentIndex);
            nextStudentIndex++;
        }
        return nextStudent;
    }
}
Once the custom ItemReader is written, it has to be configured as a bean so that a Spring Batch job can use it. Here is how.
Configuring the ItemReader bean
The configuration class:
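import org.springframework.batch.item.ItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class InMemoryStudentJobConfig {
    @Bean
    ItemReader<StuDto> inMemoryStudentReader() {
        return new InMemoryStudentReader();
    }
}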
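Annotate the class with @Configuration to mark it as a configuration class, then add a method annotated with @Bean whose return type is ItemReader and whose body returns a new InMemoryStudentReader object.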
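Summary
This article used an example to show how to write a custom ItemReader. The key points are:
A custom ItemReader implements the ItemReader interface.
When implementing ItemReader, specify the returned type as the type parameter (T).
Implement the interface's read() method: return the next object if there is one, otherwise return null.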
Spring Batch ItemReaders
This part focuses on ItemReader: how to read data from different sources, and how exception handling and restart work.
JdbcPagingItemReader
Reading data from a database
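import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Spring Batch 4.x style configuration (JobBuilderFactory / StepBuilderFactory)
@Configuration
public class DBJdbcDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    @Qualifier("dbJdbcDemoWriter")
    private ItemWriter<? super Customer> dbJdbcDemoWriter;
    @Autowired
    private DataSource dataSource;

    @Bean
    public Job DBJdbcDemoJob() {
        return jobBuilderFactory.get("DBJdbcDemoJob")
                .start(dbJdbcDemoStep())
                .build();
    }

    @Bean
    public Step dbJdbcDemoStep() {
        return stepBuilderFactory.get("dbJdbcDemoStep")
                .<Customer, Customer>chunk(10) // chunk size assumed
                .reader(dbJdbcDemoReader())
                .writer(dbJdbcDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(100); // JDBC fetch-size hint
        reader.setPageSize(100);  // rows fetched per page query
        // Map each result-set row to a Customer
        reader.setRowMapper((rs, rowNum) -> Customer.builder().id(rs.getLong("id"))
                .firstName(rs.getString("firstName"))
                .lastName(rs.getString("lastName"))
                .birthdate(rs.getString("birthdate"))
                .build());
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from Customer");
        // Paging requires a unique sort key
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);
        return reader;
    }
}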
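The Job and the ItemWriter are not the focus of this article; they are included only to make the example complete, and the examples below use them in the same way.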
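import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

@Component("dbJdbcDemoWriter")
public class DbJdbcDemoWriter implements ItemWriter<Customer> {
    // Spring Batch 4.x signature; in 5.x, write() receives a Chunk<? extends Customer>
    @Override
    public void write(List<? extends Customer> items) throws Exception {
        for (Customer customer : items) {
            System.out.println(customer);
        }
    }
}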
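Why does the JdbcPagingItemReader above need sortKeys? As I understand it, the reader does not page with OFFSET: after the first page, it queries again starting after the last sort-key value it saw, which is why the sort key should be unique (here, id). The framework-free sketch below simulates that idea over an in-memory table; Row, KeysetPaging, fetchPage and readAllPages are hypothetical names, and the in-memory filter stands in for a query of the form WHERE id > ? ORDER BY id LIMIT ?, not the exact SQL the provider generates.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical row type standing in for the Customer table.
record Row(long id, String firstName) {}

class KeysetPaging {
    // Simulates "... WHERE id > :lastId ORDER BY id ASC LIMIT :pageSize".
    // For the first page there is no lower bound (lastId == null).
    static List<Row> fetchPage(List<Row> table, Long lastId, int pageSize) {
        return table.stream()
                .filter(r -> lastId == null || r.id() > lastId)
                .sorted(Comparator.comparingLong(Row::id))
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    // Reads the whole table page by page, remembering only the last key seen.
    static List<List<Row>> readAllPages(List<Row> table, int pageSize) {
        List<List<Row>> pages = new ArrayList<>();
        Long lastId = null;
        while (true) {
            List<Row> page = fetchPage(table, lastId, pageSize);
            if (page.isEmpty()) {
                break; // an empty page means the input is exhausted
            }
            pages.add(page);
            lastId = page.get(page.size() - 1).id(); // next page starts after this key
        }
        return pages;
    }
}
```

The same loop shows why a non-unique sort key is risky: if two rows shared the last key of a page, the id > lastId filter would skip the second one.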
FlatFileItemReader
Reading data from a CSV file
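import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class FlatFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Bean
    public Job flatFileDemoJob() {
        return jobBuilderFactory.get("flatFileDemoJob")
                .start(flatFileDemoStep())
                .build();
    }

    @Bean
    public Step flatFileDemoStep() {
        return stepBuilderFactory.get("flatFileDemoStep")
                .<Customer, Customer>chunk(10) // chunk size assumed
                .reader(flatFileDemoReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Customer> flatFileDemoReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
        reader.setLinesToSkip(1); // skip the header line
        // Split each line on commas and name the columns
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});
        // Map the named fields of each line to a Customer
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> Customer.builder().id(fieldSet.readLong("id"))
                .firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName"))
                .birthdate(fieldSet.readString("birthdate"))
                .build());
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }
}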
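Conceptually, the FlatFileItemReader above does three things: it skips the header (setLinesToSkip(1)), splits each line into named fields (DelimitedLineTokenizer with setNames), and turns those fields into an object (the FieldSetMapper lambda). The sketch below reproduces that pipeline in plain Java so each step is visible. It is a simplification that assumes no quoted fields or embedded commas, and CsvCustomer, CsvSketch, tokenize and mapLines are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the article's Customer class.
record CsvCustomer(long id, String firstName, String lastName, String birthdate) {}

class CsvSketch {
    static final String[] NAMES = {"id", "firstName", "lastName", "birthdate"};

    // Like a delimited tokenizer with named columns: split on commas and pair
    // each value with its column name. Assumes no quoted fields.
    static Map<String, String> tokenize(String line) {
        String[] values = line.split(",", -1); // -1 keeps trailing empty fields
        if (values.length != NAMES.length) {
            throw new IllegalArgumentException("Expected " + NAMES.length + " fields: " + line);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], values[i].trim());
        }
        return fields;
    }

    // Skips the first linesToSkip lines, then maps each remaining line to an object.
    static List<CsvCustomer> mapLines(List<String> lines, int linesToSkip) {
        List<CsvCustomer> result = new ArrayList<>();
        for (int i = linesToSkip; i < lines.size(); i++) {
            Map<String, String> f = tokenize(lines.get(i));
            result.add(new CsvCustomer(Long.parseLong(f.get("id")), f.get("firstName"),
                    f.get("lastName"), f.get("birthdate")));
        }
        return result;
    }
}
```

Keeping tokenizing and mapping separate is the same split Spring Batch makes between LineTokenizer and FieldSetMapper: the column layout can change without touching the object-mapping code.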
StaxEventItemReader
Reading data from an XML file
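import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

@Configuration
public class XmlFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    @Qualifier("xmlFileDemoWriter")
    private ItemWriter<? super Customer> xmlFileDemoWriter;

    @Bean
    public Job xmlFileDemoJob() {
        return jobBuilderFactory.get("xmlFileDemoJob")
                .start(xmlFileDemoStep())
                .build();
    }

    @Bean
    public Step xmlFileDemoStep() {
        return stepBuilderFactory.get("xmlFileDemoStep")
                .<Customer, Customer>chunk(10) // chunk size assumed
                .reader(xmlFileDemoReader())
                .writer(xmlFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public StaxEventItemReader<Customer> xmlFileDemoReader() {
        StaxEventItemReader<Customer> reader = new StaxEventItemReader<>();
        reader.setResource(new ClassPathResource("customer.xml"));
        // Each <customer> element is read as one item
        reader.setFragmentRootElementName("customer");
        // XStream unmarshals each <customer> fragment into a Customer
        XStreamMarshaller unMarshaller = new XStreamMarshaller();
        Map<String, Class<?>> map = new HashMap<>();
        map.put("customer", Customer.class);
        unMarshaller.setAliases(map);
        reader.setUnmarshaller(unMarshaller);
        return reader;
    }
}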
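setFragmentRootElementName("customer") means each customer element in the file is treated as one item, and only that fragment is handed to the unmarshaller. The sketch below illustrates the fragment idea with the JDK's own StAX API (javax.xml.stream) and a hand-written field extraction in place of XStreamMarshaller. StaxFragmentSketch and readFragments are hypothetical names, and the child element names in the usage data are an assumption, since the article does not show customer.xml.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

class StaxFragmentSketch {
    // Returns one map of child-element text per <fragmentRoot> element.
    static List<Map<String, String>> readFragments(String xml, String fragmentRoot)
            throws XMLStreamException {
        XMLStreamReader in = XMLInputFactory.newInstance()
                .createXMLStreamReader(new StringReader(xml));
        List<Map<String, String>> items = new ArrayList<>();
        Map<String, String> current = null; // non-null while inside a fragment
        String field = null;                // child element currently being read
        StringBuilder text = new StringBuilder();
        while (in.hasNext()) {
            int event = in.next();
            if (event == XMLStreamConstants.START_ELEMENT) {
                String name = in.getLocalName();
                if (name.equals(fragmentRoot)) {
                    current = new HashMap<>(); // a new item starts here
                } else if (current != null) {
                    field = name;
                    text.setLength(0);
                }
            } else if (event == XMLStreamConstants.CHARACTERS && field != null) {
                text.append(in.getText());
            } else if (event == XMLStreamConstants.END_ELEMENT) {
                String name = in.getLocalName();
                if (name.equals(fragmentRoot) && current != null) {
                    items.add(current); // fragment complete: emit one item
                    current = null;
                } else if (name.equals(field)) {
                    current.put(field, text.toString().trim());
                    field = null;
                }
            }
        }
        in.close();
        return items;
    }
}
```

Because the document is streamed event by event, memory use depends on the size of one fragment rather than the whole file, which is the main reason to read large XML this way.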
MultiResourceItemReader
Reading data from multiple files
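import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

@Configuration
public class MultipleFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;
    // All classpath files matching file*.csv
    @Value("classpath*:/file*.csv")
    private Resource[] inputFiles;

    @Bean
    public Job multipleFileDemoJob() {
        return jobBuilderFactory.get("multipleFileDemoJob")
                .start(multipleFileDemoStep())
                .build();
    }

    @Bean
    public Step multipleFileDemoStep() {
        return stepBuilderFactory.get("multipleFileDemoStep")
                .<Customer, Customer>chunk(10) // chunk size assumed
                .reader(multipleResourceItemReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    private MultiResourceItemReader<Customer> multipleResourceItemReader() {
        MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
        // Each matched file is handed in turn to the delegate reader
        reader.setDelegate(flatFileReader());
        reader.setResources(inputFiles);
        return reader;
    }

    @Bean
    public FlatFileItemReader<Customer> flatFileReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        // Replaced by the MultiResourceItemReader with each input file in turn
        reader.setResource(new ClassPathResource("customer.csv"));
        // reader.setLinesToSkip(1); // enable if the input files have a header line
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> Customer.builder().id(fieldSet.readLong("id"))
                .firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName"))
                .birthdate(fieldSet.readString("birthdate"))
                .build());
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }
}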
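MultiResourceItemReader does not parse anything itself: it hands each resource in turn to the delegate (flatFileReader() here, whose own setResource value is overridden) and moves on to the next resource once the delegate returns null. Below is a plain-Java sketch of that delegation, assuming each "file" is already a list of lines. As far as I know, the real reader orders resources by filename by default; the sketch imitates that by sorting on the map keys. MultiSourceSketch is a hypothetical name.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: one "delegate" reads one resource at a time; when it is
// exhausted, the multi-resource reader switches to the next resource.
class MultiSourceSketch {
    private final Iterator<List<String>> resources; // remaining resources, in order
    private Iterator<String> delegate;              // reader for the current resource

    MultiSourceSketch(Map<String, List<String>> filesByName) {
        // TreeMap iterates keys in sorted order, imitating ordering by filename.
        this.resources = new TreeMap<>(filesByName).values().iterator();
    }

    // Returns the next line across all resources, or null when every one is exhausted.
    String read() {
        while (delegate == null || !delegate.hasNext()) {
            if (!resources.hasNext()) {
                return null;
            }
            delegate = resources.next().iterator(); // roughly: setResource(...) then open() on the delegate
        }
        return delegate.next();
    }
}
```

From the step's point of view there is still a single reader that returns null exactly once, at the end of the last file, so chunking and writing work unchanged.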
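Exception handling and restart
For chunk-oriented steps, Spring Batch provides tools for managing state. A step manages its state through the ItemStream interface, which gives developers access to the component that holds that state: the ExecutionContext, which is essentially a map of key-value pairs storing the state of a particular step. Because this state is persisted in the JobRepository, the ExecutionContext is what makes restarting a step possible.
When an error occurs during execution, the last saved state remains in the JobRepository. The next time the job runs, that state is used to populate the ExecutionContext, and the step can continue from where it left off.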
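The ItemStream interface has three methods:
open() is called when the step starts and receives the ExecutionContext, populated with the values stored in the database;
update() is called at the end of each chunk (that is, each transaction) and updates the ExecutionContext;
close() is called after all chunks have been processed.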
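The call order described above can be sketched without Spring. In this simplified sketch, a Map<String, Object> plays the role of ExecutionContext, SketchStream is a stand-in for ItemStream, CountingStream is a hypothetical reader, and StepDriver.run is an assumed, much-simplified version of a chunk-oriented step: open() once, read items and call update() after each non-empty chunk, then close().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified stand-in for ItemStream; the Map plays the role of ExecutionContext.
interface SketchStream {
    void open(Map<String, Object> context);
    void update(Map<String, Object> context);
    void close();
}

// Hypothetical reader that counts lines and records every lifecycle call.
class CountingStream implements SketchStream {
    final List<String> calls = new ArrayList<>();
    private final int total;
    private long curLine;

    CountingStream(int total) { this.total = total; }

    public void open(Map<String, Object> context) {
        // Restore the saved position if present, otherwise start at 0.
        curLine = (Long) context.getOrDefault("curLine", 0L);
        calls.add("open@" + curLine);
    }

    Integer read() {
        if (curLine >= total) return null; // end of input
        return (int) ++curLine;
    }

    public void update(Map<String, Object> context) {
        context.put("curLine", curLine); // saved before the chunk commits
        calls.add("update@" + curLine);
    }

    public void close() { calls.add("close"); }
}

class StepDriver {
    // Simplified chunk loop: read up to chunkSize items, then update the context.
    static void run(CountingStream stream, Map<String, Object> context, int chunkSize) {
        stream.open(context);
        boolean exhausted = false;
        while (!exhausted) {
            int inChunk = 0;
            while (inChunk < chunkSize && stream.read() != null) {
                inChunk++;
            }
            exhausted = inChunk < chunkSize;
            if (inChunk > 0) {
                stream.update(context); // chunk "committed"
            }
        }
        stream.close();
    }
}
```

With 25 items and a chunk size of 10, the recorded calls are open@0, update@10, update@20, update@25, close: the saved position only ever moves in whole-chunk steps.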
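Let's build an example.
Prepare a CSV file and put an incorrect name (wrongName) in record 33. When the reader reaches this record, it throws an exception and the job stops.
ItemReader test code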
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;

@Component("restartDemoReader")
public class RestartDemoReader implements ItemStreamReader<Customer> {
    private Long curLine = 0L;        // number of lines read so far
    private boolean restart = false;  // true when resuming a failed execution
    private boolean delegateOpened = false;
    private final FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
    private ExecutionContext executionContext;

    public RestartDemoReader() {
        reader.setResource(new ClassPathResource("restartDemo.csv"));
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> Customer.builder().id(fieldSet.readLong("id"))
                .firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName"))
                .birthdate(fieldSet.readString("birthdate"))
                .build());
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
    }

    @Override
    public Customer read() throws Exception {
        this.curLine++;
        // On restart, continue from the line after the last committed one
        if (restart) {
            reader.setLinesToSkip(this.curLine.intValue() - 1);
            restart = false;
            System.out.println("Start reading from line: " + this.curLine);
        }
        // Open the delegate once, after the lines to skip are known
        if (!delegateOpened) {
            reader.open(this.executionContext);
            delegateOpened = true;
        }
        Customer customer = reader.read();
        if (customer != null) {
            // Explicitly throw when the bad record is reached, stopping the job
            if ("wrongName".equals(customer.getFirstName())) {
                throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
            }
        } else {
            curLine--; // end of input: nothing was read
        }
        return customer;
    }

    /**
     * Determines whether this execution is a restart of the job.
     * @param executionContext the step's ExecutionContext, restored from the JobRepository
     * @throws ItemStreamException on failure
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        this.executionContext = executionContext;
        if (executionContext.containsKey("curLine")) {
            this.curLine = executionContext.getLong("curLine");
            this.restart = true;
        } else {
            this.curLine = 0L;
            reader.setLinesToSkip(0);
            executionContext.putLong("curLine", this.curLine);
        }
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        System.out.println("update curLine: " + this.curLine);
        executionContext.putLong("curLine", this.curLine);
    }

    @Override
    public void close() throws ItemStreamException {
        // Release the delegate's file handle so it can be reopened later
        reader.close();
        delegateOpened = false;
    }
}
Job configuration
Records are read in chunks of 10:
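import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RestartDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;
    @Autowired
    @Qualifier("restartDemoReader")
    private ItemReader<Customer> restartDemoReader;

    @Bean
    public Job restartDemoJob() {
        return jobBuilderFactory.get("restartDemoJob")
                .start(restartDemoStep())
                .build();
    }

    @Bean
    public Step restartDemoStep() {
        return stepBuilderFactory.get("restartDemoStep")
                .<Customer, Customer>chunk(10) // 10 records per chunk
                .reader(restartDemoReader)     // registered as an ItemStream automatically
                .writer(flatFileDemoWriter)
                .build();
    }
}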
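On the first run, the program throws an exception at record 33, and curLine is 30.
At this point we can query the BATCH_STEP_EXECUTION_CONTEXT table in the database and see that curLine has been persisted as a key-value pair. (The chunk size is 10 records, so when record 33 fails, the last committed value of curLine is 30.)
Next, we correct the wrongName record and run the program again.
The program calls open(), which checks whether the step's map stored in the database contains curLine. If it does, this is a rerun: the reader restores curLine and continues from the start of the chunk that failed.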
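The numbers in this walkthrough follow from chunk boundaries. update() runs once per committed chunk, so a failure while reading record 33 (inside the fourth chunk, records 31 to 40) leaves 30 as the last saved curLine; the restarted run skips 30 lines and resumes at record 31. The sketch below simulates both runs in plain Java. RestartSketch is a hypothetical name, the Map stands in for the persisted step ExecutionContext, and the firstBad parameter is an assumed failure trigger (0 means no bad record).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class RestartSketch {
    // Simulates one run of a chunk-oriented step over records 1..total.
    // savedContext plays the role of the step ExecutionContext in the JobRepository;
    // firstBad is the record whose read() throws (0 = none).
    // Records are appended to 'processed' only when their chunk commits.
    static boolean run(int total, int chunkSize, int firstBad,
                       Map<String, Long> savedContext, List<Integer> processed) {
        // open(): resume after the last committed line, or start from 0.
        long curLine = savedContext.getOrDefault("curLine", 0L);
        while (curLine < total) {
            List<Integer> chunk = new ArrayList<>();
            while (chunk.size() < chunkSize && curLine < total) {
                long next = curLine + 1;
                if (next == firstBad) {
                    return false; // read() throws: the open chunk is rolled back, nothing saved
                }
                curLine = next;
                chunk.add((int) next);
            }
            processed.addAll(chunk);              // the writer receives the whole chunk
            savedContext.put("curLine", curLine); // update() + commit
        }
        return true;
    }
}
```

Records 31 and 32 are read twice in total but written only once, because the first attempt at their chunk was rolled back. The article's reader achieves the same effect by storing curLine itself and, on restart, telling the delegate FlatFileItemReader to skip that many lines.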