爬虫中 阻塞队列和 线程池 造成的类死锁问题
很早就想尝试一下爬虫,相关的博文已经很多,这里记下几个困扰了我挺久的问题。
既然说的是死锁,我们来复习一下死锁的四个条件:
- 循环等待
- 占有且请求(请求与持有)
- 互斥(资源有限,每次只能被一个或一类线程使用)
- 不可抢占(不可剥夺,无优先级)
四个条件中不可被破坏的是互斥条件,即多进程同时访问会有数据的不一致性。
言归正传,首先在我的实现中自定义了线程池:
public ThreadPoolExecutor getFixedThreadPool(int corePoolSize,int maxPoolSize,int waitingQueuesize) {
return new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(waitingQueuesize),new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable run, ThreadPoolExecutor executor) {
if(!executor.isTerminated())
try {
executor.getQueue().put(run);
} catch (Exception e) {
LOG.error("Inner Exception in workQueue, putting task error",e);
}
}
});
//return Executors.newFixedThreadPool(2*nThread);
}
这是一个类newFixedThreadPool的操作,自定义了等待队列的大小,同时队列满时阻塞了入队操作,避免Int.maxValue造成的溢出或是过多的任务堆积,兼具fixedThreadPool和cachedThreadPool的优点,可这里阻塞的拒绝策略让我在后续的实现中饱受折磨...
然后为了避免任务都被阻塞在线程池,我又额外开了一个阻塞队列存储爬出来的待爬URL,然后用一个监控线程监控这个队列,爬虫任务相当于生产者生产待爬URL,监控线程相当于消费者消费产生的URL,而这个队列就是仓库了,完美的设计。。。
public class WatchTask implements Runnable{
@Override
public void run() {
while(isCrawl) {
try{
String url=urlQueue.poll(1000, TimeUnit.MILLISECONDS);
while(StringUtils.isNotBlank(url) && isCrawled(url)) url=urlQueue.poll(100, TimeUnit.MILLISECONDS);
if(StringUtils.isNotBlank(url)) executor.execute(new WorkTask(url));
}catch(Exception e) {
LOG.error("Taking url from blocking queue error, urlQueue size:"+urlQueue.size(),e);
}
lastCur=System.currentTimeMillis();
LOG.info("WatchTask running, urlQueue:"+urlQueue.size());
}
}
}
public class WorkTask implements Runnable{
private String seedUrl=null;
public WorkTask(String seedUrl) {
this.seedUrl=seedUrl;
}
@Override
public void run() {
List<String> urls;
try {
urls=crawler.doCrawl(seedUrl);
if(urls==null || urls.size()==0) return;
for(String url:urls) {
if(StringUtils.isNotBlank(url) && !isCrawled(url)) {
urlQueue.put(url);
}
}
}catch(Exception e) {
LOG.error("Puting url to blocking queue error, size:"+urlQueue.size(),e);
}
}
}
在程序中URL队列的大小要远大于线程池等待队列,明眼的朋友到这里应该看出我的操作问题在哪里了:
死锁示意图
于是,将额外的阻塞队列和监控任务去掉,工作线程改成这样,颇有种自给自足的感觉:
public class WorkTask implements Runnable{
private String seedUrl=null;
public WorkTask(String seedUrl) {
this.seedUrl=seedUrl;
}
@Override
public void run() {
List<String> urls;
try {
urls=crawler.doCrawl(seedUrl);
if(urls==null || urls.size()==0) return;
for(String url:urls) {
if(StringUtils.isNotBlank(url) && !isCrawled(url)) {
executor.execute(new WorkTask(url));
}
}
}catch(Exception e) {
LOG.error("Puting url to blocking queue error, size:"+urlQueue.size(),e);
}
}
}
然而,实际运行中发现能爬取的数据条数在线程池最大线程数左右,往后程序就像挂掉一样虽然在跑但什么输出都没有,肯定又是阻塞了!
经过一番分析,发现问题回到了线程池本身的等待队列,圆圈代表线程池,黑点表示线程非空闲:
线程池死锁示意图
就这样,又一个死锁创造出来了,其原因归根到底还是一个种子url能爬取出来的子URL太多了——几百甚至几千上万个(没错我在爬某网用户信息,子url是用户的粉丝或其关注的人,因为一些需求不能进行部分舍弃),既然如此那就把等待队列设至大一点,对子url太多的,全部舍弃,至于何为多大家自有判断,我用子url数和等待队列大小关系来决定,当等待队列中url数量超过等待队列容量的一半,或子url数量超过队列数量一半退出:
public class WorkTask implements Runnable{
private String seedUrl=null;
public WorkTask(String seedUrl) {
this.seedUrl=seedUrl;
}
@Override
public void run() {
List<String> urls;
try {
urls=crawler.doCrawl(seedUrl);
int size=tpe.getQueue().size();
//无子url,或队列中任务数量超过容量一半,或url数量超过队列数量一半,避免崩掉故退出
if(urls==null || urls.size()==0 || size>halfQueueSize || urls.size()>halfQueueSize) return;
System.out.println("****************add to queue with size"+urls.size());
for(String url:urls) {
if(StringUtils.isNotBlank(url) && !isCrawled(url)) {
executor.execute(new WorkTask(url));
}
}
}catch(Exception e) {
LOG.error("Puting url to blocking queue error, size:"+urlQueue.size(),e);
}
}
}
至此,终于把死锁的问题解决了,但是爬虫跑了一会ip就被封了,下一步是使用代理。
本文为本人解决实际问题的记录,有任何高见欢迎留言。