4 months ago

JobScope JobExecutionListener Tasklet

說明一下 怎麼使用這三個東西

JobScope 可跨 Step 做資料交換
JobScope 遇到 ThreadPool 的問題
JobExecutionListener 監控任務的結束
Tasklet 非批次動作

JobScope

先說 JobScope 顧名思義 就是他可以跨 Step 做一個溝通或是暫存的手段, 這是很方便的功能
我是利用它來做一個 搜集所有 Step 的錯誤訊息, 或是把資料暫時放在記憶體中跨到下一個 Step 來使用

説先說一下怎麼做錯誤的收集 程式如下

import lombok.Data;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.stereotype.Component;

@Data
@Component
@JobScope
public class BacthJobEvents {
    private Boolean notifyEnable = Boolean.FALSE;
    private Boolean hasError = Boolean.FALSE;
    private StringBuilder events = new StringBuilder();

    public void append(String message) {
        this.events.append(message + "\r\n");
    }
    public void appendError(String message) {
        this.notifyEnable = Boolean.TRUE;
        this.hasError = Boolean.TRUE;
        this.append(message);
    }
}

並沒有很複雜, 就是提供一個訊息的輸入口跟紀錄用的 StringBuilder, 這樣一來在任何 Step 中都可以使用

實際上使用方式, 就可以把訊息吐到 BacthJobEvents 物件內, 之後在 Listener 查看是否有錯誤訊息來決定後續是否要通知相關人員

@Autowired
private BacthJobEvents bacthJobEvents;

public ItemProcessor<ImportFinanceDto, ImportFinanceDto> sendTag() {
    return new ItemProcessor<ImportFinanceDto, ImportFinanceDto>() {
        @Override
        public ImportFinanceDto process(ImportFinanceDto importFinanceDto) throws Exception {
            ImportFinanceDto return_obj = null;
            MemberTagCreateResult memberTagCreateResult = null;
            try {
                // 先建檔

                memberTagCreateResult = memberTagService.tagCreate(importFinanceDto);
                // 都成功了就要回傳

                return_obj = importFinanceDto;
            } catch (BaseException e) {
                log.error("打標系統發生錯誤 錯誤訊息 BaseException", e);
                bacthJobEvents.appendError("打標系統發生錯誤 錯誤訊息 " + e.getMessage());
            } catch (Exception e) {
                log.error("打標系統發生未知錯誤 錯誤訊息 UnknowException", e);
                bacthJobEvents.appendError("打標系統發生未知錯誤 錯誤訊息 " + e.getMessage());
            }
            return return_obj;
        }
    };
}

但是用這個 JobScope 要注意一下是否有配置 TaskExecutor 來執行, 如果有的話 請照下面程式做設定, 你的 TaskExecutor 必須 註冊 JobExecution , 不然 當 Thread 換回 主 Thread 的時候, 他會沒有 Jobscope 的 Bean 而造成錯誤

@Bean(name = "threadPoolTaskExecutor-ERP")
public TaskExecutor threadPoolTaskExecutorERP() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
        @Override
        public void execute(Runnable task) {
            if (JobSynchronizationManager.getContext() != null) {
                // 须作此注册, 不然切换回主 Thread 的时候会丧失 Jobscope 的 Bean 造成错误

                final JobExecution jobExecution = JobSynchronizationManager.getContext().getJobExecution();
                super.execute(() -> {
                    JobSynchronizationManager.register(jobExecution);
                    try {
                        task.run();
                    } finally {
                        JobSynchronizationManager.close();
                    }
                });
            } else {
                // 如果是一般執行緒就不會有Context

                super.execute(() -> task.run());
            }
        }
    };
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(20);
    executor.setKeepAliveSeconds(30);
    executor.setQueueCapacity(1000);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("ERP-task");
    executor.initialize();
    return executor;
}

這個 JobScope 的另一種用法就是當作 Step 跟 Step 的資料傳遞
首先定義 JobScope 物件

import com.ps.batch.dto.batch.ImportUserDto;
import lombok.Data;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@Data
@Component
@JobScope
public class BatchJobImportUserDtoTemp {
    private List<ImportUserDto> importUserDtoList = new ArrayList<>();
}

之後 使用的時候 在 writer 可以儲存到 BatchJobImportUserDtoTemp , 有用 多執行緒的記得加 synchronized

@Autowired
private BatchJobImportUserDtoTemp batchJobImportUserDtoTemp;

public ItemWriter<ImportUserDto> saveToMemory() {
    return new ItemWriter<ImportUserDto>() {
        private Long jobId;

        @BeforeStep
        public void getInterstepData(StepExecution stepExecution) {
            JobExecution jobExecution = stepExecution.getJobExecution();
            this.jobId = jobExecution.getJobId();
        }

        @Override
        public synchronized void write(List<? extends ImportUserDto> list) throws Exception {
            log.debug("jobid {}", jobId);
            log.debug("List 儲存至記憶體 {}", list);
            batchJobImportUserDtoTemp.getImportUserDtoList().addAll(list);
        }
    };
}

在下一個 Step 的時候就可以在 reader 把他讀出來

@Autowired
private BatchJobImportUserDtoTemp batchJobImportUserDtoTemp;

public ItemReader<ImportUserDto> getFromMemory() {
    return new ItemReader<ImportUserDto>() {
        private int nextIndex = 0;

        @Override
        public synchronized ImportUserDto read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
            ImportUserDto importUser = null;
            if (nextIndex < batchJobImportUserDtoTemp.getImportUserDtoList().size()) {
                importUser = batchJobImportUserDtoTemp.getImportUserDtoList().get(nextIndex);
                log.debug("從記憶體讀出 nextIndex={}, ImportUserDto={}", nextIndex, importUser);
                nextIndex++;
            }
            return importUser;
        }
    };
}

兩個 Step 組合起來就會像是這樣

public Job exportNewCustomerFinance() throws Exception {
    return jobBuilderFactory.get("exportNewCustomerFinance")
            .incrementer(new RunIdIncrementer())
            .start(step_findNewImportUser())
            .next(step_findFinanceByImportUser())
            .listener(batchJobCompletionListener)
            .build();
}

public Step step_findNewImportUser() {
    return stepBuilderFactory.get("findNewImportUser")
            .<ImportUserDto, ImportUserDto>chunk(100)
            .reader(importUserDtoReader.findNewImportUserThreadSafe())
            .processor(importUserDtoProcessor.filterIsFreezing())
            .writer(importUserDtoWriter.saveToMemory())
            .taskExecutor(taskExecutor)
            .throttleLimit(20)
            .build();
}

public Step step_findFinanceByImportUser() {
    return stepBuilderFactory.get("findFinanceByImportUser")
            .<ImportUserDto, List<FinanceDto>>chunk(100)
            .reader(importUserDtoReader.getFromMemory())
            .processor(importUserDtoProcessor.findFinanceDtoListByImportUser())
            .writer(financeDtoWriter.saveListToMemory())
            .taskExecutor(taskExecutor)
            .throttleLimit(50)
            .build();
}

JobExecutionListener

先說 JobExecutionListener 顧名思義就是監聽 Job 的事件或結果, 然後我再結合 上面 @JobScope BacthJobEvents 可以取得在整個 Job 中是否有錯誤訊息需要處理
完整程式如下

import com.ps.batch.service.JobService;
import com.ps.batch.service.NotifyService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.List;

@Slf4j
@Component
public class BatchJobCompletionListener extends JobExecutionListenerSupport {
    @Autowired
    private BacthJobEvents bacthJobEvents;
    @Autowired
    private NotifyService notifyService;
    @Autowired
    private JobService jobService;

    /**
     * 任務結束後判斷結果跟決定是否通知
     *
     * @param jobExecution
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        JobParameters jobParameters = jobExecution.getJobParameters();
        String jobName = this.getJobName(jobParameters);
        String mailTo = this.getMailTo(jobParameters);
        if (jobExecution.getStatus() == BatchStatus.FAILED) {
            notifyService.notifyError("異常終止通知", this.createMessageContent(jobExecution), jobName, mailTo);
        }
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {

        }
        if (bacthJobEvents.getNotifyEnable() == true) {
            if (bacthJobEvents.getHasError()) {
                notifyService.notifyError("錯誤結果通知", bacthJobEvents.getEvents().toString(), jobName, mailTo);
            } else {
                notifyService.notifyGood("結果通知", bacthJobEvents.getEvents().toString(), jobName, mailTo);
            }
        }
    }

    /**
     * 錯誤訊息的組合
     *
     * @param jobExecution
     * @return
     */
    private String createMessageContent(JobExecution jobExecution) {
        List<Throwable> exceptions = jobExecution.getAllFailureExceptions();
        StringBuilder content = new StringBuilder();
        content.append("Job execution #");
        content.append(jobExecution.getId());
        content.append(" of job instance #");
        content.append(jobExecution.getJobInstance().getId());
        content.append(" failed with following exceptions:");
        for (Throwable exception : exceptions) {
            content.append("");
            content.append(formatExceptionMessage(exception));
        }
        return content.toString();
    }

    /**
     * 讀出完整錯誤訊息
     *
     * @param exception
     * @return
     */
    private String formatExceptionMessage(Throwable exception) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        exception.printStackTrace(new PrintStream(baos));
        return baos.toString();
    }

    /**
     * 取出儲存在 JobParameters 內的任務名稱
     *
     * @param jobParameters
     * @return
     */
    private String getJobName(JobParameters jobParameters) {
        String jobName = null;
        try {
            if (StringUtils.hasText(jobParameters.getString("jobName"))) {
                jobName = jobService.decodeFromBase64Str(jobParameters.getString("jobName"));
            }
        } catch (Exception e) {
            log.error("", e);
        }
        return jobName;
    }

    /**
     * 取得是否指定通知的信箱
     *
     * @param jobParameters
     * @return
     */
    private String getMailTo(JobParameters jobParameters) {
        String mailTo = null;
        try {
            if (StringUtils.hasText(jobParameters.getString("mailTo"))) {
                mailTo = jobService.decodeFromBase64Str(jobParameters.getString("mailTo"));
            }
        } catch (Exception e) {
            log.error("", e);
        }
        return mailTo;
    }
}

透過 Autowired 來取的 Job 生命週期中的 BacthJobEvents 查看是否有錯誤訊息的事件

而使用的時候在 jobBuilderFactory 去配置 listener 就可以了

@Autowired
private BatchJobCompletionListener batchJobCompletionListener;

public Job exportMemberTag() throws Exception {
    return jobBuilderFactory.get("exportTag")
            .incrementer(new RunIdIncrementer())
            .start(step_exportMemberTag())
            .next(step_exportImportFinanceDtoTag())
            .listener(batchJobCompletionListener)
            .build();

Tasklet

接下來說 Tasklet

Tasklet 概念就不像那些 ItemReader 有一批一批的概念, 而是一個步驟你一次做到完就結束

舉個例子來說, 我需要在最後一個步驟把所有資料排序跟去除重複的相同用戶ID

@Autowired
private JobScopeImportFinanceDto jobScopeImportFinanceDto;

/**
 * 將記憶體的資料按照時間先後順序排列,並只取時間排序第一筆
 * 這功能只會用單一 Thread 執行所以簡單鎖就可以
 *
 * @return
 */
public Tasklet taskletSelectTopOneByUserid() {
    return new Tasklet() {
        private List<Long> useridList = new ArrayList<>();
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext context) {
            // 這一批要執行的

            List<ImportFinanceDto> importFinanceDtoListNew = new ArrayList<>();
            // 從記憶體取出來全部要執行的

            List<ImportFinanceDto> importFinanceDtoList = jobScopeImportFinanceDto.getImportFinanceDtoList();
            // 排序

            importFinanceDtoList = importFinanceDtoList.stream()
                    .sorted((f1, f2) -> Long.compare(f1.getOrgcreatetime(), f2.getOrgcreatetime()))
                    .collect(Collectors.toList());
            // 去重

            for (ImportFinanceDto importFinanceDto : importFinanceDtoList) {
                synchronized (useridList) {
                    if (useridList.contains(importFinanceDto.getTerminusUserId())) {
                        // 跳過

                        log.warn("同一批重複的 UserID = {}", importFinanceDto.getTerminusUserId());
                    } else {
                        useridList.add(importFinanceDto.getTerminusUserId());
                        importFinanceDtoListNew.add(importFinanceDto);
                    }
                }
            }
            // 設置

            jobScopeImportFinanceDto.setImportFinanceDtoList(importFinanceDtoListNew);
            return RepeatStatus.FINISHED;
        }
    };
}

中間邏輯就不用看大概就排序跟去重, 記得都做完後 就要回傳 RepeatStatus.FINISHED

然後 Tasklet 要再包裝成 Step

public Step step_selectTopOneByUserid() {
    return stepBuilderFactory.get("selectTopOneByUserid")
            .tasklet(importFinanceDtoProcessor.taskletSelectTopOneByUserid()).build();
}

大致上就這樣完成了

← SpringBatch 3 基礎使用教學 如何使用 JOOQ 產生符合 JPA 規範的 Entity →
 
comments powered by Disqus