爬虫中 阻塞队列和 线程池 造成的类死锁问题

2018-11-18  本文已影响0人  shu2man

很早就想尝试一下爬虫,相关的博文已经很多,这里记下几个困扰了我挺久的问题。
既然说的是死锁,我们来复习一下死锁的四个条件:

言归正传,首先在我的实现中自定义了线程池

    public ThreadPoolExecutor getFixedThreadPool(int corePoolSize,int maxPoolSize,int waitingQueuesize) {
        return new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, 
                new ArrayBlockingQueue<Runnable>(waitingQueuesize),new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable run, ThreadPoolExecutor executor) {
                        if(!executor.isTerminated())
                            try {
                                executor.getQueue().put(run);
                            } catch (Exception e) {
                                LOG.error("Inner Exception in workQueue, putting task  error",e);
                            }
                    }
                });
        
        //return Executors.newFixedThreadPool(2*nThread);
    }

这是一个类newFixedThreadPool的操作,自定义了等待队列的大小,同时队列满时阻塞了入队操作,避免Int.maxValue造成的溢出或是过多的任务堆积,兼具fixedThreadPool和cachedThreadPool的优点,可这里阻塞的拒绝策略让我在后续的实现中饱受折磨...

然后为了避免任务都被阻塞在线程池,我又额外开了一个阻塞队列存储爬出来的待爬URL,然后用一个监控线程监控这个队列,爬虫任务相当于生产者生产待爬URL,监控线程相当于消费者消费产生的URL,而这个队列就是仓库了,完美的设计。。。

public class WatchTask implements Runnable{
        @Override
        public void run() {
            while(isCrawl) {
                try{
                    String url=urlQueue.poll(1000, TimeUnit.MILLISECONDS);
                    while(StringUtils.isNotBlank(url) && isCrawled(url)) url=urlQueue.poll(100, TimeUnit.MILLISECONDS);
                    if(StringUtils.isNotBlank(url)) executor.execute(new WorkTask(url));
                }catch(Exception e) {
                    LOG.error("Taking url from blocking queue error, urlQueue size:"+urlQueue.size(),e);
                }
                lastCur=System.currentTimeMillis();
                LOG.info("WatchTask running, urlQueue:"+urlQueue.size());
            }
        }
        
    }
    
    public class WorkTask implements Runnable{
        private String seedUrl=null;
        
        public WorkTask(String seedUrl) {
            this.seedUrl=seedUrl;
        }
        
        @Override
        public void run() {
            List<String> urls;
            try {
                urls=crawler.doCrawl(seedUrl);
                if(urls==null || urls.size()==0) return;
                for(String url:urls) {
                    if(StringUtils.isNotBlank(url) && !isCrawled(url)) {
                        urlQueue.put(url);
                    }
                }
            }catch(Exception e) {
                LOG.error("Puting url to blocking queue error, size:"+urlQueue.size(),e);
            }
        }
    }

在程序中URL队列的大小要远大于线程池等待队列,明眼的朋友到这里应该看出我的操作问题在哪里了:


死锁示意图

于是,将额外的阻塞队列和监控任务去掉,工作线程改成这样,颇有种自给自足的感觉:

    
    public class WorkTask implements Runnable{
        private String seedUrl=null;
        
        public WorkTask(String seedUrl) {
            this.seedUrl=seedUrl;
        }
        
        @Override
        public void run() {
            List<String> urls;
            try {
                urls=crawler.doCrawl(seedUrl);
                if(urls==null || urls.size()==0) return;
                for(String url:urls) {
                    if(StringUtils.isNotBlank(url) && !isCrawled(url)) {
                        executor.execute(new WorkTask(url));
                    }
                }
            }catch(Exception e) {
                LOG.error("Puting url to blocking queue error, size:"+urlQueue.size(),e);
            }
        }
    }

然而,实际运行中发现能爬取的数据条数在线程池最大线程数左右,往后程序就像挂掉一样虽然在跑但什么输出都没有,肯定又是阻塞了!
经过一番分析,发现问题回到了线程池本身的等待队列,圆圈代表线程池,黑点表示线程非空闲:


线程池死锁示意图

就这样,又一个死锁创造出来了,其原因归根到底还是一个种子url能爬取出来的子URL太多了——几百甚至几千上万个(没错我在爬某网用户信息,子url是用户的粉丝或其关注的人,因为一些需求不能进行部分舍弃),既然如此那就把等待队列设至大一点,对子url太多的,全部舍弃,至于何为多大家自有判断,我用子url数和等待队列大小关系来决定,当等待队列中url数量超过等待队列容量的一半,或子url数量超过队列数量一半退出:

    public class WorkTask implements Runnable{
        private String seedUrl=null;
        
        public WorkTask(String seedUrl) {
            this.seedUrl=seedUrl;
        }
        
        @Override
        public void run() {
            List<String> urls;
            try {
                urls=crawler.doCrawl(seedUrl);
                int size=tpe.getQueue().size();
                //无子url,或队列中任务数量超过容量一半,或url数量超过队列数量一半,避免崩掉故退出
                if(urls==null || urls.size()==0 || size>halfQueueSize || urls.size()>halfQueueSize) return;
                System.out.println("****************add to queue with size"+urls.size());
                for(String url:urls) {
                    if(StringUtils.isNotBlank(url) && !isCrawled(url)) {
                        executor.execute(new WorkTask(url));
                    }
                }
            }catch(Exception e) {
                LOG.error("Puting url to blocking queue error, size:"+urlQueue.size(),e);
            }
        }
    }

至此,终于把死锁的问题解决了,但是爬虫跑了一会ip就被封了,下一步是使用代理。

本文为本人解决实际问题的记录,有任何高见欢迎留言。

上一篇下一篇

猜你喜欢

热点阅读