In the previous post, kicking off the crawler directly inside the startup sequence would occasionally hit an execution error and take the whole main program down with it. This version reworks that: it stores results through Spring Data (Elasticsearch) and runs the crawling on a thread pool, i.e. multi-threaded. The build script:
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath('org.springframework.boot:spring-boot-gradle-plugin:1.2.5.RELEASE')
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'spring-boot'

sourceCompatibility = 1.8
targetCompatibility = 1.8
version = '1.0'

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.springframework.boot:spring-boot-starter-data-elasticsearch:1.2.5.RELEASE',
            'org.springframework.boot:spring-boot-starter-batch:1.2.5.RELEASE',
            'org.springframework.boot:spring-boot-starter-test:1.2.5.RELEASE'
    compile 'commons-codec:commons-codec:1.10'
    compile 'org.projectlombok:lombok:1.16.4'
    compile 'org.jsoup:jsoup:1.8.2'
}
Configuration file
spring.data.elasticsearch.cluster-nodes: 10.1.99.175:9300
spring.data.elasticsearch.repositories.enabled: true
taskthread.pool.corePoolSize: 10
taskthread.pool.maxPoolSize: 20
taskthread.pool.queueCapacity: 50
taskthread.pool.keepAliveSeconds: 60
taskthread.poolmonitor.keepMonitoringSeconds: 10
Main program
package com.sam.app;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;

import com.sam.service.CrawlerService;

@SpringBootApplication
@EnableBatchProcessing
@EnableElasticsearchRepositories(basePackages = {"com.sam.repository"})
@ComponentScan(basePackages = "com.sam")
public class BatchApplication {

    // Thread pool parameters
    @Value("${taskthread.pool.corePoolSize}")
    private int corePoolSize;
    @Value("${taskthread.pool.maxPoolSize}")
    private int maxPoolSize;
    @Value("${taskthread.pool.queueCapacity}")
    private int queueCapacity;
    @Value("${taskthread.pool.keepAliveSeconds}")
    private long keepAliveSeconds;
    @Value("${taskthread.poolmonitor.keepMonitoringSeconds}")
    private int keepMonitoringSeconds;

    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        // Initialize the thread pool
        ThreadPoolExecutor threadpool = new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveSeconds,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
        System.out.println("Thread pool created");
        // Start the thread pool monitor
        ThreadPoolMonitor monitor = new ThreadPoolMonitor(threadpool, keepMonitoringSeconds);
        Thread monitorThread = new Thread(monitor);
        monitorThread.start();
        return threadpool;
    }

    @Autowired
    private JobBuilderFactory jobs;
    @Autowired
    private StepBuilderFactory steps;
    @Autowired
    private CrawlerService crawlerService;

    @Bean
    protected Tasklet tasklet() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext context) {
                crawlerService.crawler("20070401", "20150723");
                return RepeatStatus.FINISHED;
            }
        };
    }

    @Bean
    public Job job() throws Exception {
        return this.jobs.get("job").start(step1()).build();
    }

    @Bean
    protected Step step1() throws Exception {
        return this.steps.get("step1").tasklet(tasklet()).build();
    }

    public static void main(String[] args) {
        System.exit(SpringApplication.exit(SpringApplication.run(BatchApplication.class, args)));
    }
}
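One thing to watch out for: crawlerService.crawler(...) only submits workers to the pool and returns immediately, so the batch step finishes while the crawls are still queued, and the System.exit(...) in main can tear the JVM down before they complete. Below is a minimal sketch of one way around this, blocking the tasklet until the pool drains. This is my own tweak, not part of the original code; the 6-hour timeout is an arbitrary placeholder, and it reuses the threadPoolExecutor bean and imports already present in BatchApplication.

    // Hypothetical replacement for the tasklet() bean in BatchApplication above.
    // It shuts the pool down after submitting all work and waits for the queued
    // crawls to finish before letting the job (and the JVM) exit.
    @Bean
    protected Tasklet tasklet() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext context) throws Exception {
                crawlerService.crawler("20070401", "20150723");
                ThreadPoolExecutor pool = threadPoolExecutor(); // same singleton bean (proxied @Bean call)
                pool.shutdown();                                // stop accepting new tasks
                pool.awaitTermination(6, TimeUnit.HOURS);       // placeholder upper bound on total crawl time
                return RepeatStatus.FINISHED;
            }
        };
    }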
Service layer
package com.sam.service;

import java.time.LocalDate;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.sam.app.ThreadWorker;
import com.sam.repository.NewsRepository;

@Service
public class CrawlerService {

    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;
    @Autowired
    private NewsRepository repository;

    public void crawler(String start, String end) {
        // Parse the yyyyMMdd strings into the crawl date range
        LocalDate startDate = LocalDate.of(Integer.parseInt(start.substring(0, 4)),
                Integer.parseInt(start.substring(4, 6)), Integer.parseInt(start.substring(6, 8)));
        LocalDate endDate = LocalDate.of(Integer.parseInt(end.substring(0, 4)),
                Integer.parseInt(end.substring(4, 6)), Integer.parseInt(end.substring(6, 8)));
        // Submit one worker per day to the thread pool
        while (startDate.isBefore(endDate)) {
            ThreadWorker threadWorker = new ThreadWorker();
            threadWorker.setDatestr(startDate.toString());
            threadWorker.setRepository(repository);
            threadPoolExecutor.execute(threadWorker);
            startDate = startDate.plusDays(1L);
        }
    }
}
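As a side note, the substring slicing above can also be done with a ready-made yyyyMMdd formatter. A standalone sketch (my own illustration, not part of the original code):

    import java.time.LocalDate;
    import java.time.format.DateTimeFormatter;
    import java.time.temporal.ChronoUnit;

    // BASIC_ISO_DATE parses yyyyMMdd strings directly, equivalent to the
    // substring/Integer.parseInt handling in CrawlerService above.
    public class DateRangeExample {
        public static void main(String[] args) {
            LocalDate startDate = LocalDate.parse("20070401", DateTimeFormatter.BASIC_ISO_DATE);
            LocalDate endDate = LocalDate.parse("20150723", DateTimeFormatter.BASIC_ISO_DATE);
            long days = ChronoUnit.DAYS.between(startDate, endDate);
            System.out.println(startDate + " ~ " + endDate + " = " + days + " days to crawl");
        }
    }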
Main crawler program
package com.sam.app;

import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.chrono.ChronoLocalDate;
import java.time.chrono.Chronology;
import java.time.chrono.MinguoChronology;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DecimalStyle;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Random;

import lombok.Data;

import org.apache.commons.codec.digest.DigestUtils;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

import com.sam.entities.News;
import com.sam.entities.Stock;
import com.sam.repository.NewsRepository;

@Data
public class ThreadWorker implements Runnable, Serializable {

    private String datestr;
    private NewsRepository repository;

    @Override
    public void run() {
        System.out.println("Fetching news list for " + datestr);
        List<News> newslist = this.getNewsList(datestr);
        System.out.println(datestr + ": " + newslist.size() + " news items found");
        for (News news : newslist) {
            this.getNewsContent(news);
            this.repository.save(news);
        }
    }

    private List<News> getNewsList(String date) {
        News news = null;
        List<News> newslist = new ArrayList<News>();
        try {
            Document doc = Jsoup.connect(String.format("http://fund.bot.com.tw/Z/ZF/ZF_H_%s.djhtm", date)).get();
            Elements resultLinks = doc.select("table :gt(0) > td > a");
            for (Element el : resultLinks) {
                news = new News();
                news.setPostdate(new SimpleDateFormat("yyyy-MM-dd").parse(date));
                news.setLink(el.attr("href"));
                news.setLinktext(el.text());
                news.setCreatedDateTime(new Date());
                newslist.add(news);
            }
        } catch (Exception e) {
            System.err.println("Failed to fetch news list for " + date + ", ex=" + e.getMessage());
            try {
                // Back off for a random few seconds before retrying
                Thread.sleep(new Random().nextInt(9) * 1000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            newslist = this.getNewsList(date);
        }
        return newslist;
    }

    private void getNewsContent(News news) {
        Document doc;
        try {
            doc = Jsoup.connect(String.format("http://fund.bot.com.tw/%s", news.getLink())).get();
            news.setId(DigestUtils.md5Hex(news.getLink()));
            // Post time in the ROC (Minguo) calendar, e.g. (92/06/23 16:20:43)
            Element datetime = doc.select("table :gt(0) div.p01").first();
            news.setPostdate(this.toDate(datetime.text().trim()));
            // Article content
            Element content = doc.select("table :gt(0) td.p1").first();
            String contenttext = content.text().replaceAll(" ", "");
            news.setContent(contenttext.trim());
            // Related stocks / industries, e.g. "•相關個股: 1736喬山 •相關產業: 生物科技"
            List<Stock> relatedStocks = new ArrayList<Stock>();
            List<String> relatedIndustries = new ArrayList<String>();
            Elements relateds = doc.select("table :gt(0) td.p2");
            for (Element el : relateds) {
                String text = el.text().replaceAll(" ", "");
                if (text.startsWith("•相關個股")) {
                    String[] relatedsstocktext = text.split(":")[1].split(",");
                    for (String stocktext : relatedsstocktext) {
                        Stock stock = new Stock();
                        stock.setStockid(stocktext.substring(0, 4));
                        stock.setStockname(stocktext.substring(4));
                        relatedStocks.add(stock);
                    }
                } else {
                    // 相關產業 (related industries)
                    relatedIndustries.add(text.split(":")[1]);
                }
            }
            news.setRelatedStocks(relatedStocks);
            news.setRelatedIndustries(relatedIndustries);
        } catch (Exception e) {
            System.err.println("Failed to fetch news content for "
                    + new SimpleDateFormat("yyyy-MM-dd").format(news.getPostdate())
                    + ", ex=" + e.getMessage() + ", link=" + news.getLink());
            try {
                // Back off for a random few seconds before retrying
                Thread.sleep(new Random().nextInt(9) * 1000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            this.getNewsContent(news);
        }
    }

    private Date toDate(String input) {
        Date date = null;
        try {
            // Parse "(92/06/23 16:20:43)" as a Minguo (ROC) calendar date and convert it to an ISO date
            Locale locale = Locale.getDefault(Locale.Category.FORMAT);
            Chronology chrono = MinguoChronology.INSTANCE;
            DateTimeFormatter df = new DateTimeFormatterBuilder().parseLenient()
                    .appendPattern("(yyy/MM/dd HH:mm:ss)").toFormatter().withChronology(chrono)
                    .withDecimalStyle(DecimalStyle.of(locale));
            TemporalAccessor temporal = df.parse(input);
            ChronoLocalDate cDate = chrono.date(temporal);
            date = new SimpleDateFormat("yyyy-MM-dd").parse(LocalDate.from(cDate).toString());
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return date;
    }
}
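The News and Stock entities and the NewsRepository aren't shown in this post. A minimal sketch of what they might look like with Spring Data Elasticsearch 1.x, with field names inferred from the setters used above; the index/type names are my own guesses, and these are three separate source files:

    // com/sam/entities/News.java (hypothetical sketch)
    package com.sam.entities;

    import java.util.Date;
    import java.util.List;

    import lombok.Data;

    import org.springframework.data.annotation.Id;
    import org.springframework.data.elasticsearch.annotations.Document;

    @Data
    @Document(indexName = "news", type = "news") // index/type names are assumptions
    public class News {
        @Id
        private String id;                       // md5 hash of the link
        private Date postdate;
        private String link;
        private String linktext;
        private String content;
        private List<Stock> relatedStocks;
        private List<String> relatedIndustries;
        private Date createdDateTime;
    }

    // com/sam/entities/Stock.java (hypothetical sketch)
    package com.sam.entities;

    import lombok.Data;

    @Data
    public class Stock {
        private String stockid;
        private String stockname;
    }

    // com/sam/repository/NewsRepository.java (hypothetical sketch)
    package com.sam.repository;

    import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

    import com.sam.entities.News;

    public interface NewsRepository extends ElasticsearchRepository<News, String> {
    }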
Thread pool monitor component
package com.sam.app;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * Thread pool monitor: periodically prints the state of a ThreadPoolExecutor.
 *
 * @author samzhu
 */
public class ThreadPoolMonitor implements Runnable {

    private ThreadPoolExecutor executor;
    private int seconds;
    private boolean run = true;

    public ThreadPoolMonitor(ThreadPoolExecutor executor, int delay) {
        this.executor = executor;
        this.seconds = delay;
    }

    public void shutdown() {
        this.run = false;
    }

    public void run() {
        while (run) {
            System.out.println(String.format(
                    "[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
                    this.executor.getPoolSize(), this.executor.getCorePoolSize(),
                    this.executor.getActiveCount(), this.executor.getCompletedTaskCount(),
                    this.executor.getTaskCount(), this.executor.isShutdown(),
                    this.executor.isTerminated()));
            try {
                Thread.sleep(seconds * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
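Nothing above ever calls monitor.shutdown(), so the monitor thread keeps polling until the JVM exits. One possible tweak (my own suggestion, not in the original) is to let the loop end on its own once the executor has terminated:

    // Hypothetical variant of ThreadPoolMonitor.run(): exits the loop once the
    // executor has been shut down and all submitted tasks have finished.
    public void run() {
        while (run && !executor.isTerminated()) {
            System.out.println(String.format(
                    "[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
                    executor.getPoolSize(), executor.getCorePoolSize(),
                    executor.getActiveCount(), executor.getCompletedTaskCount(),
                    executor.getTaskCount(), executor.isShutdown(), executor.isTerminated()));
            try {
                Thread.sleep(seconds * 1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag and stop monitoring
                break;
            }
        }
        System.out.println("[monitor] stopped");
    }

Combined with the awaitTermination sketch in the main program section, the monitor would then stop shortly after the last crawl completes.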
References
Spring Batch 3.0うごかす (Running Spring Batch 3.0)