通过canal+Nginx+lua+redis实现缓存预热和二级

2020-03-06  本文已影响0人  走在冷风中吧

缓存穿透问题: 当客户端发起访问, nginx本地没有缓存, 查询redis也没有缓存, 就会去查mysql, 当mysql中查询不到数据时, nginx和redis中不会也有更新的缓存数据; 当这种无结果的访问被黑客攻击高并发请求时, 就会造成mysql数据库频繁访问, 产生缓存穿透现象.

解决措施: 使用nginx+redis实现缓存预热, 如果从nginx和redis中都无法获得数据, 直接返回给客户端, 不去访问数据库.


缓存预热原理图, 通过nginx和redis将mysql访问压力截断

实现技术: canal监测数据库变化 + rabbitmq消息队列分发+nginx lua执行redis脚本和mysql数据库数据更新


缓存预热以及二级缓存实现技术流程

一. canal

阿里研发的对数据库binlog日志监听的服务器技术
原始作用: 为了跨机房进行mysql数据库同步
现在用来监听mysql服务器, 监测数据库数值的变化, 发送给canal客户端

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

(1)查看当前mysql是否开启binlog模式。

SHOW VARIABLES LIKE '%log_bin%'

如果log_bin的值为OFF是未开启,为ON是已开启。

(2)修改/etc/my.cnf 需要开启binlog模式。

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

修改完成之后,重启mysqld的服务。

(1)下载地址canal

https://github.com/alibaba/canal/releases/tag/canal-1.0.24

(2)下载之后 上传到linux系统中,解压缩到指定的目录/usr/local/canal
(3)修改 exmaple下的实例配置

vi conf/example/instance.properties

修改如图所示的几个参数。提供监测的mysql服务器的地址,以及用户名和密码


image.png

一定要注释掉下面这个参数,这样就会扫描全库

#canal.instance.defaultDatabaseName =

(3)启动服务:

[root@localhost canal]# ./bin/startup.sh
  1. 创建工程模块changgou_canal,pom引入依赖
    我们这里使用的一个开源的项目,它实现了springboot与canal的集成。比原生的canal更加优雅。
    https://github.com/chenqian56131/spring-boot-starter-canal
    使用前需要将starter-canal安装到本地仓库。安装方法参考:将github第三方jar包安装到本地maven
    安装完成后倒入pom依赖:
<dependency>
    <groupId>com.xpand</groupId>
    <artifactId>starter-canal</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
  1. 创建包com.changgou.canal ,包下创建启动类
@SpringBootApplication
@EnableCanalClient   //开启canal客户端支持
public class CanalApplication {

    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }
}
  1. 添加配置文件application.properties
canal.client.instances.example.host=192.168.225.128 //这里是canal服务器端的ip
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
  1. 创建com.changgou.canal.listener包,包下创建类

@CanalEventListener
public class BusinessListener {

 
    /**
     *  ListenPoint schema: 数据库名  table: 表名
     * @param eventType
     * @param rowData
     */
    @ListenPoint(schema = "changgou_business", table = {"tb_ad"})
    public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.err.println("数据发生变化");
         for(CanalEntry.Column column: rowData.getAfterColumnsList()) {
          
        }
    }
}

其中ListenPoint schema代表监测的数据库名, table 代表监测的表名
启动客户端服务, 这时如果修改了changgou_business库中tb_ad表中的值, 就会在控制台上收到打印的内容: 数据发生变化
rowData.getAfterColumnsList() 可以获取到对应产生变化后的那一行的数据.
rowData.getBeforeColumnsList() 可以获取到对应产生变化前的那一行的数据.
监控到mysql数据的变化后可以根据自己的需求发送变化通知, 这里我们之后会发送rabbitmq消息通知队列监控服务做出处理.

二. RabbitMQ

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>
spring.rabbitmq.host=192.168.225.128  //rabbitMQ的服务器ip
@CanalEventListener
public class BusinessListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @ListenPoint(schema = "changgou_business", table = {"tb_ad"})
    public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.err.println("广告数据发生变化");

        //修改后数据
        for(CanalEntry.Column column: rowData.getAfterColumnsList()) {
            if(column.getName().equals("position")){
                System.out.println("发送消息到mq  ad_update_queue:"+column.getValue());
                // 参数2ad_update_queue为消息队列的名称
                //参数1: 是交换机exchage. 这个例子中没有使用
               // 参数3为发送的消息内容, 这个列子中我们发送的为position的值 
               rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue());  //发送消息到mq
                break;
            }
        }
    }
}
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>com.squareup.okhttp3</groupId>
  <artifactId>okhttp</artifactId>
  <version>3.9.0</version>
</dependency>

在spring节点下配置rabbitmq的host地址

spring:
  rabbitmq:
    host: 192.168.200.128

创建rabbitmq消息监听类, 注解@RabbitListener设置监听的队列名称

@Component
@RabbitListener(queues = "ad_update_queue")
public class AdListener {

    /**
     * 获取更新广告通知
     * @param message
     */
    @RabbitHandler
    public void updateAd(String message){
        System.out.println("接收到消息:"+message);      
}

三. nginx+lua+redis

当我们接收到数据库信息变更时, 最重要的是通知nginx进行本地更新并且将数据缓存至redis, 使用户能访问获得最新数据.
这里我们使用openRestry,OpenResty(又称:ngx_openresty) 是一个基于 NGINX 的可伸缩的 Web 平台,由中国人章亦春发起,提供了很多高质量的第三方模块。OpenResty 简单理解成 就相当于封装了nginx,并且集成了LUA脚本,开发人员只需要简单的其提供了模块就可以实现相关的逻辑,而不再像之前,还需要在nginx中自己编写lua的脚本,再进行调用了。

1.添加仓库执行命令

 yum install yum-utils
 yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo

2.执行安装

yum install openresty

3.安装成功后 会在默认的目录如下:

/usr/local/openresty

修改/usr/local/openresty/nginx/conf/nginx.conf ,将配置文件使用的根设置为root,目的就是将来要使用lua脚本的时候 ,直接可以加载在root下的lua脚本。

#user nobody; 配置文件第一行原来为这样, 现改为下面的配置
user root root;

四. 实现缓存预热

实现思路:
(1)用户请求获取广告数据, 先从nginx缓存中读取, 若没有, 则从redis中读取
(2)监控mysql广告数据发生变化, 通知nginx进行数据更新, 同时将更新的json数据保存到redis, 保证缓存到最新的数据

        location /ad_read {
            content_by_lua_file /root/lua/ad_read.lua;
        }

路由将有/root/lua/ad_read.lua;脚本执行, 脚本的内容会从nginx本地或redis中获取缓存数据, openResty需要开启共享内存, 提供二级缓存功能, 减轻redis压力.
ad_read.lua脚本内容:

--设置响应头类型
ngx.header.content_type="application/json;charset=utf8"
--获取请求中的参数ID
local uri_args = ngx.req.get_uri_args();
local position = uri_args["position"];

--获取本地缓存
local cache_ngx = ngx.shared.dis_cache;
--根据ID 获取本地缓存数据
local adCache = cache_ngx:get('ad_cache_'..position);

if adCache == "" or adCache == nil then

    --引入redis库
    local redis = require("resty.redis");
    --创建redis对象
    local red = redis:new()
    --设置超时时间
    red:set_timeout(2000)
    --连接
    local ok, err = red:connect("192.168.225.128", 6379)
    --获取key的值
    local rescontent=red:get("ad_"..position)
    --输出到返回响应中
    ngx.say(rescontent)
    --关闭连接
    red:close()
    --将redis中获取到的数据存入nginx本地缓存
    cache_ngx:set('ad_cache_'..position, rescontent, 10*60);
else
    --nginx本地缓存中获取到数据直接输出
    ngx.say(adCache)
end
#包含redis初始化模块
lua_shared_dict dis_cache 5m;  #共享内存开启

修改/usr/local/openresty/nginx/conf/nginx.conf文件:

 server {
        ....
        # 添加
        location /ad_update {
            content_by_lua_file /root/lua/ad_update.lua;
        }
        ....
         
    }

ad_update.lua

ngx.header.content_type="application/json;charset=utf8"
local cjson = require("cjson")
local mysql = require("resty.mysql")
local uri_args = ngx.req.get_uri_args()
local position = uri_args["position"]

local db = mysql:new()
db:set_timeout(1000)  
local props = {  
    host = "192.168.200.128",  
    port = 3306,  
    database = "changgou_business",  
    user = "root",  
    password = "123456"  
}

local res = db:connect(props)  
local select_sql = "select url,image from tb_ad where status ='1' and position='"..position.."' and start_time<= NOW() AND end_time>= NOW()"  
res = db:query(select_sql)  
db:close()  

local redis = require("resty.redis")
local red = redis:new()
red:set_timeout(2000)

local ip ="192.168.200.128"
local port = 6379
red:connect(ip,port)

red:set("ad_"..position,cjson.encode(res))
red:close()

ngx.say("{flag:true}")
上一篇下一篇

猜你喜欢

热点阅读