Reading the Source Code of crawler4j, an Open-Source Crawler Framework

2020-04-03  浪里_个郎

Questions to keep in mind while reading the source:
1. What HTTP client does crawler4j use?
2. After seed pages are set, how does crawler4j keep crawling deeper automatically?
3. How does crawler4j resume a crawl after an interruption?
4. How is the multi-threaded crawler implemented?

1. What HTTP client does crawler4j use?

crawler4j uses Apache HttpClient:

    compile group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.7'
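
crawler4j's PageFetcher is built on top of this library. As a point of reference only (not crawler4j code), a bare HttpClient GET request looks roughly like this; the URL is just a placeholder:

    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.util.EntityUtils;

    public class HttpClientDemo {
        public static void main(String[] args) throws Exception {
            // Create a default client and issue a GET request
            try (CloseableHttpClient client = HttpClients.createDefault()) {
                HttpGet get = new HttpGet("https://example.com/");
                try (CloseableHttpResponse response = client.execute(get)) {
                    // Read the response body into a string
                    String html = EntityUtils.toString(response.getEntity());
                    System.out.println(html.length());
                }
            }
        }
    }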

2. After seed pages are set, how does crawler4j keep crawling deeper automatically?

First, set the seed page:

CrawlController controller = new CrawlController(config, pageFetcher, robotstxtServer);
controller.addSeed("http://r.cnki.net/kns/brief/result.aspx?dbprefix=gwkt");
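
The config, pageFetcher and robotstxtServer arguments come from crawler4j's standard setup. A minimal sketch, with the storage folder path as a placeholder and class names assumed to be from crawler4j's usual edu.uci.ics.crawler4j packages:

    CrawlConfig config = new CrawlConfig();
    // folder where crawler4j keeps its intermediate data (the Berkeley DB environment)
    config.setCrawlStorageFolder("/tmp/crawler4j");

    PageFetcher pageFetcher = new PageFetcher(config);
    RobotstxtConfig robotstxtConfig = new RobotstxtConfig();
    RobotstxtServer robotstxtServer = new RobotstxtServer(robotstxtConfig, pageFetcher);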

The seed URL is wrapped in a WebURL object and handed to the Frontier class, which manages the pages waiting to be crawled. Frontier stores the seed page in a Berkeley DB JE database (the DatabaseEntry, Transaction and DatabaseException classes below come from com.sleepycat.je):

    // CrawlController hands the seed page to Frontier
    public void schedule(WebURL url) {
        int maxPagesToFetch = config.getMaxPagesToFetch();
        synchronized (mutex) {
            try {
                if (maxPagesToFetch < 0 || scheduledPages < maxPagesToFetch) {
                    workQueues.put(url);
                    scheduledPages++;
                    counters.increment(Counters.ReservedCounterNames.SCHEDULED_PAGES);
                }
            } catch (DatabaseException e) {
                logger.error("Error while putting the url in the work queue", e);
            }
        }
    }

    // The put method of WorkQueues
    public void put(WebURL url) {
        DatabaseEntry value = new DatabaseEntry();
        webURLBinding.objectToEntry(url, value);
        Transaction txn = beginTransaction();
        urlsDB.put(txn, getDatabaseEntryKey(url), value);
        commit(txn);
    }

To use crawler4j we define our own class that extends WebCrawler, and WebCrawler itself implements Runnable:

public class WebCrawler implements Runnable {
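
A minimal subclass might look like the sketch below (class name, output and imports are illustrative and assume the classic edu.uci.ics.crawler4j package layout; they are not taken from the project):

    import edu.uci.ics.crawler4j.crawler.Page;
    import edu.uci.ics.crawler4j.crawler.WebCrawler;
    import edu.uci.ics.crawler4j.parser.HtmlParseData;

    public class MyCrawler extends WebCrawler {

        @Override
        public void visit(Page page) {
            // Called after a page has been fetched and parsed successfully
            String url = page.getWebURL().getURL();
            if (page.getParseData() instanceof HtmlParseData) {
                HtmlParseData htmlParseData = (HtmlParseData) page.getParseData();
                System.out.println("Visited " + url + " with "
                        + htmlParseData.getOutgoingUrls().size() + " outgoing links");
            }
        }
    }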

After all the parameters are configured, we start the crawl with:

controller.start(MyCrawler.class, numberOfCrawlers);

CrawlController then runs our crawler class on new threads. In the run method of the parent class WebCrawler, Frontier keeps pulling URLs to be parsed out of the database, and processPage parses and handles each one:

    @Override
    public void run() {
        onStart();
        while (true) {
            List<WebURL> assignedURLs = new ArrayList<>(50);
            isWaitingForNewURLs = true;
            // pull URLs out of the database
            frontier.getNextURLs(50, assignedURLs);
            isWaitingForNewURLs = false;
            if (assignedURLs.isEmpty()) {
                if (frontier.isFinished()) {
                    return;
                }
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    logger.error("Error occurred", e);
                }
            } else {
                for (WebURL curURL : assignedURLs) {
                    if (myController.isShuttingDown()) {
                        logger.info("Exiting because of controller shutdown.");
                        return;
                    }
                    if (curURL != null) {
                        curURL = handleUrlBeforeProcess(curURL);
                        // parse and process the URL
                        processPage(curURL);
                        frontier.setProcessed(curURL);
                    }
                }
            }
        }
    }

When processPage parses a page, it collects every outgoing link on it. Any link that passes the overridable shouldVisit check, i.e. a link we want to crawl, is put into the database:

// Parse the page HTML with the Parser class; this extracts every link on the page
parser.parse(page, curURL.getURL());
// Decide whether to follow the links on this page; the default implementation simply returns true
if (shouldFollowLinksIn(page.getWebURL())) {
    ParseData parseData = page.getParseData();
    List<WebURL> toSchedule = new ArrayList<>();
    int maxCrawlDepth = myController.getConfig().getMaxDepthOfCrawling();
    // Iterate over every outgoing link of the page
    for (WebURL webURL : parseData.getOutgoingUrls()) {
        webURL.setParentDocid(curURL.getDocid());
        webURL.setParentUrl(curURL.getURL());
        int newdocid = docIdServer.getDocId(webURL.getURL());
        if (newdocid > 0) {
            // This is not the first time that this Url is visited. So, we set the
            // depth to a negative number.
            webURL.setDepth((short) -1);
            webURL.setDocid(newdocid);
        } else {
            webURL.setDocid(-1);
            webURL.setDepth((short) (curURL.getDepth() + 1));
            if ((maxCrawlDepth == -1) || (curURL.getDepth() < maxCrawlDepth)) {
                // Check whether this link meets the crawl criteria
                if (shouldVisit(page, webURL)) {
                    if (robotstxtServer.allows(webURL)) {
                        webURL.setDocid(docIdServer.getNewDocID(webURL.getURL()));
                        toSchedule.add(webURL);
                    } else {
                        logger.debug(
                            "Not visiting: {} as per the server's \"robots.txt\" " +
                            "policy", webURL.getURL());
                    }
                } else {
                    logger.debug(
                        "Not visiting: {} as per your \"shouldVisit\" policy",
                        webURL.getURL());
                }
            }
        }
    }
    // Put the page's links into the to-be-crawled database
    frontier.scheduleAll(toSchedule);
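
The shouldVisit hook checked above is the usual place to narrow the crawl. A typical override inside MyCrawler, shown here only as an illustrative sketch (the filter pattern and domain are made up, and java.util.regex.Pattern is assumed to be imported), keeps the crawl on one site and skips static resources:

    private static final Pattern FILTERS =
            Pattern.compile(".*(\\.(css|js|gif|jpe?g|png|zip|pdf))$");

    @Override
    public boolean shouldVisit(Page referringPage, WebURL url) {
        String href = url.getURL().toLowerCase();
        // skip static resources and stay inside the seed site
        return !FILTERS.matcher(href).matches()
                && href.startsWith("http://r.cnki.net/");
    }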

Inside parseData.setOutgoingUrls, crawler4j relies on the open-source URL detector project https://github.com/linkedin/URL-Detector/; the relevant code is:

            // input is the HTML string
            UrlDetector detector = new UrlDetector(input, getOptions());
            // collect all links found in the HTML
            List<Url> urls = detector.detect();
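
Used standalone, the detector takes a UrlDetectorOptions value where crawler4j's getOptions() appears above. A small self-contained sketch, assuming the library's com.linkedin.urls packages and its HTML option:

    import com.linkedin.urls.Url;
    import com.linkedin.urls.detection.UrlDetector;
    import com.linkedin.urls.detection.UrlDetectorOptions;

    import java.util.List;

    public class UrlDetectorDemo {
        public static void main(String[] args) {
            String html = "<a href=\"http://example.com/page\">link</a>";
            // HTML mode tunes matching for URLs embedded in HTML markup
            UrlDetector detector = new UrlDetector(html, UrlDetectorOptions.HTML);
            List<Url> urls = detector.detect();
            for (Url url : urls) {
                System.out.println(url.getFullUrl());
            }
        }
    }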

3. How does crawler4j resume a crawl after an interruption?

Frontier defines InProcessPagesDB, a database dedicated to holding the batch of links currently being crawled.
Whenever a batch of links is handed out for crawling, it is first put into InProcessPagesDB:

    public void getNextURLs(int max, List<WebURL> result) {
        while (true) {
            synchronized (mutex) {
                if (isFinished) {
                    return;
                }
                try {
                    List<WebURL> curResults = workQueues.get(max);
                    workQueues.delete(curResults.size());
                    if (inProcessPages != null) {
                        for (WebURL curPage : curResults) {
                            inProcessPages.put(curPage);
                        }
                    }
                    result.addAll(curResults);

As each link in the batch finishes processing, it is removed from InProcessPagesDB:

    public void setProcessed(WebURL webURL) {
        counters.increment(Counters.ReservedCounterNames.PROCESSED_PAGES);
        if (inProcessPages != null) {
            if (!inProcessPages.removeURL(webURL)) {
                logger.warn("Could not remove: {} from list of processed pages.", webURL.getURL());
            }
        }
    }

When a crawl is interrupted and then run again, Frontier's constructor pulls the links left unfinished by the previous run out of InProcessPagesDB and schedules them again:

    public Frontier(Environment env, CrawlConfig config) {
        this.config = config;
        this.counters = new Counters(env, config);
        try {
            workQueues = new WorkQueues(env, DATABASE_NAME, config.isResumableCrawling());
            // check whether resumable crawling is enabled
            if (config.isResumableCrawling()) {
                scheduledPages = counters.getValue(Counters.ReservedCounterNames.SCHEDULED_PAGES);
                // the database holding the pages left unfinished by the previous crawl
                inProcessPages = new InProcessPagesDB(env);
                long numPreviouslyInProcessPages = inProcessPages.getLength();
                if (numPreviouslyInProcessPages > 0) {
                    logger.info("Rescheduling {} URLs from previous crawl.",
                                numPreviouslyInProcessPages);
                    scheduledPages -= numPreviouslyInProcessPages;
                    // fetch the unfinished URLs from the database, one batch at a time
                    List<WebURL> urls = inProcessPages.get(IN_PROCESS_RESCHEDULE_BATCH_SIZE);
                    while (!urls.isEmpty()) {
                        // put the URLs into this run's work queue
                        scheduleAll(urls);
                        inProcessPages.delete(urls.size());
                        urls = inProcessPages.get(IN_PROCESS_RESCHEDULE_BATCH_SIZE);
                    }
                }
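
None of this happens unless resumable crawling is enabled; that is the flag read by config.isResumableCrawling() above, and it is set on the CrawlConfig before the crawl starts (the storage folder is a placeholder):

    CrawlConfig config = new CrawlConfig();
    config.setCrawlStorageFolder("/tmp/crawler4j");
    // persist the work queues so an interrupted crawl can be resumed
    config.setResumableCrawling(true);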

One caveat: if the crawl is interrupted right after a batch has finished and its entries have just been cleared from InProcessPagesDB, but before the next batch has been written into InProcessPagesDB, the crawl cannot pick up exactly where it left off. Interested readers may want to verify whether this is really the case.

4. How is the multi-threaded crawler implemented?

CrawlController starts one thread for each configured crawler:

            for (int i = 1; i <= numberOfCrawlers; i++) {
                T crawler = crawlerFactory.newInstance();
                Thread thread = new Thread(crawler, "Crawler " + i);
                crawler.setThread(thread);
                crawler.init(i, this);
                thread.start();
                crawlers.add(crawler);
                threads.add(thread);
                logger.info("Crawler {} started", i);
            }
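
From the caller's side, start() blocks until every crawler thread has finished. The versions I looked at also seem to expose a non-blocking variant; a hedged sketch of both:

    // blocking: returns only after all crawler threads are done
    controller.start(MyCrawler.class, numberOfCrawlers);

    // non-blocking: the controller monitors the crawler threads in the background
    controller.startNonBlocking(MyCrawler.class, numberOfCrawlers);
    // ... do other work ...
    controller.waitUntilFinish();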

So what does crawler4j do to synchronize its threads? The answer is... almost nothing, because very little is needed. The only data that must be handled concurrently is the set of URLs waiting to be crawled, and synchronization of that data is already delegated to the database (guarded by the synchronized blocks in Frontier shown above), so each thread simply crawls the pages assigned to it without interfering with the others.
