使用消息队列实现业务解耦(案例)
2020-05-24 本文已影响0人
不孤独的字符串
在一个系统中,对于一些允许异步处理的业务,消息中间件在业务解耦上总能起到很重要的作用,一定程度上能够提高系统的相应时间以及吞吐量。消息中间件本质是一个队列,工作方式就是生产者-消费者模式。
本文实现对某小说网站内容的爬取,同时接入RabbitMQ实现业务解耦。首先说明为什么使用消息队列,一方面,数据库的CRUD的速度与爬虫的速度不一样,利用RabbitMQ可以实现数据库读取业务和爬虫业务解耦,保证爬虫整体的效率;另一方面,使用RabbitMQ的监控平台可以很好的监控爬取的进度情况。实现的功能大体如下:
- 基于schedule的定时功能,固定时间爬取小说信息和章节列表保存到数据库中
- 跟随系统启动单独开启一条线程,每隔一定时间查询数据库中还没有内容的章节,同时将查询到的内容添加到消息队列中。
- 消费者监听队列获取章节信息,爬取对应链接的章节内容,内容的爬取需要考虑分页的问题。
- 定时从数据库中查询还在更新中的小说,爬取新的章节信息。
该功能涉及到的路由键:
mq.config.exchange=kd.novel.direct
#存储小说的队列
mq.config.queue.novel=mq.config.queue.novel
mq.config.routing.novel.key=novel.routing
#存储章节内容的队列
mq.config.queue.chapter=mq.config.queue.chapter
mq.config.routing.chapter.key=chapter.routing
#存储章节下一页内容的队列
mq.config.queue.next.chapter=mq.config.queue.chapter.next
mq.config.routing.next.chapter.key=chapter.next.routing
#存储章节最后一页内容的队列
mq.config.queue.end.chapter=mq.config.queue.chapter.end
mq.config.routing.end.chapter.key=chapter.end.routing
#存储更新的章节信息
mq.config.queue.chapter.update=mq.config.queue.chapter.update
mq.config.routing.chapter.update.key=chapter.update.routing
查询所有还没有章节内容的章节信息,添加到消息队列中:
@Component
public class ChapterInfoSync implements ApplicationRunner {
@Autowired
private ChapterService chapterService;
@Value("${mq.config.exchange}")
private String novelExchange;
@Value("${mq.config.routing.chapter.key}")
private String chapterRouting;
@Autowired
private AmqpTemplate amqpTemplate;
@Override
public void run(ApplicationArguments applicationArguments) throws Exception {
while (true){
int counts = chapterService.getInfoCounts();
List<Chapter> chapterList = null;
for(int i = 1; i <= counts; i++){
chapterList = chapterService.getChapterList(i , SIZE);
for(Chapter chapter : chapterList){
//将查询到的数据添加到消息队列中
amqpTemplate.convertAndSend(novelExchange, chapterRouting, JsonUtils.objectToJson(chapter));
}
TimeUnit.MINUTES.sleep(5);
}
}
}
private static final int SIZE = 500;
}
使用@RabbitListener实现队列监听,获取消息队列中的信息,解析数据得到章节内容的访问链接,从而爬取章节的内容。
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.chapter.update}", autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
key = "${mq.config.routing.chapter.update.key}"
)
)
public class ChapterReceiver {
@Autowired
private NovelService novelService;
@Autowired
private AmqpTemplate amqpTemplate;
@Value("${mq.config.exchange}")
private String novelExchange;
@Value("${mq.config.routing.novel.key}")
private String novelRouting;
@RabbitHandler
public void novelInfo(String msg) throws Exception{
Novel novel = JsonUtils.jsonToPojo(msg, Novel.class);
HttpClientBuilder builder = HttpClients.custom();
builder.setUserAgent("Mozilla/5.0(Windows;U;Windows NT 5.1;en-US;rv:0.9.4)");
CloseableHttpClient httpclient = builder.build();
CloseableHttpResponse response = null;
try {
HttpGet httpget = new HttpGet(novel.getLink());
response = httpclient.execute(httpget); //执行
int code = response.getStatusLine().getStatusCode(); //获取响应状态码
String html = "";
if (code == 200) {
html = EntityUtils.toString(response.getEntity(), "utf-8");
} else {
EntityUtils.consume(response.getEntity());
}
if ("".equals(html)) {
return;
}
// 解析数据
Document document = Jsoup.parse(html);
Elements chapter = document.select("ul[class=_chapter] li a");
//章节列表
List<Chapter> chapterList = new ArrayList<Chapter>();
int nId = novel.getId();
for (Element element : chapter) {
Chapter info = new Chapter();
String url = Encoder.encodeUrl(element.attr("href"));
String content = Encoder.encodeHtml(element.text());
info.setId(nId);
info.setName(content);
info.setLink(url);
chapterList.add(info);
}
int size = novelService.getNovelSize(nId);
//大小有变化则加入消费队列
if(size != chapterList.size()){
NovelColl novelColl = new NovelColl();
novelColl.setNovel(novel);
novelColl.setChapters(chapterList);
amqpTemplate.convertAndSend(novelExchange, novelRouting, JsonUtils.objectToJson(novelColl));
}
}catch (Exception e){
logger.error("crawel chapter err;", e.getMessage());
}finally {
if(response != null){
response.close();
}
if(httpclient != null){
httpclient.close();
}
}
}
private static final Logger logger = LoggerFactory.getLogger(ChapterReceiver.class);
}
小说的爬虫是基于WebCollector实现和HttpClient实现。WebCollector默认使用多线程进行内容爬取,同时可以设置爬取的深度,因此速度很快,适合小说基本信息和章节列表的爬取。而基于HttpClient实现的章节爬虫则是单线程,方便控制章节内容和小说进行对应。
如果你感兴趣,可以点击链接查看源码:https://gitee.com/hsfeng/bicrawel