java爬虫多线程redis队列(爬取国美网站的商品信息)
前面那篇爬虫文章用的是单线程没有用到其它一些比较提高效率的工具比较遗憾,所以今天做了一个比较全面的爬虫。首先谢谢 @天不生我万古长这位小伙伴的留言,不然还真有点懒了。因为上班所以也只能利用周末的时间来写了。其实这次构思了很久。本来是想爬淘宝的商品信息,但是遇到了一个坑就是ssl的证书验证,这里纠结了半天终于绕过去了。但是由于淘宝的限制比较严,ip直接被限制访问了。我也很无语,如果同样有小伙伴遇到了https请求的证书验证通过不了,建议去看一下这一篇博客,感觉写的不错。http://blog.csdn.net/u014256984/article/details/73330573 这里主要讲的就是通过java代码获取证书文件,然后将证书文件放入到jdk下面,具体我就不细说了。说一说今天的重点。
首先说一下我的目标页面。国美的搜索页 国美搜索页面.png 搜索列表页.png技术点
httpClient Jsoup 这些都是爬虫最基本的,就不老生常谈了。这里我说一说用的新的技术点,以及新的技术点遇到了哪些坑。
-
redis 以及redis的队列应用
这里用redis主要的作用就是保存需要解析的url 以及已经解析过的url两个队列。这里我遇到最多的问题,就是用多线程执行的时候出现redis链接重置的问题。网上查了一下也没有一个统一的答案,我也只是根据控制台输出的错误信息感觉可能是在多线程执行的时候,redis创建了多次连接。为什么会创建多次连接就会出现重置的问题。我的猜测就是因为redis本省是不支持windows的,只是微软在打了补丁的情况下才支持。这可能有一点影响。这方面我也没有去深究。我的解决方案就是创建一个redis的单例模式。 -
mongodb
首先说一下为什么要用mongodb- mongodb是非关系型数据库。
- mongodb相对于关系型数据库他的效率要高很多很多。
- mongodb存储数据理论上是没有上限的,当然这是理论。
- mongodb4.5以后是天生自带连接池的。
-
线程池
在处理多线程的问题的时候,如果创建一个线程池管理线程。其实这里的效果是非常好的。但是好是好用,坑却特别多,一定要注意对于有些数据进行操作的时候要进行枷锁的操作,为了保证数据的准确性。
说了这么多也感觉有点词穷了,还是上代码。
- redis的队列创建
package com.xdl.redisUtil;
import redis.clients.jedis.Jedis;
/**
*
* @ClassName: redisqueue
* redis队列
* @author liangchu
* @date 2018-1-6 上午11:52:44
*
*/
public class RedisQueue {
// 这是单例
private static Jedis jedis = RedisSingleton.getJedisInstance();
/*public RedisQueue(){
//连接本地的 Redis 服务
jedis = RedisSingleton.getJedisInstance();
}*/
//将未访问的url加入到toVisit表中(使用的是尾插法)
public static void addToVisit(String url) {
jedis.rpush("toVisit", url);
}
//将未访问的url弹出进行解析
public static String getToVisit() {
return jedis.lpop("toVisit");
}
//将已经解析过的url添加到已访问队列中
public static void addVisited(String url) {
jedis.rpush("visited", url);
}
//判断待访问url队列是否为空
public static boolean toVisitIsEmpty() {
Long length = jedis.llen("toVisit");
if (length == 0) {
return true;
} else {
return false;
}
}
}
package com.xdl.redisUtil;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import com.mongodb.MongoClient;
public class MultithreadCrawler {
/**
* @throws Exception
* @throws InterruptedException
* @Title: main
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param args 参数
* @return void 返回类型
* @author liangchu
* @date 2018-1-6 下午12:19:53
* @throws
*/
public static void main(String[] args) throws Exception {
//拿到种子链接 这里主要从这几个方面抓取数据
List<String> strings = new ArrayList<String>();
strings.add("手机");
strings.add("男装");
strings.add("女装");
strings.add("电脑");
strings.add("相机");
strings.add("食品");
//将种子链接写进redis数据库的待抓取列表
for (String url : strings) {
RedisQueue.addToVisit("http://search.gome.com.cn/search?question="+url+"&searchType=goods&page=1");
}
//创建一个收集线程的列表
List<Thread> threadList = new ArrayList<Thread>();
//创建线程的个数
int threadNum = 5;
// mongodb连接
MongoClient mongo = new MongoClient("127.0.0.1", 27017);
RunThread run = new RunThread();
run.setThreads(threadNum,mongo);
//创建5个线程,并对其进行收集
for (int i = 0; i < threadNum; i++) {
Thread thread = new Thread(run);
thread.start();
threadList.add(thread);
}
//main线程需要等待所有子线程退出
while (threadList.size() > 0) {
Thread child = threadList.remove(0);
child.join();
}
}
}
- run函数
package com.xdl.redisUtil;
import java.util.ArrayList;
import java.util.List;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoClient;
public class RunThread extends Thread {
MongoClient mongo = null;
//线程计数器需要对所有线程可见,是共享变量
int threads = 0;
//redis队列的对象,也是所有对象共享的变量
//创建线程锁
private static Object lock = new Object();
public void setThreads(int threads,MongoClient mongo) {
this.threads = threads;
this.mongo = mongo;
}
@SuppressWarnings("deprecation")
public void parseToVisitUrltoRedis() throws Exception {
//用来保存新提取出来的url列表(此变量不应是共享变量,我们把它变为每个线程的私有变量)
//我们应该知道的是在Java中哪些变量在线程之间是不共享的,参考资料:
List<String> urlList = new ArrayList<String>();
boolean flag = true;
while (flag) {
//从爬虫队列中取出待抓取的url
if (!RedisQueue.toVisitIsEmpty()) {
String url = RedisQueue.getToVisit();
/**
* 对此url进行解析,提取出新的url列表
* 解析出来的url顺便就写进urlList中了
*
* 在这个过程中不要求保证同步,每个线程都负责解析自己所属的url,解析完成
* 之后将url写入自己的urlList之中,当在解析过程中发生阻塞,则切换到其他
* 线程,保证程序的高并发性。
*/
// 创建httpclient实例
CloseableHttpClient httpClient = HttpClients.createDefault();
// 创建httpget实例
HttpGet httpGet = new HttpGet(url);
// 执行http get 请求
CloseableHttpResponse response = null;
response = httpClient.execute(httpGet);
HttpEntity entity = response.getEntity();// 获取返回实体
// EntityUtils.toString(entity,"utf-8");//获取网页内容,指定编码
String html = EntityUtils.toString(entity, "UTF-8");
response.close();
httpClient.close();
Document doc = Jsoup.parse(html);
// 获取产品列表信息
Element elementP = doc.getElementById("product-box");
// 获取产品列
Elements elements = elementP.select("li[class=product-item]")
.select("div[class=item-tab-warp]");
// 下一页的信息就存入redis队列当中 做下一次分析的url链接所用
// 如果这个没有数据这个线程就退出
if(elements.size() <=0){
flag = false;
return ;
}
for (Element element : elements) {
// 获取产品价格
String price = element.select("div[class=item-tab]").select("div[class=item-price-info]")
.select("p[class=item-price]")
.select("span[class=price asynPrice]").text();
// 获取产品名称 和产品链接
String producthref = element.select("p[class=item-name]")
.select("a[class=emcodeItem item-link]").attr("href");
String productTitle = element.select("p[class=item-name]")
.select("a[class=emcodeItem item-link]").attr("title");
// 评价人数
String productStatus = element.select("p[class=item-comment-dispatching]")
.select("a[class=comment]").text();
// 经营品牌
String product = element.select("p[class=item-shop]")
.text();
// 将这些信息存入mogondb中
DB db = mongo.getDB("taobao");
DBCollection emp = db.getCollection("productinfo");
DBObject obj = new BasicDBObject();
obj.put("productTitle", productTitle);
obj.put("producthref", producthref);
obj.put("productStatus", productStatus);
obj.put("product", product);
obj.put("price", price);
emp.insert(obj);
// 这里我也纠结了好久要不要关,如果关了就会报错 所以最后就没关了如果各位有好的解决方案 记得告诉我O(∩_∩)O
//mongo.close();
}
// 这里是获取它的下一页,然后将下一页的连接加入到redis队列当中
int page = Integer.parseInt(url.substring(url.lastIndexOf("=")+1))+1;
String redisToVisit = url.substring(0, url.lastIndexOf("=")+1)+page;
if(page >5){
flag = false;
return;
}
/**
* 在此同步块中主要进行提取出来的url的写操作,必须是同步操作,保证一个同
* 一时间只有一个线程在对Redis数据库进行写操作。
*/
synchronized(lock){
// 加入到redis队列中
RedisQueue.addToVisit(redisToVisit);
}
} else {
//在改变线程计数器的值的时候必须保证线程的同步性
synchronized (lock) {
//等待线程数的计数器的计数器减1
threads--;
//如果仍然有其他线程在活动,则通知此线程进行等待
if (threads > 0) {
/*调用线程的wait方法会将此线程挂起,直到有其他线程调用notify\
notifyAll将此线程进行唤醒*/
wait();
threads++;
} else {
//如果其他的线程都在等待,说明待抓取队列已空,则通知所有线程进行退出
notifyAll();
return;
}
}
}
}
}
public void run() {
//虽然run方法不能抛出异常,但是可以在run方法中进行try,catch
try {
parseToVisitUrltoRedis();
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 主函数
package com.xdl.redisUtil;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import com.mongodb.MongoClient;
public class MultithreadCrawler {
/**
* @throws Exception
* @throws InterruptedException
* @Title: main
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param args 参数
* @return void 返回类型
* @author liangchu
* @date 2018-1-6 下午12:19:53
* @throws
*/
public static void main(String[] args) throws Exception {
//拿到种子链接 这里主要从 手机 服饰 电器 食品 这几个大的方面来抓取
List<String> strings = new ArrayList<String>();
strings.add("手机");
strings.add("男装");
strings.add("女装");
strings.add("电脑");
strings.add("相机");
strings.add("食品");
//将种子链接写进redis数据库的待抓取列表
for (String url : strings) {
RedisQueue.addToVisit("http://search.gome.com.cn/search?question="+url+"&searchType=goods&page=1");
}
//创建一个收集线程的列表
List<Thread> threadList = new ArrayList<Thread>();
//创建线程的个数
int threadNum = 1;
MongoClient mongo = new MongoClient("127.0.0.1", 27017);
RunThread run = new RunThread();
run.setThreads(threadNum,mongo);
//创建5个线程,并对其进行收集
for (int i = 0; i < threadNum; i++) {
Thread thread = new Thread(run);
thread.start();
threadList.add(thread);
}
//main线程需要等待所有子线程退出
while (threadList.size() > 0) {
Thread child = threadList.remove(0);
child.join();
}
}
}
商品信息列表.png
总结
不得不说加入了redis队列和mongodb存储数据 效率简直要起飞了。15s不到就抓了1200条商品信息。因为有了上次的教训不敢抓得太久,所以只抓取了1200条。如果有不怕封的小伙伴可以试试,当然后果是自负。O(∩∩)O,终于弄完了整整一天。下次加入quartz定时任务,这样获取股票,天气,航班什么的都可以获取实时的了。如果有需求的小伙伴可以留言,有时间一定完成。good night!!(*^_^*) 嘻嘻