over 2 years ago

前一篇的作法是在啟動程序中直接開始爬蟲,執行時偶爾會發生異常,導致主程序直接結束。因此這一篇調整了作法:順便改用 Spring Data 試試看,並把爬蟲改成多執行緒執行。

build.gradle
// Build script for the multi-threaded news crawler (Spring Boot 1.2.5, Java 8).
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        // Supplies the 'spring-boot' plugin applied below.
        classpath('org.springframework.boot:spring-boot-gradle-plugin:1.2.5.RELEASE')
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'spring-boot'

sourceCompatibility = 1.8
targetCompatibility = 1.8
version = '1.0'

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.springframework.boot:spring-boot-starter-data-elasticsearch:1.2.5.RELEASE',
            'org.springframework.boot:spring-boot-starter-batch:1.2.5.RELEASE'

    // DigestUtils.md5Hex is used to derive document ids from article links.
    compile 'commons-codec:commons-codec:1.10'

    compile 'org.projectlombok:lombok:1.16.4'
    compile 'org.jsoup:jsoup:1.8.2'

    // Test-only libraries (JUnit, Mockito, spring-test) stay off the production classpath.
    testCompile 'org.springframework.boot:spring-boot-starter-test:1.2.5.RELEASE'
}

設定檔

application.yml
# Spring Data Elasticsearch: node address to connect to, and repository support switch.
spring:
  data:
    elasticsearch:
      cluster-nodes: 10.1.99.175:9300
      repositories:
        enabled: true

# Crawler thread pool settings, injected into BatchApplication through @Value.
taskthread:
  pool:
    corePoolSize: 10
    maxPoolSize: 20
    queueCapacity: 50
    keepAliveSeconds: 60
  # Number of seconds the pool monitor sleeps between two status lines.
  poolmonitor:
    keepMonitoringSeconds: 10

主程式

BatchApplication.java
package com.sam.app;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;

import com.sam.service.CrawlerService;

/**
 * Application entry point and Spring configuration for the crawler batch job.
 *
 * <p>Defines the worker thread pool, a single-step Spring Batch job whose tasklet
 * starts the crawl, and a {@code main} method that runs the application and then
 * exits the JVM with the Spring exit code.
 */
@SpringBootApplication
@EnableBatchProcessing
@EnableElasticsearchRepositories(basePackages = {"com.sam.repository"})
@ComponentScan(basePackages = "com.sam")
public class BatchApplication {

    // Thread pool parameters, read from the taskthread.* configuration properties.

    @Value("${taskthread.pool.corePoolSize}")
    private int corePoolSize;
    @Value("${taskthread.pool.maxPoolSize}")
    private int maxPoolSize;
    @Value("${taskthread.pool.queueCapacity}")
    private int queueCapacity;
    @Value("${taskthread.pool.keepAliveSeconds}")
    private long keepAliveSeconds;
    @Value("${taskthread.poolmonitor.keepMonitoringSeconds}")
    private int keepMonitoringSeconds;

    /**
     * Creates the bounded worker pool used by the crawler and starts its monitor.
     *
     * <p>When the queue is full, {@code CallerRunsPolicy} makes the submitting thread
     * run the task itself, which throttles submission instead of dropping work.
     *
     * @return the shared thread pool
     */
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(){
        // Create the thread pool.
        ThreadPoolExecutor threadpool = new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveSeconds,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
        System.out.println("連線池創建成功");

        // Start the pool monitor. It is a daemon thread because no reference to it is
        // kept, so it must never be the thing that keeps the JVM alive.
        ThreadPoolMonitor monitor = new ThreadPoolMonitor(threadpool, keepMonitoringSeconds);
        Thread monitorThread = new Thread(monitor, "thread-pool-monitor");
        monitorThread.setDaemon(true);
        monitorThread.start();
        return threadpool;
    }

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Autowired
    private CrawlerService crawlerService;

    /**
     * Builds the tasklet that runs the crawl for the fixed date range and then waits
     * for every submitted worker to finish.
     *
     * <p>{@code crawler(...)} only queues work on the pool. Without the wait, the step
     * would finish immediately and {@code main} would call {@code System.exit} while
     * tasks are still queued or running, silently discarding them. The pool is shut
     * down here and is therefore not reusable after this step.
     *
     * @return the crawl tasklet
     */
    @Bean
    protected Tasklet tasklet() {
        final ThreadPoolExecutor pool = threadPoolExecutor();
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext context)
                    throws InterruptedException {
                crawlerService.crawler("20070401", "20150723");
                // Stop accepting new tasks, then block until the queued ones are done.
                pool.shutdown();
                pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
                return RepeatStatus.FINISHED;
            }
        };
    }

    /**
     * Defines the batch job, which consists of the single crawl step.
     *
     * @return the job named {@code job}
     * @throws Exception if the step cannot be built
     */
    @Bean
    public Job job() throws Exception {
        return this.jobs.get("job").start(step1()).build();
    }

    /**
     * Defines the only step of the job, backed by {@link #tasklet()}.
     *
     * @return the step named {@code step1}
     * @throws Exception if the step cannot be built
     */
    @Bean
    protected Step step1() throws Exception {
        return this.steps.get("step1").tasklet(tasklet()).build();
    }

    /**
     * Runs the application, then exits the JVM with the exit code reported by Spring.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        System.exit(SpringApplication.exit(SpringApplication.run(BatchApplication.class, args)));
    }
}

服務層

CrawlerService.java
package com.sam.service;

import java.time.LocalDate;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.sam.app.ThreadWorker;
import com.sam.repository.NewsRepository;

/**
 * Splits a date range into one {@link ThreadWorker} per day and hands each worker
 * to the shared thread pool.
 */
@Service
public class CrawlerService {
    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;
    @Autowired
    private NewsRepository repository;

    /**
     * Submits one worker for every day from {@code start} up to, but not including,
     * {@code end}.
     *
     * @param start first day to crawl, as an 8-digit {@code yyyyMMdd} string
     * @param end   day at which submission stops, as {@code yyyyMMdd}; this day itself
     *              is not submitted
     */
    public void crawler(String start,String end){
        LocalDate firstDay = toLocalDate(start);
        LocalDate stopDay = toLocalDate(end);
        for (LocalDate day = firstDay; day.isBefore(stopDay); day = day.plusDays(1L)) {
            ThreadWorker worker = new ThreadWorker();
            worker.setDatestr(day.toString());
            worker.setRepository(repository);
            threadPoolExecutor.execute(worker);
        }
    }

    /** Reads year, month and day from fixed positions 0-4, 4-6 and 6-8 of the text. */
    private static LocalDate toLocalDate(String yyyymmdd) {
        int year = Integer.parseInt(yyyymmdd.substring(0, 4));
        int month = Integer.parseInt(yyyymmdd.substring(4, 6));
        int dayOfMonth = Integer.parseInt(yyyymmdd.substring(6, 8));
        return LocalDate.of(year, month, dayOfMonth);
    }
}

主要爬蟲程式

ThreadWorker.java
package com.sam.app;

import java.io.IOException;
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.chrono.ChronoLocalDate;
import java.time.chrono.Chronology;
import java.time.chrono.MinguoChronology;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DecimalStyle;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Random;

import lombok.Data;

import org.apache.commons.codec.digest.DigestUtils;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.springframework.beans.factory.annotation.Autowired;

import com.sam.entities.News;
import com.sam.entities.Stock;
import com.sam.repository.NewsRepository;

/**
 * Crawls the news of one day from fund.bot.com.tw and saves each article through the
 * configured {@link NewsRepository}.
 *
 * <p>The day's index page and every article page are each fetched at most
 * {@link #MAX_ATTEMPTS} times, with a random pause between attempts. An article that
 * still cannot be fetched or parsed is skipped rather than saved half-populated.
 */
@Data
public class ThreadWorker implements Runnable, Serializable{

    /** Upper bound on attempts per page, so a permanently failing page cannot retry forever. */
    private static final int MAX_ATTEMPTS = 5;

    /** Day to crawl, formatted as {@code yyyy-MM-dd}. */
    private String datestr;

    private NewsRepository repository;

    /**
     * Fetches the article list for {@link #datestr}, then fetches and saves each article.
     * Stops early if the worker thread has been interrupted.
     */
    @Override
    public void run() {
        System.out.println(datestr + " 的新聞 執行 取得 ");
        List<News> newslist = this.getNewsList(datestr);
        System.out.println(datestr + " 的新聞 共 " + newslist.size() + "筆");
        for(News news:newslist){
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
            if (this.getNewsContent(news)) {
                this.repository.save(news);
            }
        }
    }

    /**
     * Downloads the index page of the given day and builds one {@link News} per link.
     *
     * @param date day to fetch, formatted as {@code yyyy-MM-dd}
     * @return the articles listed for that day; empty if every attempt failed
     */
    private List<News> getNewsList(String date){
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                // A fresh list per attempt, so a failed attempt leaves no partial result.
                List<News> newslist = new ArrayList<News>();
                Date postdate = new SimpleDateFormat("yyyy-MM-dd").parse(date);
                Document doc = Jsoup.connect(String.format("http://fund.bot.com.tw/Z/ZF/ZF_H_%s.djhtm", date)).get();
                Elements resultLinks = doc.select("table :gt(0) > td > a");
                for(Element el:resultLinks){
                    News news = new News();
                    // Date is mutable: give every article its own instance.
                    news.setPostdate(new Date(postdate.getTime()));
                    news.setLink(el.attr("href"));
                    news.setLinktext(el.text());
                    news.setCreatedDateTime(new Date());
                    newslist.add(news);
                }
                return newslist;
            } catch (Exception e) {
                System.err.println(date + " 的 新聞清單 取得 發生錯誤ex=" + e.getMessage());
                if (attempt == MAX_ATTEMPTS || !pauseBeforeRetry()) {
                    break;
                }
            }
        }
        return new ArrayList<News>();
    }

    /**
     * Downloads one article page and fills in id, post date, content, related stocks
     * and related industries on the given {@link News}.
     *
     * @param news article whose link is already set; updated in place
     * @return {@code true} if the article was fully populated, {@code false} if every
     *         attempt failed or the thread was interrupted
     */
    private boolean getNewsContent(News news){
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                Document doc = Jsoup.connect(String.format("http://fund.bot.com.tw/%s", news.getLink())).get();

                news.setId(DigestUtils.md5Hex(news.getLink()));

                // Timestamp, e.g. (92/06/23 16:20:43)
                Element datetime = doc.select("table :gt(0) div.p01").first();
                Date posted = this.toDate(datetime.text().trim());
                if (posted != null) {
                    // Keep the date taken from the index page when conversion fails.
                    news.setPostdate(posted);
                }

                // Article body
                Element content = doc.select("table :gt(0) td.p1").first();
                String contenttext = content.text().replaceAll(" ", "");
                news.setContent(contenttext.trim());

                // e.g. •相關個股:  1736喬山 •相關產業:  生物科技
                List<Stock> relatedStocks = new ArrayList<Stock>();
                List<String> relatedIndustries = new ArrayList<String>();
                Elements relateds = doc.select("table :gt(0) td.p2");
                for(Element el:relateds){
                    String text = el.text().replaceAll(" ", "");
                    String value = afterColon(text);
                    if (value == null) {
                        // No "label:value" shape; nothing to extract from this cell.
                        continue;
                    }
                    if(text.startsWith("•相關個股")){
                        for(String stocktext:value.split(",")){
                            if (stocktext.length() < 4) {
                                // Too short to hold the 4-character stock id.
                                continue;
                            }
                            Stock stock = new Stock();
                            stock.setStockid(stocktext.substring(0, 4));
                            stock.setStockname(stocktext.substring(4));
                            relatedStocks.add(stock);
                        }
                    }else{
                        // Any other cell is treated as related industries.
                        relatedIndustries.add(value);
                    }
                }
                news.setRelatedStocks(relatedStocks);
                news.setRelatedIndustries(relatedIndustries);
                return true;
            } catch (Exception e) {
                Date postdate = news.getPostdate();
                String day = postdate == null ? datestr : new SimpleDateFormat("yyyy-MM-dd").format(postdate);
                System.err.println(day + " 的 新聞內容 取得 發生錯誤ex=" + e.getMessage() + ",link="+news.getLink());
                if (attempt == MAX_ATTEMPTS || !pauseBeforeRetry()) {
                    break;
                }
            }
        }
        return false;
    }

    /**
     * Sleeps for a random 0-8 seconds before the next attempt.
     *
     * @return {@code false} if the thread was interrupted while waiting, in which case
     *         the interrupt flag is restored and the caller should stop retrying
     */
    private boolean pauseBeforeRetry() {
        try {
            Thread.sleep(new Random().nextInt(9) * 1000L);
            return true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns the part after the first {@code ':'}, or {@code null} if there is none. */
    private static String afterColon(String text) {
        String[] parts = text.split(":");
        return parts.length > 1 ? parts[1] : null;
    }

    /**
     * Converts a Minguo (ROC) calendar timestamp such as {@code (92/06/23 16:20:43)}
     * into a {@link Date} for that calendar day; the time of day is dropped.
     *
     * @param input timestamp text including the surrounding parentheses
     * @return the corresponding date, or {@code null} if the ISO date text could not
     *         be converted; unparseable input raises a runtime exception from
     *         {@code java.time} instead
     */
    private Date toDate(String input){
        Date date = null;
        try {
            Locale locale = Locale.getDefault(Locale.Category.FORMAT);
            Chronology chrono = MinguoChronology.INSTANCE;
            DateTimeFormatter df = new DateTimeFormatterBuilder().parseLenient()
                    .appendPattern("(yyy/MM/dd HH:mm:ss)").toFormatter().withChronology(chrono)
                    .withDecimalStyle(DecimalStyle.of(locale));
            TemporalAccessor temporal = df.parse(input);
            ChronoLocalDate cDate = chrono.date(temporal);

            // LocalDate.toString() yields ISO yyyy-MM-dd, which is parsed back into a Date.
            date =  new SimpleDateFormat("yyyy-MM-dd").parse(LocalDate.from(cDate).toString());
        } catch (ParseException e) {
            System.err.println("toDate failed for input=" + input + ", ex=" + e.getMessage());
        }
        return date;
    }
}

執行緒池監控元件

ThreadPoolMonitor.java
package com.sam.app;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * Thread pool monitor: periodically prints the state of a {@link ThreadPoolExecutor}
 * to standard output.
 *
 * <p>The loop ends when {@link #shutdown()} is called, when the monitoring thread is
 * interrupted, or once the executor has terminated (after printing its final state).
 *
 * @author samzhu
 *
 */
public class ThreadPoolMonitor implements Runnable{
    private final ThreadPoolExecutor executor;

    private final int seconds;

    // volatile: written by shutdown() on one thread, read by the loop on another.
    private volatile boolean run = true;

    /**
     * @param executor the pool whose state is reported
     * @param delay    number of seconds to sleep between two reports
     */
    public ThreadPoolMonitor(ThreadPoolExecutor executor, int delay) {
        this.executor = executor;
        this.seconds = delay;
    }

    /**
     * Asks the monitoring loop to stop. It takes effect when the current sleep ends;
     * interrupt the monitoring thread to stop it immediately.
     */
    public void shutdown() {
        this.run = false;
    }

    /**
     * Prints one status line per interval until the monitor is stopped.
     */
    @Override
    public void run() {
        while (run) {
            System.out
            .println(String
                    .format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
                            this.executor.getPoolSize(), this.executor.getCorePoolSize(),
                            this.executor.getActiveCount(), this.executor.getCompletedTaskCount(),
                            this.executor.getTaskCount(), this.executor.isShutdown(),
                            this.executor.isTerminated()));
            if (this.executor.isTerminated()) {
                // Nothing can change any more; stop instead of leaking this thread.
                return;
            }
            try {
                // long arithmetic: seconds * 1000 would overflow int for large delays.
                Thread.sleep(seconds * 1000L);
            } catch (InterruptedException e) {
                // Treat interruption as a stop request and keep the flag for the caller.
                Thread.currentThread().interrupt();
                return;
            }
        }

    }
}

參考網址
Spring Batch 3.0うごかす

← 使用Spring-Boot整合Spring-Data操作ElasticSearch 讀取CSV檔的方法 →
 
comments powered by Disqus