4 months ago

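Handling batch jobs well means taking care of many details: a record of each scheduled run, input/output statistics, the success or failure of each task, the data flow, and so on, all the way to Retry / Skip handling.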

This was developed on Spring Boot 1.5.9, which comes with Spring Batch 3.0.8. Spring Boot 2 has since been released with Spring Batch 4.0.0, so for a new project I'd recommend starting directly on Spring Boot 2. I'll write up the Spring Batch 4.0.0 style later.

Contents

Basic flow
Composing a Job
Composing a Step
Writing an ItemReader
ItemReader on MultiThreading
ItemReader with JdbcPaging
Writing a processor
Writing an ItemWriter
Writing an ItemWriter with a DTO
Chaining processors

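Basic flow

Basically, the architecture is as shown in the diagram below.

The smallest unit, a Step, is one processing pass of ItemReader (reads the source) -> ItemProcessor (processes) -> ItemWriter (writes the result).

Several Steps can be combined into one larger Job, and that's really all there is to it.

The start, end, success or failure of every Step, along with how many items were read, processed, and written, is recorded in the database, so you can also see at a glance how each scheduled run went.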

Below is the main application class.

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

import java.util.TimeZone;

@EnableAsync                // allow @Async methods
@EnableScheduling           // turn on @Scheduled methods such as jobRun()
@EnableJpaAuditing          // JPA auditing (@CreatedDate, @LastModifiedDate, ...)
@EnableBatchProcessing      // Spring Batch infrastructure: JobRepository, JobLauncher, builder factories
@SpringBootApplication
public class ImportApplication {
    public static void main(String[] args) {
        // Set the JVM default time zone before the Spring context starts
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
        SpringApplication.run(ImportApplication.class, args);
    }
}

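@EnableBatchProcessing
turns on Spring Batch and registers its infrastructure beans (JobRepository, JobLauncher, JobBuilderFactory, StepBuilderFactory, ...). If the Batch metadata tables don't exist yet, Spring Boot's batch auto-configuration creates them in your default DataSource; in Spring Boot 2 this happens by default only for embedded databases and is controlled by spring.batch.initialize-schema.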

Now for the code. The class below is a very basic job configuration.

package com.ps.batch.schedule.config;

import com.ps.batch.batch.serivce.ImportFinanceService;
import com.ps.batch.constant.ImportFinanceConstant;
import com.ps.batch.dto.batch.ImportFinanceDto;
import com.ps.batch.dto.batch.ImportUserDto;
import com.ps.batch.schedule.BatchJobCompletionListener;
import com.ps.batch.schedule.processor.ImportFinanceDtoProcessor;
import com.ps.batch.schedule.processor.ImportUserDtoProcessor;
import com.ps.batch.schedule.reader.ImportFinanceDtoReader;
import com.ps.batch.schedule.reader.ImportUserDtoReader;
import com.ps.batch.schedule.writer.ImportFinanceDtoWriter;
import com.ps.batch.schedule.writer.ImportUserDtoWriter;
import com.ps.batch.service.JobService;
import com.ps.batch.service.MemberTagService;
import com.ps.batch.service.NotifyService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.Scheduled;


/**
 * Member tagging job
 */

@Data
@Slf4j
@Configuration
public class BatchJobMemberTag {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private BatchJobCompletionListener batchJobCompletionListener;
    @Autowired
    private ImportFinanceService importFinanceService;
    @Autowired
    private MemberTagService memberTagService;

    private String jobName = "健身房 会员 打標 作業"; // "Gym member tagging job" (non-ASCII, hence the Base64 encoding below)

    @Value("${membertag.api.notify.mailto}")
    private String membertagApiNotifyMailto;

    @Autowired
    private JobService jobService;
    @Autowired
    private ImportFinanceDtoReader importFinanceDtoReader;
    @Autowired
    private ImportFinanceDtoProcessor importFinanceDtoProcessor;
    @Autowired
    private ImportFinanceDtoWriter importFinanceDtoWriter;
    @Autowired
    private ImportUserDtoReader importUserDtoReader;
    @Autowired
    private ImportUserDtoProcessor importUserDtoProcessor;
    @Autowired
    private ImportUserDtoWriter importUserDtoWriter;

    @Autowired
    private NotifyService notifyService;

    @Autowired
    @Qualifier("threadPoolTaskExecutor")
    private TaskExecutor taskExecutor;

    @Scheduled(initialDelay = 1 * 1000, fixedDelay = 10 * 60 * 1000)
    public void jobRun() {
        try {
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
            jobParametersBuilder.addLong("time", System.currentTimeMillis());
            jobParametersBuilder.addString("jobName", jobService.encodeToBase64Str(jobName));
            jobParametersBuilder.addString("mailTo", jobService.encodeToBase64Str(membertagApiNotifyMailto));
            JobParameters jobParameters = jobParametersBuilder.toJobParameters();
            jobService.runJob(exportMemberTag(), jobParameters, jobName);
        } catch (Exception e) {
            notifyService.notifyError("Abnormal termination notice", "Launch failed=" + e.getMessage(), jobName, null);
        }
    }

    public Job exportMemberTag() throws Exception {
        return jobBuilderFactory.get("exportTag")
                .incrementer(new RunIdIncrementer())
                .start(step_exportMemberTag())
                .next(step_exportImportFinanceDtoTag())
                .listener(batchJobCompletionListener)
                .build();
    }

    public Step step_exportImportFinanceDtoTag() throws Exception {
        return stepBuilderFactory.get("exportImportFinanceTag")
                .<ImportFinanceDto, ImportFinanceDto>chunk(100)
                .reader(importFinanceDtoReader.getUnTaggedImportFinance())
                .processor(importFinanceDtoProcessor.sendTag())
                .writer(importFinanceDtoWriter.updateTaggedWriter(ImportFinanceConstant.membertagged_tagged))
                .taskExecutor(taskExecutor)
                .throttleLimit(10)
                .build();
    }

    public Step step_exportMemberTag() throws Exception {
        return stepBuilderFactory.get("exportImportUserTag")
                .<ImportUserDto, ImportUserDto>chunk(100)
                .reader(importUserDtoReader.findUnTaggedImportUser())
                .processor(importUserDtoProcessor.sendTag())
                .writer(importUserDtoWriter.updateMembertaggedIsTrue())
                .taskExecutor(taskExecutor)
                .throttleLimit(10)
                .build();
    }
}

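Let's start with @Scheduled. It is a Spring Framework annotation (org.springframework.scheduling.annotation, switched on by @EnableScheduling), not a Spring Batch one. You can configure it with cron, or with initialDelay and fixedDelay as here: the first run starts 1 second after startup, and each later run starts 10 minutes after the previous one finished. Note that the job runs on the scheduler thread from start to finish: by default Spring uses a single scheduler thread, and the default JobLauncher runs jobs synchronously, so the next run only starts after the current one has completed.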
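To see the fixedDelay behavior without Spring, here is a rough plain-Java analogue. It assumes the default single-threaded scheduler and a synchronous JobLauncher, and uses the JDK's ScheduledExecutorService.scheduleWithFixedDelay, which, as far as I know, is what Spring's default scheduler uses underneath for fixedDelay tasks. FixedDelayDemo and recordRuns are made-up names, and Thread.sleep stands in for the job.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class FixedDelayDemo {
    /** Runs a fake job `runs` times with fixed-delay scheduling; returns {start, end} nanos per run. */
    static List<long[]> recordRuns(int runs, long jobMillis, long delayMillis) {
        List<long[]> timings = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(runs);
        // One scheduler thread, like the default scheduler behind @Scheduled
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            if (done.getCount() == 0) {
                return;                       // enough runs recorded
            }
            long start = System.nanoTime();
            try {
                Thread.sleep(jobMillis);      // stands in for a synchronous jobLauncher.run(...)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            timings.add(new long[] {start, System.nanoTime()});
            done.countDown();
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(timings);
    }

    public static void main(String[] args) {
        List<long[]> runs = recordRuns(3, 50, 20);
        for (int i = 1; i < runs.size(); i++) {
            long gapMillis = (runs.get(i)[0] - runs.get(i - 1)[1]) / 1_000_000;
            System.out.println("gap before run " + (i + 1) + ": " + gapMillis + " ms");
        }
    }
}
```

Each recorded start comes at least delayMillis after the previous run ended, so two runs of jobRun() never overlap.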

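When launching the job I build a JobParametersBuilder to pass in the parameters the job needs to access, such as the job name jobName and the notification recipients mailTo. The time parameter makes every launch's parameters unique: Spring Batch identifies a JobInstance by the job name plus its identifying parameters and refuses to rerun an instance that has already completed.
Note that Spring Batch stores all of these parameters in the batch_job_execution_params table. In my setup, values containing Chinese characters failed to insert and threw an exception (most likely the table's character set can't store them), so I Base64-encode them before passing them in.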

JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addLong("time", System.currentTimeMillis());
jobParametersBuilder.addString("jobName", jobService.encodeToBase64Str(jobName));
jobParametersBuilder.addString("mailTo", jobService.encodeToBase64Str(membertagApiNotifyMailto));
JobParameters jobParameters = jobParametersBuilder.toJobParameters();
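jobService.encodeToBase64Str itself isn't shown. A minimal version, assuming it simply Base64-encodes the UTF-8 bytes of the string, could look like this; the class name JobParamCodec and the decode helper are hypothetical. Decoding the jobName value from the sample table below yields a Chinese job name starting with 健身房 ("gym"), which fits the UTF-8 assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical stand-in for jobService.encodeToBase64Str (its code isn't shown in the post). */
class JobParamCodec {
    // Encode the UTF-8 bytes so only ASCII characters reach batch_job_execution_params
    static String encodeToBase64Str(String value) {
        return Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8));
    }

    // Reverse step, e.g. in a listener that reads jobParameters.getString("jobName")
    static String decodeFromBase64Str(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String stored = "5YGl6Lqr5oi/IOWinumHjyDkvJrlkZgg5ZCM5q2l"; // jobName value from the sample table
        System.out.println(decodeFromBase64Str(stored));
    }
}
```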

The rows in batch_job_execution_params look roughly like this:

JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | STRING_VAL                                               | DATE_VAL            | LONG_VAL      | DOUBLE_VAL | IDENTIFYING
128992           | LONG    | time     |                                                          | 1970-01-01 08:00:00 | 1521009519606 | 0          | Y
128992           | STRING  | jobName  | 5YGl6Lqr5oi/IOWinumHjyDkvJrlkZgg5ZCM5q2l                 | 1970-01-01 08:00:00 | 0             | 0          | Y
128993           | LONG    | time     |                                                          | 1970-01-01 08:00:00 | 1521009530611 | 0          | Y
128993           | STRING  | jobName  | 5YGl6Lqr5oi/IOWinumHjyDorqLljZUg5ZCM5q2lKDI05bCP5pmCKQ== | 1970-01-01 08:00:00 | 0             | 0          | Y

So what does my jobService do?

jobService.runJob(exportMemberTag(), jobParameters, jobName);

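It's essentially a master switch for launching jobs.
Deploying a new version is inevitable, so I added an on/off switch: before an upgrade, turn it off so no new jobs start, wait until the running jobs have finished, then shut down and replace the jar. That's the safer route; otherwise a job may be cut off in the middle of a transaction, and cleaning that up is a real pain XD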

public void runJob(Job job, JobParameters jobParameters, String jobName) {
    // Master switch: while batchEnable is false no new job is launched (used before deploying a new jar)
    if (BatchConstant.batchEnable) {
        try {
            jobLauncher.run(job, jobParameters);
        } catch (Exception e) {
            log.error("Failed to launch job {}", jobName, e);
            notifyService.notifyError("Job launch failure notice", e.getMessage(), jobName, null);
        }
    } else {
        log.warn("Job launch paused: {}", jobName);
    }
}
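BatchConstant.batchEnable isn't shown either. If it is a plain static field that gets flipped at runtime from another thread (say, an admin endpoint called before a deploy), it should at least be volatile. The sketch below is one possible thread-safe version with hypothetical names (JobGate, tryRun, runningJobs); it also counts running jobs, so a deploy script can pause launching and then wait until nothing is running.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical thread-safe launch switch; not the post's BatchConstant, whose code isn't shown. */
class JobGate {
    private final AtomicBoolean enabled = new AtomicBoolean(true);
    private final AtomicInteger running = new AtomicInteger();

    void pause()  { enabled.set(false); }   // e.g. called before deploying a new jar
    void resume() { enabled.set(true); }

    /** Jobs started through the gate that have not finished yet. */
    int runningJobs() { return running.get(); }

    /** Runs the job only while launching is enabled; returns false if it was skipped. */
    boolean tryRun(Runnable job) {
        // Count first, then check: once pause() returns, any job that may still run is already counted
        running.incrementAndGet();
        try {
            if (!enabled.get()) {
                return false;                 // the post logs "Job launch paused" here
            }
            job.run();                        // the post calls jobLauncher.run(job, jobParameters) here
            return true;
        } finally {
            running.decrementAndGet();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        JobGate gate = new JobGate();
        gate.tryRun(() -> System.out.println("job ran"));
        gate.pause();
        System.out.println("launched after pause? " + gate.tryRun(() -> System.out.println("should not run")));
        while (gate.runningJobs() > 0) {
            Thread.sleep(100);                // wait for in-flight jobs, then shut down and swap the jar
        }
    }
}
```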

Composing a Job

Next, let's look at how a job is put together.

public Job exportMemberTag() throws Exception {
    return jobBuilderFactory.get("exportTag")
            .incrementer(new RunIdIncrementer())
            .start(step_exportMemberTag())
            .next(step_exportImportFinanceDtoTag())
            .listener(batchJobCompletionListener)
            .build();
}

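The factory object jobBuilderFactory assembles the job: start() sets the first Step, next() appends the following one (steps run in that order), and listener() registers batchJobCompletionListener, which is notified before and after the job runs. RunIdIncrementer is only applied when the job is launched through something that uses the incrementer, such as Spring Boot's startup job runner or JobOperator.startNextInstance(); jobLauncher.run() ignores it, so here it is the time parameter that makes each run a new JobInstance.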
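A toy plain-Java model of that sequencing shows the two rules that matter here. MiniJob is made up and skips everything a real JobRepository does, but it follows the same behavior: in a job built with start().next(), steps run in order, the first failed step ends the job as FAILED without running the rest, and the job listener is still notified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/** Toy model of a sequential job built as start(step).next(step).listener(l); not Spring Batch code. */
class MiniJob {
    private final List<BooleanSupplier> steps = new ArrayList<>();   // each step returns true on success
    private Consumer<String> afterJob = status -> { };

    static MiniJob start(BooleanSupplier firstStep) {
        MiniJob job = new MiniJob();
        job.steps.add(firstStep);
        return job;
    }

    MiniJob next(BooleanSupplier step) {
        steps.add(step);
        return this;
    }

    MiniJob listener(Consumer<String> afterJob) {
        this.afterJob = afterJob;
        return this;
    }

    /** Runs the steps in order; the first failing step ends the job as FAILED. */
    String run() {
        String status = "COMPLETED";
        for (BooleanSupplier step : steps) {
            boolean ok;
            try {
                ok = step.getAsBoolean();
            } catch (RuntimeException e) {
                ok = false;                  // an exception counts as a failed step
            }
            if (!ok) {
                status = "FAILED";
                break;                       // remaining steps are not executed
            }
        }
        afterJob.accept(status);             // the listener is notified whether or not the job failed
        return status;
    }

    public static void main(String[] args) {
        String status = MiniJob.start(() -> { System.out.println("exportImportUserTag"); return true; })
                .next(() -> { System.out.println("exportImportFinanceTag"); return true; })
                .listener(s -> System.out.println("afterJob " + s))
                .run();
        System.out.println(status);
    }
}
```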

Composing a Step

Now let's look at how a Step is composed, using step_exportImportFinanceDtoTag (the job's second step) as the example.

public Step step_exportImportFinanceDtoTag() throws Exception {
    return stepBuilderFactory.get("exportImportFinanceTag")
            .<ImportFinanceDto, ImportFinanceDto>chunk(100)
            .reader(importFinanceDtoReader.getUnTaggedImportFinance())
            .processor(importFinanceDtoProcessor.sendTag())
            .writer(importFinanceDtoWriter.updateTaggedWriter(ImportFinanceConstant.membertagged_tagged))
            .taskExecutor(taskExecutor)
            .throttleLimit(10)
            .build();
}

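It is likewise assembled through stepBuilderFactory. chunk(100) sets the chunk size, i.e. the commit interval: the step reads 100 items from the reader, passes each one through the processor, and hands the results to the writer as one list, all inside one transaction.
The generic types in front of chunk, <ImportFinanceDto, ImportFinanceDto>, are the input type (returned by the reader, received by the processor) and the output type (returned by the processor, received by the writer).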
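The chunk cycle can be modeled in a few lines of plain Java. This is a simplified, single-threaded sketch (ChunkLoop is a made-up name; no transactions, retries, or skips): read up to chunkSize items, run each through the processor, drop the ones mapped to null, and write the rest in one call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Single-threaded toy model of a chunk-oriented step; Spring Batch's real implementation does much more. */
class ChunkLoop {
    /** Returns the number of chunks; each chunk = read up to chunkSize, process each, write once. */
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        int chunks = 0;
        while (reader.hasNext()) {
            // 1. read: collect up to chunkSize input items
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // 2. process: null means "filtered", the item never reaches the writer
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                O out = processor.apply(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // 3. write: the whole chunk at once (in Spring Batch, inside one transaction);
            //    this model skips the call when everything was filtered
            if (!outputs.isEmpty()) {
                writer.accept(outputs);
            }
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5);
        Function<Integer, Integer> evenTimesTen = x -> x % 2 == 0 ? x * 10 : null;
        run(rows.iterator(), evenTimesTen, chunk -> System.out.println("write " + chunk), 2);
    }
}
```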

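taskExecutor sets which thread pool the step runs on. You can skip it for now, since using it requires some other configuration as well.
throttleLimit caps how many chunks are processed concurrently (the default is 4). Each thread handles a whole chunk (read, process, write), so with 10 up to 10 chunks of 100 items each run in parallel, provided the pool has at least 10 threads. It only has an effect together with taskExecutor. Keep in mind that all threads share the same reader, so the reader must be thread-safe (see the next sections).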
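A rough model of what taskExecutor plus throttleLimit do: each task is a whole chunk, a semaphore caps the chunks in flight, and the pool size caps them as well. ThrottleDemo is hypothetical, and Spring Batch implements the limit differently internally; only the resulting bound is the point here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of taskExecutor + throttleLimit: whole chunks run on a pool, at most throttleLimit at once. */
class ThrottleDemo {
    /** Runs `chunks` fake chunks and returns the highest number that were ever in flight together. */
    static int maxConcurrentChunks(int chunks, int poolSize, int throttleLimit) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);   // plays threadPoolTaskExecutor
        Semaphore throttle = new Semaphore(throttleLimit);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int c = 0; c < chunks; c++) {
                throttle.acquire();                          // wait while throttleLimit chunks are running
                futures.add(pool.submit(() -> {
                    int now = inFlight.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(20);                    // stands in for read-process-write of one chunk
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        throttle.release();
                    }
                }));
            }
            for (Future<?> f : futures) {
                f.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("pool 16, throttleLimit 10 -> max in flight " + maxConcurrentChunks(30, 16, 10));
        System.out.println("pool 4, throttleLimit 10 -> max in flight " + maxConcurrentChunks(30, 4, 10));
    }
}
```

With a 4-thread pool the effective limit is 4, however high throttleLimit is set.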

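Writing an ItemReader

Next, let's look at how to write a reader. This one uses JdbcCursorItemReader, which streams the query result through a single open cursor, plus a RowMapper that turns each row into an ImportFinanceDto.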

public ItemReader<ImportFinanceDto> getUnTaggedImportFinance() {
    JdbcCursorItemReader<ImportFinanceDto> reader = new JdbcCursorItemReader<ImportFinanceDto>();
    reader.setSql("select * from importfinance where membertagged = 0 order by orgcreatetime ");
    reader.setDataSource(batchDataSource);
    reader.setRowMapper(this.getImportFinanceDtoRowMapper());
    return reader;
}

private RowMapper<ImportFinanceDto> getImportFinanceDtoRowMapper() {
    return new RowMapper<ImportFinanceDto>() {
        @Override
        public ImportFinanceDto mapRow(ResultSet resultSet, int i) throws SQLException {
            if (!(resultSet.isAfterLast()) && !(resultSet.isBeforeFirst())) {
                ImportFinanceDto importFinanceDto = new ImportFinanceDto();
                importFinanceDto.setSerid(resultSet.getLong("serid"));
                importFinanceDto.setPlatform(resultSet.getString("platform"));
                importFinanceDto.setPpk(resultSet.getString("ppk"));
                importFinanceDto.setRulecodes(resultSet.getString("rulecodes"));
                importFinanceDto.setSaleamount(resultSet.getDouble("saleamount"));
                importFinanceDto.setTradeamount(resultSet.getDouble("tradeamount"));
                importFinanceDto.setSource(resultSet.getLong("source"));
                importFinanceDto.setSourcetype(resultSet.getInt("sourcetype"));
                importFinanceDto.setOuterid(resultSet.getString("outerid"));
                importFinanceDto.setOutertype(resultSet.getInt("outertype"));
                importFinanceDto.setTerminusUserId(resultSet.getLong("terminususerid"));
                importFinanceDto.setExported(resultSet.getInt("exported"));
                importFinanceDto.setCardtype(resultSet.getString("cardtype"));
                importFinanceDto.setOrgcreatetime(resultSet.getLong("orgcreatetime"));
                importFinanceDto.setMembertagged(resultSet.getInt("membertagged"));
                importFinanceDto.setCreateddate(resultSet.getDate("createddate"));
                importFinanceDto.setLastmodifieddate(resultSet.getDate("lastmodifieddate"));
                return importFinanceDto;
            } else {
                log.info("Returning null from rowMapper");
                return null;
            }
        }
    };
}

ItemReader on MultiThreading

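If you need to read with multiple threads, the simplest approach is to make read() synchronized, as below. JdbcCursorItemReader is not thread-safe because all threads share one cursor. Synchronizing only serializes the reads; processing and writing still run in parallel. Spring Batch also provides SynchronizedItemStreamReader, a wrapper that does the same thing.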

public ItemReader<ImportFinanceDto> getUnExportImportFinance() {
    JdbcCursorItemReader<ImportFinanceDto> reader = new JdbcCursorItemReader<ImportFinanceDto>() {
        // JdbcCursorItemReader is not thread-safe: only one thread at a time may move the cursor
        @Override
        public synchronized ImportFinanceDto read() throws Exception {
            return super.read();
        }
    };
    reader.setSql("select * from importfinance where exported = 0 order by orgcreatetime ");
    reader.setDataSource(batchDataSource);
    reader.setRowMapper(this.getImportFinanceDtoRowMapper());
    // With several threads the saved read count can't be used for a restart, so don't save it
    reader.setSaveState(false);
    return reader;
}
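Why the synchronized override matters can be seen with a toy reader (CursorReader and the other classes are made up): its read() is a check-then-advance on shared state, much like moving a shared ResultSet cursor. The synchronized subclass hands every row out exactly once, no matter how many threads call it.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;

/** Stand-in for a cursor-based reader: read() is an unsynchronized check-then-advance. */
class CursorReader {
    private final int total;
    private int position;                 // shared by all threads, like one open ResultSet

    CursorReader(int total) { this.total = total; }

    Integer read() {
        if (position >= total) {
            return null;                  // null = end of input, as with ItemReader
        }
        return position++;                // racy: two threads can get the same row, or an index past the end
    }
}

/** The same fix as in the post: a subclass whose read() is synchronized. */
class SynchronizedCursorReader extends CursorReader {
    SynchronizedCursorReader(int total) { super(total); }

    @Override
    synchronized Integer read() { return super.read(); }
}

class ReaderDemo {
    /** Drains the reader with several threads; returns how often each row index was handed out. */
    static int[] readCounts(CursorReader reader, int total, int threads) {
        AtomicIntegerArray counts = new AtomicIntegerArray(total);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (Integer row = reader.read(); row != null; row = reader.read()) {
                    counts.incrementAndGet(row);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("readers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int[] result = new int[total];
        for (int i = 0; i < total; i++) {
            result[i] = counts.get(i);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] counts = readCounts(new SynchronizedCursorReader(100_000), 100_000, 8);
        System.out.println("every row read exactly once: " + Arrays.stream(counts).allMatch(c -> c == 1));
    }
}
```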

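ItemReader with JdbcPaging

If you want to read more efficiently, switch to JdbcPagingItemReader, which reads the table one page at a time. Each page is a separate query ordered by the sort key, so no cursor stays open for the whole step, and the reader is thread-safe (combine it with setSaveState(false) in a multi-threaded step). The sort key must be unique, because each page continues after the last key of the previous one. Here is another example: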

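public ItemReader<ImportUserDto> findNewImportUserThreadSafe() throws Exception {
    // Paged query: select * from importusers where isnew = :isnew order by serid
    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("select * ");
    queryProvider.setFromClause("from importusers");
    queryProvider.setWhereClause("isnew = :isnew");
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("serid", Order.ASCENDING);   // unique sort key, used to continue from page to page
    queryProvider.setSortKeys(sortKeys);

    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("isnew", "1");

    JdbcPagingItemReader<ImportUserDto> jdbcPagingItemReader = new JdbcPagingItemReader<>();
    jdbcPagingItemReader.setDataSource(batchDataSource);
    jdbcPagingItemReader.setFetchSize(10);
    jdbcPagingItemReader.setPageSize(100);
    jdbcPagingItemReader.setQueryProvider(queryProvider);
    jdbcPagingItemReader.setParameterValues(parameterValues);
    jdbcPagingItemReader.setRowMapper(this.getImportUserDtoRowMapper());
    // Several threads share this reader, so don't save restart state
    jdbcPagingItemReader.setSaveState(false);
    // Required: afterPropertiesSet() creates the internal JdbcTemplate; without it there is
    // no JdbcTemplate to query with. Let failures propagate instead of returning a broken reader.
    jdbcPagingItemReader.afterPropertiesSet();
    return jdbcPagingItemReader;
}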
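JdbcPagingItemReader does not page with OFFSET: after the first page it asks for rows whose sort key is greater than the last key it has seen. The in-memory model below shows that approach (KeysetPaging is a made-up name, and the "table" is just a list of serid values). One consequence worth knowing: rows that stop matching the where clause, for example because the writer has already updated isnew, do not shift the later pages, which would happen with offset-based paging.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * In-memory model of sort-key paging. Each page is the equivalent of
 *   select ... where serid > :lastSerid order by serid limit :pageSize
 * (the first page has no lower bound).
 */
class KeysetPaging {
    static List<List<Long>> readAllPages(List<Long> serids, int pageSize) {
        List<List<Long>> pages = new ArrayList<>();
        Long lastKey = null;
        while (true) {
            final Long bound = lastKey;
            List<Long> page = serids.stream()
                    .filter(id -> bound == null || id > bound)   // where serid > :lastSerid
                    .sorted()                                     // order by serid
                    .limit(pageSize)                              // limit :pageSize
                    .collect(Collectors.toList());
            if (page.isEmpty()) {
                return pages;
            }
            pages.add(page);
            lastKey = page.get(page.size() - 1);                  // next page starts after this key
        }
    }

    public static void main(String[] args) {
        System.out.println(readAllPages(Arrays.asList(5L, 1L, 9L, 3L, 7L, 2L, 8L), 3));
        // prints [[1, 2, 3], [5, 7, 8], [9]]
    }
}
```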

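Writing a processor

Next comes the processor, the step in the middle. Here you can filter items or convert them into another type for the writer. Returning null filters an item out: it is not passed to the writer and is counted in the step's filter count.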

public ItemProcessor<ImportFinanceDto, ImportFinanceDto> sendTag() {
    return new ItemProcessor<ImportFinanceDto, ImportFinanceDto>() {
        @Override
        public ImportFinanceDto process(ImportFinanceDto importFinanceDto) throws Exception {
            try {
                // Create the tag in the member tagging system
                memberTagService.tagCreate(importFinanceDto);
                // Success: pass the item on to the writer
                return importFinanceDto;
            } catch (Exception e) {
                // Returning null filters the item: it isn't written, so it keeps membertagged = 0
                // and is picked up again by the next run
                log.error("Unknown error in the member tagging system", e);
                bacthJobEvents.appendError("Unknown error in the member tagging system: " + e.getMessage());
                return null;
            }
        }
    };
}

Writing an ItemWriter

Finally, the writer receives the processed items and updates the database.

public ItemWriter<ImportFinanceDto> updateTaggedWriter(Integer membertagged) {
    JdbcBatchItemWriter<ImportFinanceDto> databaseItemWriter = new JdbcBatchItemWriter<ImportFinanceDto>();
    databaseItemWriter.setDataSource(batchDataSource);
    databaseItemWriter.setSql("update importfinance set membertagged = ?, lastmodifieddate = ? where serid = ? ");
    databaseItemWriter.setItemPreparedStatementSetter(updateTaggedWriterSetter(membertagged));
    return databaseItemWriter;
}

private ItemPreparedStatementSetter<ImportFinanceDto> updateTaggedWriterSetter(Integer membertagged) {
    return (importFinance, ps) -> {
        ps.setInt(1, membertagged);
        ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
        ps.setLong(3, importFinance.getSerid());
    };
}
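JdbcBatchItemWriter sends the statements for a whole chunk as one JDBC batch, and with assertUpdates left at its default of true it fails the chunk (with EmptyResultDataAccessException) if any statement updated no rows. The plain-Java model below imitates that for updateTaggedWriter; BatchUpdateModel and its map-based "table" are made up, and there is no real database or transaction.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of one JdbcBatchItemWriter write for updateTaggedWriter; no real JDBC involved. */
class BatchUpdateModel {
    /** serid -> membertagged, standing in for the importfinance table. */
    final Map<Long, Integer> importfinance = new HashMap<>();

    /** One "statement" per item, like addBatch(); returns each row count, like executeBatch(). */
    int[] executeBatch(List<Long> serids, int membertagged) {
        int[] updateCounts = new int[serids.size()];
        for (int i = 0; i < serids.size(); i++) {
            // update importfinance set membertagged = ? where serid = ?
            Integer updated = importfinance.computeIfPresent(serids.get(i), (serid, old) -> membertagged);
            updateCounts[i] = updated == null ? 0 : 1;
        }
        return updateCounts;
    }

    /** Mirrors assertUpdates = true: a statement that matched no row fails the whole chunk. */
    void write(List<Long> serids, int membertagged) {
        int[] counts = executeBatch(serids, membertagged);
        for (int i = 0; i < counts.length; i++) {
            if (counts[i] == 0) {
                // The real writer throws and the chunk transaction is rolled back;
                // this model has no transaction, so earlier updates stay applied
                throw new IllegalStateException("serid " + serids.get(i) + " updated no rows");
            }
        }
    }

    public static void main(String[] args) {
        BatchUpdateModel db = new BatchUpdateModel();
        db.importfinance.put(1L, 0);
        db.importfinance.put(2L, 0);
        db.write(Arrays.asList(1L, 2L), 1);
        System.out.println(db.importfinance);
    }
}
```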

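Writing an ItemWriter with a DTO

The writer above binds parameters positionally, PreparedStatement style, and with many columns it's easy to lose track of which ? is which. Instead you can use named parameters prefixed with a colon, as in Hibernate queries, and let InLeftDto supply the values directly (note that REPLACE INTO is MySQL syntax):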

public ItemWriter<InLeftDto> saveInLeft() {
    JdbcBatchItemWriter<InLeftDto> databaseItemWriter = new JdbcBatchItemWriter<InLeftDto>();
    // Named-parameter template, so the SQL can use :vcCode instead of ?
    NamedParameterJdbcTemplate jdbcTemplate = new NamedParameterJdbcTemplate(erpDataSource);
    databaseItemWriter.setJdbcTemplate(jdbcTemplate);
    databaseItemWriter.setSql("REPLACE into T_INLEFT ( VC_CODE, VC_CLUB) values ( :vcCode, :vcClub)");
    // :vcCode / :vcClub are filled from InLeftDto's vcCode / vcClub bean properties
    ItemSqlParameterSourceProvider<InLeftDto> paramProvider =
            new BeanPropertyItemSqlParameterSourceProvider<InLeftDto>();
    databaseItemWriter.setItemSqlParameterSourceProvider(paramProvider);
    databaseItemWriter.afterPropertiesSet();
    return databaseItemWriter;
}
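To see what the named parameters buy you, here is a toy binder: each :name is replaced by ? and its value is read from the matching JavaBean getter, so :vcCode is filled from getVcCode(). NamedParams and its InLeftDto stand-in are hypothetical; Spring's NamedParameterJdbcTemplate and BeanPropertyItemSqlParameterSourceProvider are far more complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy named-parameter binder: ":vcCode" becomes "?" and its value comes from getVcCode(). */
class NamedParams {
    private static final Pattern PARAM = Pattern.compile(":(\\w+)");

    /** Returns positional SQL and appends the bean values to `values` in placeholder order. */
    static String bind(String sql, Object bean, List<Object> values) {
        Matcher m = PARAM.matcher(sql);
        StringBuffer positional = new StringBuffer();
        while (m.find()) {
            values.add(readProperty(bean, m.group(1)));
            m.appendReplacement(positional, "?");
        }
        m.appendTail(positional);
        return positional.toString();
    }

    /** Calls the JavaBean getter for `name`, e.g. "vcCode" -> getVcCode(). */
    private static Object readProperty(Object bean, String name) {
        String getter = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            return bean.getClass().getMethod(getter).invoke(bean);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("No readable property '" + name + "'", e);
        }
    }

    /** Illustrative DTO shaped like InLeftDto; the real class is not shown in the post. */
    public static class InLeftDto {
        private final String vcCode;
        private final String vcClub;

        public InLeftDto(String vcCode, String vcClub) {
            this.vcCode = vcCode;
            this.vcClub = vcClub;
        }

        public String getVcCode() { return vcCode; }
        public String getVcClub() { return vcClub; }
    }

    public static void main(String[] args) {
        List<Object> values = new ArrayList<>();
        String sql = bind("REPLACE into T_INLEFT ( VC_CODE, VC_CLUB) values ( :vcCode, :vcClub)",
                new InLeftDto("A001", "CLUB01"), values);
        System.out.println(sql + "  " + values);
    }
}
```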

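And that's a basic batch job from start to finish.

Chaining processors

If you have several processing stages and chain them like below, only the last one is used, because each processor() call replaces the processor set before it: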

public Step step_findRecentlyCreatedCustomer() throws Exception {
    return stepBuilderFactory.get("findRecentlyCreatedCustomer")
            .<CustomerDto, ImportUserDto>chunk(10)
            .reader(customerDtoReader.findRecentlyCreatedCustomerDto())
            .processor(customerDtoProcessor.filterExistsImportUser())
            .processor(terminusProcessor.createMemberByCustomerDto())   // replaces the processor set above
            .writer(importUserDtoWriter.createImportUser())
            .build();
}

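Instead, combine them with a CompositeItemProcessor. Its delegates run in order, each receiving the previous one's output; if one returns null, the item is filtered and the remaining delegates are skipped: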

public CompositeItemProcessor<CustomerDto, ImportUserDto> findRecentlyCreatedCustomerProcessor() throws Exception {
    CompositeItemProcessor<CustomerDto, ImportUserDto> compositeItemProcessor = new CompositeItemProcessor<>();
    compositeItemProcessor.setDelegates(Arrays.asList(
            customerDtoProcessor.filterExistsImportUser(),
            terminusProcessor.createMemberByCustomerDto()
    ));
    return compositeItemProcessor;
}
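The chaining rule can be sketched in plain Java: delegates run in order, each receiving the previous output, and a null stops the chain, so a filtered customer never reaches the second processor. CompositeProcessor and the two lambdas are made-up stand-ins for CompositeItemProcessor, filterExistsImportUser(), and createMemberByCustomerDto().

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Toy model of CompositeItemProcessor's chaining rule; not Spring Batch's class. */
class CompositeProcessor<I, O> {
    private final List<Function<Object, Object>> delegates;

    CompositeProcessor(List<Function<Object, Object>> delegates) {
        this.delegates = delegates;
    }

    /** Feeds each delegate the previous result; a null result filters the item and stops the chain. */
    @SuppressWarnings("unchecked")
    O process(I item) {
        Object result = item;
        for (Function<Object, Object> delegate : delegates) {
            if (result == null) {
                return null;
            }
            result = delegate.apply(result);
        }
        return (O) result;
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for filterExistsImportUser() and createMemberByCustomerDto()
        Function<Object, Object> filterExisting = customer -> "existing".equals(customer) ? null : customer;
        Function<Object, Object> createMember = customer -> "member:" + customer;
        CompositeProcessor<String, String> chain =
                new CompositeProcessor<>(Arrays.asList(filterExisting, createMember));
        System.out.println(chain.process("alice"));     // member:alice
        System.out.println(chain.process("existing"));  // null (filtered, createMember never runs)
    }
}
```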

Then you can plug it into the step:

public Step step_findRecentlyCreatedCustomer() throws Exception {
    return stepBuilderFactory.get("findRecentlyCreatedCustomer")
            .<CustomerDto, ImportUserDto>chunk(10)
            .reader(customerDtoReader.findRecentlyCreatedCustomerDto(ShenyangGymConstant.platform))
            .processor(findRecentlyCreatedCustomerProcessor())
            .writer(importUserDtoWriter.createImportUser())
            .build();
}