SpringBatch从入门到实战(六):ItemReader

news/2024/7/7 20:19:14

一:ListItemReader

用于简单的开发测试。

/**
 * Serves the fixed in-memory items "a", "b" and "c"; handy for quick development tests.
 */
@Bean
public ItemReader<String> listItemReader() {
    ItemReader<String> inMemory = new ListItemReader<String>(Arrays.asList("a", "b", "c"));
    return inMemory;
}

二:FlatFileItemReader

2.1 完全映射

当文件中的字段与实体类的属性一一对应时,可以直接使用 targetType(Class) 完成映射。常用的分隔符有逗号“,”,以及“\u001B”(表示 ESC 控制字符)等。

1,monday,10,上海市,浦东新区
2,zhangsan,20,北京市,朝阳区
3,lisi,30,深圳市,宝安区
4,wangwu,31,上海市,浦东新区
5,huihui,32,上海市,浦东新区
/**
 * Target type for the comma-separated user rows. Field names match the column names
 * passed to {@code names(...)} in the readers (id, username, age, city, area).
 * Lombok generates the getters, setters and toString.
 */
@Getter
@Setter
@ToString
public class UserInfo {
    private Long id;
    private String username;
    private Integer age;
    private String city;
    private String area;
}
/**
 * Reads static/user.csv from the classpath and maps each comma-separated line
 * directly onto {@link UserInfo} via {@code targetType}, then prints the items in
 * chunks of 3.
 */
@Configuration
public class HelloWorldChunkJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;


    /** Single-step job running {@link #step1()}. */
    @Bean
    public Job helloWorldChunkJob() {
        return jobBuilderFactory.get("helloWorldChunkJob")
                .start(step1())
                .build();
    }

    /** Chunk-oriented step: reads UserInfo items and writes them 3 at a time. */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<UserInfo, UserInfo>chunk(3)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }

    /**
     * Reads user.csv. {@code targetType} maps each named column onto the UserInfo
     * property with the same name, so {@code names(...)} must list the columns in
     * file order.
     *
     * <p>The sample data has no header line, so no lines are skipped. Add
     * {@code .linesToSkip(1)} only if your file starts with a header. Otherwise the
     * first record (id=1) is silently lost.
     */
    @Bean
    public ItemReader<UserInfo> itemReader() {
        return new FlatFileItemReaderBuilder<UserInfo>()
                .encoding("UTF-8")
                .name("userItemReader")
                .resource(new ClassPathResource("static/user.csv"))
                // To read from the file system instead:
                // .resource(new PathResource("/a/b/c/user.csv"))
                .delimited().delimiter(",")
                .names("id", "username", "age", "city", "area")
                .targetType(UserInfo.class)
                .build();
    }


    /** Prints each chunk of items to stdout. */
    @Bean
    public ItemWriter<UserInfo> itemWriter() {
        return new ItemWriter<UserInfo>() {
            @Override
            public void write(List<? extends UserInfo> items) throws Exception {
                System.out.println("itemWriter=" + items);
            }
        };
    }
}

2.2 自定义字段映射 fieldSetMapper

/**
 * Output type of UserFieldSetMapper. {@code address} holds the city and area columns
 * concatenated. Lombok generates the getters, setters and toString.
 */
@Getter
@Setter
@ToString
public class User {
    private Long id;
    private String username;
    private Integer age;
    private String address;
}
/**
 * Step for the custom-FieldSetMapper example. Items are read as {@link User}, which
 * is what UserFieldSetMapper produces, and written unchanged, so no processor is
 * needed and input/output types are both User.
 *
 * <p>NOTE(review): itemWriter() must accept User items (e.g. ItemWriter&lt;User&gt;).
 * Confirm against the writer bean used with this example.
 */
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<User, User>chunk(3)
            .reader(itemReader())
            .writer(itemWriter())
            .build();
}

/**
 * Reads user.csv and converts each line with {@link UserFieldSetMapper}.
 * Typed on User because that is what the mapper returns. Declaring it as
 * ItemReader&lt;UserInfo&gt; would only compile through an unchecked raw
 * FieldSetMapper, and it would hand User objects to code expecting UserInfo.
 */
@Bean
public ItemReader<User> itemReader() {
    return new FlatFileItemReaderBuilder<User>()
            .encoding("UTF-8")
            .name("userItemReader")
            .resource(new ClassPathResource("static/user.csv"))
            //.resource(new PathResource("/a/b/c/user.csv"))
            .delimited().delimiter(",")
            .names("id", "username", "age", "city", "area")
            .fieldSetMapper(fieldSetMapper())
            .build();
}

/** Custom field-set mapper bean, typed so the reader builder call is type-checked. */
@Bean
public FieldSetMapper<User> fieldSetMapper() {
    return new UserFieldSetMapper();
}
/**
 * Converts a tokenized user line into a {@link User}. The separate city and area
 * columns are merged into a single address string.
 */
public class UserFieldSetMapper implements FieldSetMapper<User> {

    @Override
    public User mapFieldSet(FieldSet fieldSet) throws BindException {
        User result = new User();
        result.setId(fieldSet.readLong("id"));
        result.setUsername(fieldSet.readString("username"));
        result.setAge(fieldSet.readInt("age"));

        // Derived field: address = city followed directly by area.
        String address = fieldSet.readString("city") + fieldSet.readString("area");
        result.setAddress(address);
        return result;
    }
}

2.3 行映射 lineMapper

/**
 * Builds the line mapper by hand (tokenizer plus field-set mapper) instead of using
 * the builder's delimited() shortcut. It reads static/user.dat, whose fields are
 * separated by the ESC control character (U+001B).
 */
@Bean
public ItemReader<UserInfo> itemReader() {
    // Field delimiter: ESC (U+001B).
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer("\u001B");
    // Quote character: SUB (U+001A). This does NOT mark line ends; it replaces the
    // tokenizer's default quote character. Presumably chosen because SUB never occurs
    // in the data, so ordinary quote characters are kept literally. Confirm against
    // the file format.
    tokenizer.setQuoteCharacter('\u001A');
    tokenizer.setFieldSetFactory(new DefaultFieldSetFactory());
    // Column names in file order; used to bind values onto UserInfo properties.
    tokenizer.setNames("id", "username", "age", "city", "area");

    // Maps each tokenized line onto a new UserInfo by property name.
    BeanWrapperFieldSetMapper<UserInfo> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(UserInfo.class);

    // Line -> tokenizer -> FieldSet -> fieldSetMapper -> UserInfo.
    DefaultLineMapper<UserInfo> lineMapper = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);

    return new FlatFileItemReaderBuilder<UserInfo>()
            .encoding("UTF-8")
            .name("userItemReader")
            .resource(new ClassPathResource("static/user.dat"))
            .lineMapper(lineMapper)
            .build();
}

三:JsonItemReader

[
  {"id":1, "username":"a", "age":18},
  {"id":2, "username":"b", "age":17},
  {"id":3, "username":"c", "age":16},
  {"id":4, "username":"d", "age":15},
  {"id":5, "username":"e", "age":14}
]
/**
 * Reads the JSON array in static/user.json, binding each element to a
 * {@link UserInfo} with Jackson.
 */
@Bean
public JsonItemReader<UserInfo> itemReader() {
    JacksonJsonObjectReader<UserInfo> objectReader = new JacksonJsonObjectReader<>(UserInfo.class);
    objectReader.setMapper(new ObjectMapper());

    JsonItemReaderBuilder<UserInfo> builder = new JsonItemReaderBuilder<UserInfo>()
            .jsonObjectReader(objectReader)
            .resource(new ClassPathResource("static/user.json"))
            .name("jsonItemReader");
    return builder.build();
}

MultiResourceItemReader

多文件读取:通过通配符匹配多个文件,并依次交给同一个委托 reader(delegate)读取。

/** All classpath files matching static/user*.csv, handed to the multi-resource reader. */
@Value("classpath:/static/user*.csv")
private Resource[] resources;

/**
 * Reads every file in {@link #resources} through the shared
 * {@code flatFileitemReader()} delegate. The builder is parameterized so the result
 * is type-checked as {@code ItemReader<UserInfo>} rather than an unchecked raw type.
 */
@Bean
public ItemReader<UserInfo> multiResourceItemReader() {
    return new MultiResourceItemReaderBuilder<UserInfo>()
            .name("multiResourceItemReader")
            .resources(resources)
            .delegate(flatFileitemReader())
            .build();
}


/**
 * Delegate for multiResourceItemReader(): parses comma-separated lines into
 * {@link UserInfo}. No resource is set here on purpose, because the multi-resource
 * reader supplies each matched file to this delegate.
 */
@Bean
public FlatFileItemReader<UserInfo> flatFileitemReader() {
    FlatFileItemReaderBuilder<UserInfo> builder = new FlatFileItemReaderBuilder<UserInfo>()
            .encoding("UTF-8")
            .name("userItemReader");
    // For standalone use, add e.g. .resource(new PathResource("/a/b/c/user.csv")).
    return builder
            .delimited().delimiter(",")
            .names("id", "username", "age", "city", "area")
            .targetType(UserInfo.class)
            .build();
}

XML

<!-- spring-oxm provides XStreamMarshaller, used by the XML item reader.
     NOTE(review): no version given; presumably managed by a parent POM / BOM. Confirm. -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
</dependency>
<!-- NOTE(review): XStream 1.4.7 is an old release, and later 1.4.x releases fixed
     security vulnerabilities, so consider upgrading. Newer XStream releases (1.4.18+)
     only unmarshal explicitly permitted types, so the marshaller must then be given a
     type permission for the target class. -->
<dependency>
    <groupId>com.thoughtworks.xstream</groupId>
    <artifactId>xstream</artifactId>
    <version>1.4.7</version>
</dependency>
<?xml version="1.0" encoding="utf-8" ?>
<users>
    <user>
        <id>1</id>
        <username>huihui</username>
        <age>33</age>
        <address>上海市</address>
    </user>
</users>
/**
 * Reads each {@code <user>} fragment of static/user.xml into a {@link User} via XStream.
 *
 * <p>XStream can instantiate types named in the XML, so unmarshalling is restricted
 * to {@link User} with an explicit type permission. This follows the pattern in the
 * Spring Batch reference for StaxEventItemReader. It also keeps the reader working on
 * newer XStream versions, which reject types that have not been permitted.
 */
@Bean
public ItemReader<User> xmlItemReader() {
    // Element name "user" maps to the User class.
    Map<String, Object> aliases = new HashMap<>();
    aliases.put("user", User.class);

    XStreamMarshaller unmarshaller = new XStreamMarshaller();
    unmarshaller.setAliases(aliases);
    unmarshaller.setTypePermissions(
            new com.thoughtworks.xstream.security.ExplicitTypePermission(new Class[] {User.class}));

    return new StaxEventItemReaderBuilder<User>()
            .name("xmlItemReader")
            .resource(new ClassPathResource("static/user.xml"))
            .addFragmentRootElements("user")
            .unmarshaller(unmarshaller)
            .build();
}

四:数据库

4.1 JdbcCursorItemReader

基于数据库游标,每次读取一条记录。
在这里插入图片描述

/**
 * Row type for the JDBC readers. UserRowMapper fills it from the id, name and age
 * columns. Lombok generates the getters, setters and toString.
 */
@Getter
@Setter
@ToString
public class User {
    private Long id;
    private String name;
    private int age;
}
/** Maps the current row's id, name and age columns onto a new {@link User}. */
public class UserRowMapper implements RowMapper<User> {
    @Override
    public User mapRow(ResultSet rs, int rowNum) throws SQLException {
        long id = rs.getLong("id");
        String name = rs.getString("name");
        int age = rs.getInt("age");

        User mapped = new User();
        mapped.setId(id);
        mapped.setName(name);
        mapped.setAge(age);
        return mapped;
    }
}
/**
 * Job that reads users with age &gt; 16 from the {@code user} table through a JDBC
 * cursor and prints each one to stderr, one item per chunk.
 */
@Configuration
public class CursorDBReaderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /** Row-to-User mapping used by the reader. */
    @Bean
    public UserRowMapper userRowMapper() {
        return new UserRowMapper();
    }

    /**
     * Cursor-based reader that fetches one row at a time. The single {@code ?}
     * placeholder in the query is bound to 16.
     */
    @Bean
    public JdbcCursorItemReader<User> userItemReader() {
        Object[] queryArgs = {16};
        return new JdbcCursorItemReaderBuilder<User>()
                .name("userCursorItemReader")
                .dataSource(dataSource)
                .sql("select * from user where age > ?")
                .preparedStatementSetter(new ArgumentPreparedStatementSetter(queryArgs))
                .rowMapper(userRowMapper())
                .build();
    }

    /** Prints every item of a chunk to stderr. */
    @Bean
    public ItemWriter<User> itemWriter() {
        return items -> items.forEach(System.err::println);
    }

    /** Chunk size 1: each item is read and written on its own. */
    @Bean
    public Step step() {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(1)
                .reader(userItemReader())
                .writer(itemWriter())
                .build();
    }

    /** Job wrapping the single cursor-reading step. */
    @Bean
    public Job job() {
        return jobBuilderFactory.get("cursor-db-reader-job")
                .start(step())
                .build();
    }
}

4.2 JdbcPagingItemReader 分页

分页查询,每次读取一页数据(每页条数由 pageSize 指定)。

在这里插入图片描述

/**
 * Job that reads users with age &gt; 16 page by page (10 rows per query, sorted by
 * id) and prints each one to stderr.
 */
@Configuration
public class PageDBReaderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /** Row-to-User mapping used by the reader. */
    @Bean
    public UserRowMapper userRowMapper() {
        return new UserRowMapper();
    }

    /**
     * Paging query assembled from its clauses. The factory bean is given the
     * DataSource so it can produce a database-specific provider.
     */
    @Bean
    public PagingQueryProvider pagingQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
        provider.setDataSource(dataSource);
        provider.setSelectClause("select *");
        provider.setFromClause("from user");
        provider.setWhereClause("where age > :age");
        // Pages are ordered and keyed on id.
        provider.setSortKey("id");
        return provider.getObject();
    }

    /** Paging reader: 10 rows per query; ":age" is bound to 16. */
    @Bean
    public JdbcPagingItemReader<User> userItemReader() throws Exception {
        HashMap<String, Object> queryParams = new HashMap<>();
        queryParams.put("age", 16);

        return new JdbcPagingItemReaderBuilder<User>()
                .name("userPagingItemReader")
                .dataSource(dataSource)
                .queryProvider(pagingQueryProvider())
                .parameterValues(queryParams)
                .pageSize(10)
                .rowMapper(userRowMapper())
                .build();
    }

    /** Prints every item of a chunk to stderr. */
    @Bean
    public ItemWriter<User> itemWriter() {
        return items -> items.forEach(System.err::println);
    }

    /** Chunk size 1: each item is read and written on its own. */
    @Bean
    public Step step() throws Exception {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(1)
                .reader(userItemReader())
                .writer(itemWriter())
                .build();
    }

    /** Job wrapping the single paging step. */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("page-db-reader-job1")
                .start(step())
                .build();
    }
}


4.3 MyBatisPagingItemReader

/**
 * Pages through tbl_user with MyBatis (query UserMapper.selectUserList), PAGE_SIZE
 * rows per query, and prints each chunk to stdout.
 */
@Configuration
public class HelloWorldChunkJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /** Rows per MyBatis page. Also used as the chunk size, so one page fills one chunk. */
    private static final int PAGE_SIZE = 3;


    /** Single-step job running {@link #step1()}. */
    @Bean
    public Job helloWorldChunkJob() {
        return jobBuilderFactory.get("helloWorldChunkJob")
                .start(step1())
                .build();
    }

    /** Chunk-oriented step reading and writing User items, PAGE_SIZE at a time. */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(PAGE_SIZE)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }

    /**
     * MyBatis paging reader. The "id" parameter is bound to #{id} in
     * UserMapper.selectUserList, so only rows with id greater than 1 are read.
     */
    @Bean
    public MyBatisPagingItemReader<User> itemReader() {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("id", 1);

        MyBatisPagingItemReader<User> itemReader = new MyBatisPagingItemReader<>();
        itemReader.setSqlSessionFactory(sqlSessionFactory);
        itemReader.setQueryId("com.example.batch.mapper.UserMapper.selectUserList");
        itemReader.setPageSize(PAGE_SIZE);
        itemReader.setParameterValues(parameterValues);
        return itemReader;
    }

    /** Prints each chunk of items to stdout. */
    @Bean
    public ItemWriter<User> itemWriter() {
        return new ItemWriter<User>() {
            @Override
            public void write(List<? extends User> items) throws Exception {
                System.out.println("itemWriter=" + items);
            }
        };
    }
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.example.batch.mapper.UserMapper">
    <!--
      Page query for MyBatisPagingItemReader. #{_pagesize} and #{_skiprows} are filled
      in by the reader for each page; #{id} comes from its parameter values.
      ORDER BY is required for limit/offset paging. Without a stable sort the database
      may return rows in a different order for each page query, so pages can overlap
      or miss rows.
    -->
    <select id="selectUserList" resultType="com.example.batch.entity.User">
        select * from tbl_user
        where id > #{id}
        order by id
        limit #{_pagesize} offset #{_skiprows}
    </select>

</mapper>

五:多线程读

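  • userItemReader() 加上 saveState(false):Spring Batch 提供的大部分 ItemReader 都是有状态的,作业重启时主要依靠保存的状态来确定上次停止的位置;而在多线程环境中,如果同一个对象的状态被多个线程访问,线程之间可能相互覆盖状态。因此设置为 false 表示不保存状态,但这也意味着作业无法从中断处重启。注意 saveState(false) 只解决状态保存的问题,FlatFileItemReader 本身并不是线程安全的,多线程读取时可以用 SynchronizedItemStreamReader 对其进行包装。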

  • step() 方法加上 .taskExecutor(new SimpleAsyncTaskExecutor()) 后,作业步骤具备了多线程处理能力:以块(chunk)为单位,每个块由一个线程处理(SimpleAsyncTaskExecutor 会为每个任务新建线程)。观察运行结果,可以明显看出输出顺序是乱序的;改变 job 的名字后再执行,会发现每次输出的顺序都不一样。

/**
 * Reader for the multi-threaded step: '#'-delimited lines of static/user.csv mapped
 * onto {@link User}. saveState(false) stops the reader from recording its position,
 * because several threads share this single instance.
 *
 * <p>NOTE(review): FlatFileItemReader itself is not thread-safe. Consider wrapping it
 * in SynchronizedItemStreamReader when it is used with a task executor.
 */
@Bean
public FlatFileItemReader<User> userItemReader(){
    // Runs when the bean is created, so it prints the configuring thread,
    // not one of the chunk-processing threads.
    System.out.println(Thread.currentThread());

    return new FlatFileItemReaderBuilder<User>()
            .name("userItemReader")
            .saveState(false)
            .resource(new ClassPathResource("static/user.csv"))
            .delimited().delimiter("#")
            .names("id", "username", "age")
            .targetType(User.class)
            .build();
}

/**
 * Multi-threaded chunk step. Chunks of 2 items are handed to a
 * SimpleAsyncTaskExecutor and processed concurrently, so output order varies between runs.
 */
@Bean
public Step step(){
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
    return stepBuilderFactory.get("step1")
            .<User, User>chunk(2)
            .reader(userItemReader())
            .writer(itemWriter())
            .taskExecutor(executor)
            .build();
}

六:多步骤并行执行

/**
 * Job that runs two flows in parallel: jsonStep() (reading user-parallel.json) and
 * parallelFlow1 (flatStep(), reading user-parallel.txt). They are joined with a split
 * backed by a SimpleAsyncTaskExecutor.
 */
@Bean
public Job parallelJob(){
    // Flow for user-parallel.txt.
    Flow flatFlow = new FlowBuilder<Flow>("parallelFlow1")
            .start(flatStep())
            .build();

    // jsonStep() (user-parallel.json) split alongside flatFlow on separate threads.
    Flow splitFlow = new FlowBuilder<Flow>("parallelFlow2")
            .start(jsonStep())
            .split(new SimpleAsyncTaskExecutor())
            .add(flatFlow)
            .build();

    return jobBuilderFactory.get("parallel-step-job")
            .start(splitFlow)
            .end()
            .build();
}

parallelJob() 配置 job 时需要指定并行执行的 flow:parallelFlow1 中是 flatStep();parallelFlow2 以 jsonStep() 开始,再通过 **.split(new SimpleAsyncTaskExecutor())** 把 parallelFlow1 加入(add)进来。这表示 jsonStep() 与 parallelFlow1 分别在独立的线程中并行执行。注意 SimpleAsyncTaskExecutor 并不是线程池,而是为每个任务新建一个线程。

七:异常处理

方式一:设置跳过异常次数

/**
 * Fault-tolerant step that allows at most 10 skipped items before the step fails.
 *
 * <p>Skip rules: {@code Exception} is skippable, but {@code RuntimeException} and its
 * subclasses are explicitly not. Unchecked reader/writer errors therefore still fail
 * the step immediately.
 *
 * <p>For fully custom decisions, use {@code .skipPolicy(...)} instead of
 * skip/noSkip/skipLimit. Registering a policy that always returns {@code false} next
 * to these rules does not add a limit; at best it has no effect, and at worst it
 * disables skipping, so none is set. {@code throttleLimit} is also omitted, because it
 * only applies when the step has a {@code taskExecutor}.
 */
@Bean
public Step step() throws Exception {
    return stepBuilderFactory.get("step1")
        .<User, User>chunk(1)
        .reader(userItemReader())
        .writer(itemWriter())
        .faultTolerant()                  // enable skip handling
        .skip(Exception.class)            // exceptions that may be skipped...
        .noSkip(RuntimeException.class)   // ...except unchecked ones
        .skipLimit(10)                    // fail once more than 10 items would be skipped
        .build();

}

方式二:记录错误信息

/**
 * Item-read listener that reports read failures to stdout.
 * Typed on Object so it can be registered on a step whatever its item type is.
 */
public class ErrorItemReaderListener implements ItemReadListener<Object> {
    @Override
    public void beforeRead() {
        // no-op: nothing to record before a read
    }

    @Override
    public void afterRead(Object item) {
        // no-op: successful reads are not recorded
    }

    /** Records the failure, including the exception itself rather than discarding it. */
    @Override
    public void onReadError(Exception ex) {
        System.out.println("记录读数据相关信息...");
        System.out.println(ex);
    }
}


方式三:直接跳过不处理

/**
 * Fault-tolerant step that skips any item whose read or write throws an Exception and
 * carries on. skip(...) alone falls back to the builder's default skip limit (10),
 * after which the step would fail. The limit is therefore raised explicitly so that
 * failing items are always skipped.
 */
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<User, User>chunk(PAGE_SIZE)
            .reader(itemReader())
            .writer(itemWriter())
            .faultTolerant()
            .skip(Exception.class)
            .skipLimit(Integer.MAX_VALUE)
            .build();
}

http://lihuaxi.xjx100.cn/news/1243648.html

相关文章

【计算机网络基础】第3章 单元复习

文章目录 一. 单选题(共22题)二. 填空题(共7题)三. 判断题(共5题)一. 单选题(共22题) (单选题)以下哪一个不是关于千兆位以太网的正确描述( D )。 A. 支持全双工传送方式 B. 帧格式与以太网帧格式相同 C. 数据传输速率为1000Mb/S D. 只能基于光纤实现 (单选题)下列设…

clickhouse--通过命令行导入、导出数据示例

导入、导出的数据格式 ClickHouse可以接受和返回各种格式的数据,如CSV、JSON、XML、PARQUET等 输入支持的格式可用于解析提供给insert的数据,从文件支持的表(如File、URL或HDFS)执行select,或读取字典。支持的输出格式可用于安排SELECT的结果,并在文件支持的表中执行inser…

电商--抢购架构总结

文章目录 背景业务流程业务难点技术难点技术方案技术方向具体落地客户端流控网关流控容器流控后端接口流控数据库流控 流控总结优化读取加速异步化流程处理系统扩容 压测监控 总结参考文献 背景 这是个在做NFT电商项目时遇到的场景,要求运营可以商家某个系列的NFT商…

软件测试“入坑”学习指南:如何系统的学习

最近收到很多知友的**,说,想自学软件测试的一些知识,有没有好点的学习路线或者学习规划呢?本期,小编就简单整理一波学习知识点,作为分享。 一、软件测试工程师的职责 随着技术的发展,各种应用…

Unity3D:自定义 Editor 工具

推荐:将 NSDT场景编辑器 加入你的3D工具链 3D工具集: NSDT简石数字孪生 使用自定义 Editor 工具 从 Scene 视图中可以访问使用工具模式 API 创建的自定义工具。 您可以通过以下方式来访问自定义工具: 单击场景视图工具工具条叠加中的可用自…

给大家分享下什么是「API接口」

作为产品经理,了解清楚接口的相关知识是非常有必要的,毕竟总不想被技术大佬认为自己是什么都不懂的需求搬运工。那就往下看下去吧 -----拿去餐馆吃饭的例子 模拟网络请求流程 厨师是后端提供API,服务员是前端请求调用API,我们是用…

Ioctl()方式实现与驱动交互简洁框架

前言 ioctl是linux中一种除read和write之外的数据传递机制 驱动层IOCTL: int (*ioctl) (struct inode *inode, struct file *fp, unsigned int request, unsigned long args);以上函数参数的含义如下。 inode和fp用来确定被操作的设备。request就是用户程序下发…

Python之pyecharts的常见用法3-极坐标图-漏斗图

Pyecharts是一个基于Echarts的Python可视化库,可以用Python语言轻松地生成各种交互式图表和地图。它支持多种图表类型,包括折线图、柱状图、散点图、饼图、地图等,并且可以通过简单的API调用实现数据可视化。 Pyecharts的优点包括:…