Featured image of post 多级缓存和缓存同步

多级缓存和缓存同步

多级缓存

OpenResty - 本地缓存

一个基于Nginx的高性能Web平台,基于Lua语言进行扩展,集成大量精良的Lua库、第三方模块,允许使用Lua自定义业务逻辑、自定义库

openresty-opm 可以安装第三方lua库

可以把openresty当nginx来用。

nginx.conf

# lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
# c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
server{
	xxx
	location /api/xxx {
        # 默认响应类型
		default_type application/json;
        # 响应结果由 xxx.lua决定
		content_by_lua_file lua/xxx.lua
	}
}

xxx.lua

ngx.say('{"id":1,"name":"xxx"}')

请求参数处理

参数类型 参数示例 参数获取方式
路径占位符 /item/1001 通过正则表达式匹配,在Nginx配置中使用如下配置来匹配并获取参数:
location ~ /item/(\d+) {
content_by_lua_file lua/item.lua;
}
匹配到的参数存入ngx.var数组中,通过索引获取:
local id = ngx.var[1]
请求头 id: 1001 使用OpenResty获取请求头,返回值是table类型:
local headers = ngx.req.get_headers()
Get请求参数 ?id=1001 获取GET请求参数,返回值是table类型:
local getParams = ngx.req.get_uri_args()
Post表单参数 id=1001 先调用ngx.req.read_body()以确保请求体被完全读取,然后获取POST表单参数,返回值是table类型:
local postParams = ngx.req.get_post_args()
JSON参数 {"id": 1001} 先调用ngx.req.read_body()以确保请求体被完全读取,然后获取body中的json参数,返回值是string类型:
local jsonBody = ngx.req.get_body_data()

查询Tomcat - 进程缓存

nginx内部发送http请求

local resp = ngx.location.capture("/path", {
    method = ngx.HTTP_GET, -- 请求方式
    args = {a=1, b=2}, -- get方式传参数
    body = "c=3&d=4" -- post方式传参数
})

返回的响应内容包括:

  • resp.status: 响应状态码
  • resp.header: 响应头,是一个table
  • resp.body: 响应体,就是响应数据

注意: 这里的path是路径,不包含ip和端口。这个请求会被Nginx内部的server监听并处理。

但我们希望这个请求发送到Tomcat服务器,所以还需要编写一个server来对这个路径做反向代理

location /path {
	proxy_pass http://tomcat_ip:tomcat_port;
}

封装http查询的函数

  1. 在/usr/local/openresty/lualib目录下创建common.lua文件:

    vi /usr/local/openresty/lualib/common.lua
    
  2. 在common.lua中封装http查询的函数

    -- 封装函数,发送http请求,并解析响应
    local function read_http(path, params)
        local resp = ngx.location.capture(path,{
            method = ngx.HTTP_GET,
            args = params,
        })
        if not resp then
            -- 记录错误信息,返回404
            ngx.log(ngx.ERR, "http not found, path: ", path, ", args: ", args)
            ngx.exit(404)
        end
        return resp.body
    end
    -- 封装方法导出
    local _M = {
        read_http = read_http
    }
    return _M
    

    编写Lua

    -- 导入common函数库
    local common = require('common')
    local read_http = common.read_http
    -- 导入cjson库
    local cjson = require('cjson')
    
    -- 获取路径参数
    local id = ngx.var[1]
    
    -- 查询商品信息
    local itemJSON = read_http("/item/" .. id, nil)
    -- 查询库存信息
    local stockJSON = read_http("/item/stock/" .. id, nil)
    
    -- JSON转化为Lua的table
    local item = cjson.decode(itemJSON)
    local stock = cjson.decode(stockJSON)
    -- 组合数据
    item.stock = stock.stock
    item.sold = stock.sold
    
    -- 把item序列化为json 返回给客户
    ngx.say(cjson.encode(item))
    

Tomcat集群负载均衡

# 反向代理配置
location /item {
	proxy_pass http://tomcat-cluster;
}
# tomcat集群配置
upstream tomcat-cluster{
    hash $request_uri; # 保证相同ip访问同一个缓存
	server xxx:xxx;
    server xxx:xxx;
}

Redis

冷启动与缓存预热

冷启动: 服务刚刚启动时,Redis中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力。

缓存预热: 在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到Redis中。

缓存预热

@Component
public class RedisHandler implements InitializingBean {
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private IItemService itemService;
    
    private static final ObjetMapper MAPPR = new ObjectMapper();
    
    @Autowired
    private IItemStockService stockService;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 初始化逻辑...
        // 1. 查询商品信息
        List<Item> itemList = itemService.list();

        // 2. 放入缓存
        for (Item item : itemList) {
            // 2.1. item序列化为JSON
            String json = MAPPER.writeValueAsString(item);
            // 2.2. 存入redis
            redisTemplate.opsForValue().set("item🆔" + item.getId(), json);
        }

        // 3. 查询商品库存信息
        List<ItemStock> stockList = stockService.list();

        // 4. 放入缓存
        for (ItemStock stock : stockList) {
            // 2.1. item序列化为JSON
            String json = MAPPER.writeValueAsString(stock);
            // 2.2. 存入redis
            redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
        }
    }
    
    // 后面会用到的方法,这里不需要
    public void saveItem(Item item) {
        try {
            String json = MAPPER.writeValueAsString(item);
            redisTemplate.opsForValue().set("item🆔" + item.getId(), json);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public void deleteItemById(Long id) {
        redisTemplate.delete("item🆔" + id);
    }

}

让OpenResty优先查询Redis

openresty提供了操作redis的模块

引入Redis模块,并初始化Redis对象

local redis = require("resty.redis")
-- 初始化Redis对象
local red = redis:new()
-- 设置Redis超时时间
red:set_timeouts(1000, 1000, 1000)

封装函数,用来释放Redis连接,其实是放入连接池

local function close_redis(red)
    local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
    local pool_size = 100 -- 连接池大小
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
    if not ok then
        ngx.log(ngx.ERR, "放入Redis连接池失败: ", err)
    end
end

封装函数,从Redis读数据并返回

-- 封装redis的方法 读取ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
    -- 连接一个连接
    local ok, err = red:connect(ip, port)
    if not ok then
        ngx.log(ngx.ERR, "连接redis失败:", err)
        return nil
    end
    -- 查询redis
    local resp, err = red:get(key)
    -- 查询失败处理
    if not resp then
        ngx.log(ngx.ERR, "查询Redis失败:", err, ", key = ", key)
        return nil
    end
    -- 检测被标为为空的响应
    if resp == ngx.null then
        resp = nil
        ngx.log(ngx.ERR, "查询Redis数据为空,key = ", key)
    end
    close_redis(red)
    return resp
end

优先查询Redis:

-- 封装查询函数,先查询redis,再查询http
local function read_data(key, path, params)
    -- 查询redis
    local resp = read_redis("127.0.0.1", 6379, key)
    -- 判断redis是否命中
    if not resp then
        -- Redis查询失败, 查询http
        resp = read_http(path, params)
    end
    return resp
end

全部代码:

-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')

-- 封装查询函数
function read_data(key, path, params)
    -- 查询redis
    local resp = read_redis("127.0.0.1", 6379, key)
    -- 判断redis是否命中
    if not resp then
        -- Redis查询失败, 记录日志并尝试http
        ngx.log(ngx.ERR, "redis查询失败, 尝试http, key: ", key)
        -- redis查询失败, 尝试http
        resp = read_http(path, params)
    end
    return resp
end

-- 获取路径参数
local id = ngx.var[1]

-- 查询商品信息
local itemJSON = read_data("item🆔" .. id, "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)

-- JSON转化为Lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化为json 返回客户端
ngx.say(cjson.encode(item))

Nginx本地缓存

OpenResty为Nginx提供了shared dict的功能,可以在nginx的多个worker之间共享数据,实现缓存功能。

开启共享词典: nginx.conf

http{
	...
	lua_shared_dict item_cache 150m;
}

操作共享词典

-- 获取本地缓存对象
local item_cache = ngx.shared.item_cache
-- 存储,指定key, value, 过期时间,单位s, 默认为0代表永不过期
item_cache:set('key', 'value', 1000)
-- 读取
local val = item_cache:get('key')

在查询时优先查询本地缓存

修改查询函数

-- 封装查询函数
function read_data(key, expire, path, params)
    -- 查询本地缓存
    local val = item_cache:get(key)
    if not val then
        ngx.log(ngx.ERR, "本地缓存中查询失败, 尝试查询Redis, key: ", key)
        -- 查询redis
        val = read_redis("127.0.0.1", 6379, key)
        -- 判断redis查询结果
        if not val then
            ngx.log(ngx.ERR, "redis查询失败, 尝试查询http, key: ", key)
            -- redis查询失败, 查询http
            val = read_http(path, params)
        end
    end
    -- 查询成功, 把数据写入本地缓存
    item_cache:set(key, val, expire)
    -- 返回数据
    return val
end

缓存同步

缓存同步策略

设置有效期: 给缓存设置有效期,到期后自动删除。再次查询时更新

  • 优势:简单、方便
  • 缺点:时效性差,缓存过期之前可能不一致
  • 应用:更新频率较低,时效性要求低的业务

**同步双写:**在修改数据库的同时,直接修改缓存

  • 优势:时效性强,缓存与数据库强一致
  • 缺点:有代码侵入,稳定度高;
  • 应用:对一致性、时效性要求较高的缓存数据

**异步更新:**修改数据库时发送时间通知,相关服务监听到通知后修改缓存数据

  • 优势:低耦合,可以同时通知多个缓存服务
  • 缺点:时效性一般,可能存在中间不一致状态
  • 应用:时效性要求一般,有多个服务需要同步

基于Canel异步通知

Canel是基于MySQL的主从同步来实现的,MySQK主从同步的原理如下:

image-20240219200836114

  • MySQL master 将数据变更写入二进制日志(binary log),其中记录的数据叫做binary log events
  • MySQL slave 将 master 的 binary log events拷贝到自己的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映到自己的数据库

Canel把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的康双人,进而完成对其他数据库的同步。

image-20240219201154209

推荐用canal-starter作为canal客户端

canal:
	destination: xxx # canal实例名称,要跟canal-server运行时设置的destination一致
	server: ip:port

编写监听器,监听Canal消息

@CanalTable("tb_item") // 指定要监听的表
@Component
public class ItemHandler implements EntryHandler<Item> {

    @Autowired
    private RedisHandler redisHandler;
    @Autowired
    private Cache<Long, Item> itemCache;
    
    @Override
    public void insert(Item item) {
        // 写数据到JVM进程缓存
        itemCache.put(item.getId(),item);
        // 新增数据更新到redis
        redisHandler.saveItem(item);
    }

    @Override
    public void update(Item before, Item after) {
        // 写数据到JVM进程缓存
        itemCache.put(after.getId(),item);
        // 更新redis数据
		redisHandler.saveItem(after);
    }

    @Override
    public void delete(Item item) {
        // 写数据到JVM进程缓存
        itemCache.invalidate(item.getId());
        // 删除redis数据
        redisHandler.deleteItemById(item.getId());
    }
}

Canal 推送给 canal-client 的是被修改的这一行数据 (row), 而我们引入的canal-client则会帮我们把行数据封装到 item实体类中。这个过程中需要知道数据库与实体的映射关系,要用到JPA的几个注解:

@Data
@TableName("tb_item")
public class Item {
    @TableId(type = IdType.AUTO)
    private Long id;

    @Column(name = "name")
    private String name;

    // ... 其它字段略
    @TableField(exist = false)
    @Transient //标记不属于表中的字段
    private Integer stock;
    @TableField(exist = false)
    @Transient //标记不属于表中的字段
    private Integer sold;
}