陵水文章资讯

nginx+lua+redis实现限流的示例代码

2026-04-06 16:16:02 浏览次数:1
详细信息

1. 安装必要的模块

# 安装 OpenResty(已包含 nginx + lua)
# 或者安装 lua-nginx-module 到普通 nginx
# 需要安装 redis-lua 库
luarocks install lua-resty-redis

2. 限流实现代码

限流模块 rate_limit.lua

local redis = require "resty.redis"
local string_format = string.format

local _M = {}

-- 滑动窗口限流算法
-- key: redis key
-- period: 时间窗口(秒)
-- limit: 限制次数
-- window_size: 窗口内子窗口数量(默认10)
function _M.sliding_window(key, period, limit, window_size)
    window_size = window_size or 10
    local now = ngx.now() * 1000  -- 毫秒时间戳
    local window_period = period * 1000 / window_size  -- 每个子窗口的时长(毫秒)

    local red = redis:new()
    red:set_timeout(1000)  -- 1秒超时

    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        ngx.log(ngx.ERR, "failed to connect to redis: ", err)
        return false
    end

    -- 使用redis pipeline提高性能
    red:init_pipeline()

    -- 1. 移除过期的子窗口
    local oldest = now - (period * 1000)
    red:zremrangebyscore(key, 0, oldest)

    -- 2. 添加当前请求到滑动窗口
    red:zadd(key, now, string_format("%s_%s", now, ngx.var.remote_addr))

    -- 3. 设置key过期时间(防止长期占用内存)
    red:expire(key, period * 2)

    -- 4. 获取当前窗口内的请求数
    red:zcard(key)

    local results, err = red:commit_pipeline()
    if not results then
        ngx.log(ngx.ERR, "failed to commit pipeline: ", err)
        red:set_keepalive(10000, 100)
        return false
    end

    -- 释放连接到连接池
    red:set_keepalive(10000, 100)

    local current_count = results[4] or 0

    -- 检查是否超过限制
    if current_count > limit then
        return false, current_count
    end

    return true, current_count
end

-- 令牌桶限流算法
-- key: redis key
-- rate: 每秒生成令牌数
-- capacity: 桶容量
function _M.token_bucket(key, rate, capacity)
    local now = ngx.now()
    local red = redis:new()
    red:set_timeout(1000)

    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        ngx.log(ngx.ERR, "failed to connect to redis: ", err)
        return false
    end

    -- 使用redis的hash存储令牌桶状态
    -- tokens: 当前令牌数
    -- last_time: 上次更新时间戳
    local data = red:hgetall(key)

    local tokens = capacity
    local last_time = now

    if data and #data > 0 then
        for i = 1, #data, 2 do
            if data[i] == "tokens" then
                tokens = tonumber(data[i+1]) or capacity
            elseif data[i] == "last_time" then
                last_time = tonumber(data[i+1]) or now
            end
        end
    end

    -- 计算新增的令牌
    local elapsed = now - last_time
    local new_tokens = elapsed * rate

    -- 更新令牌数量(不超过容量)
    tokens = math.min(capacity, tokens + new_tokens)

    -- 检查是否有足够令牌
    if tokens < 1 then
        red:set_keepalive(10000, 100)
        return false, 0
    end

    -- 消耗一个令牌
    tokens = tokens - 1

    -- 更新redis
    red:hmset(key, "tokens", tokens, "last_time", now)
    red:expire(key, math.ceil(capacity / rate) * 2)

    red:set_keepalive(10000, 100)

    return true, tokens
end

-- 固定窗口限流算法
-- key: redis key
-- period: 时间窗口(秒)
-- limit: 限制次数
function _M.fixed_window(key, period, limit)
    local red = redis:new()
    red:set_timeout(1000)

    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        ngx.log(ngx.ERR, "failed to connect to redis: ", err)
        return false
    end

    -- 获取当前计数
    local current = tonumber(red:get(key)) or 0

    if current >= limit then
        red:set_keepalive(10000, 100)
        return false, current
    end

    -- 使用INCR增加计数,如果是第一次则设置过期时间
    local new_count = red:incr(key)
    if new_count == 1 then
        red:expire(key, period)
    end

    red:set_keepalive(10000, 100)

    return true, new_count
end

return _M

Nginx 配置文件 nginx.conf

http {
    lua_package_path "/path/to/your/lua/scripts/?.lua;;";

    # 初始化redis连接池
    init_by_lua_block {
        redis_connection_pool_size = 100
        redis_connection_pool_timeout = 10000
    }

    # 共享字典,用于本地缓存
    lua_shared_dict rate_limit_cache 10m;

    server {
        listen 80;

        # 限流配置(针对IP)
        location /api/ {
            access_by_lua_block {
                local rate_limit = require "rate_limit"
                local limit = 100  -- 每分钟100次
                local period = 60  -- 时间窗口60秒

                -- 获取客户端IP
                local client_ip = ngx.var.remote_addr
                local key = "rate_limit:ip:" .. client_ip

                -- 使用滑动窗口算法
                local ok, current = rate_limit.sliding_window(key, period, limit)

                if not ok then
                    ngx.header["X-RateLimit-Limit"] = limit
                    ngx.header["X-RateLimit-Remaining"] = 0
                    ngx.header["X-RateLimit-Reset"] = period
                    ngx.status = 429
                    ngx.say('{"error": "Too Many Requests", "message": "Rate limit exceeded"}')
                    ngx.exit(429)
                else
                    ngx.header["X-RateLimit-Limit"] = limit
                    ngx.header["X-RateLimit-Remaining"] = limit - current
                    ngx.header["X-RateLimit-Reset"] = period
                end
            }

            proxy_pass http://backend_server;
        }

        # 针对用户ID限流
        location /api/user/ {
            access_by_lua_block {
                local rate_limit = require "rate_limit"
                local cjson = require "cjson"

                -- 获取用户ID(从header或参数中)
                local user_id = ngx.req.get_headers()["X-User-ID"] or
                               ngx.var.arg_user_id

                if not user_id then
                    ngx.status = 400
                    ngx.say('{"error": "User ID required"}')
                    ngx.exit(400)
                end

                local key = "rate_limit:user:" .. user_id
                local ok, current = rate_limit.fixed_window(key, 60, 50)

                if not ok then
                    ngx.header["X-RateLimit-Limit"] = 50
                    ngx.header["X-RateLimit-Remaining"] = 0
                    ngx.status = 429
                    ngx.say('{"error": "Too Many Requests"}')
                    ngx.exit(429)
                end
            }

            proxy_pass http://backend_server;
        }

        # 多维度限流示例(IP + API路径)
        location ~ ^/api/v1/(.*)$ {
            access_by_lua_block {
                local rate_limit = require "rate_limit"
                local client_ip = ngx.var.remote_addr
                local api_path = ngx.var[1]

                -- 组合key:IP+API路径
                local key = string.format("rate_limit:combined:%s:%s", 
                                         client_ip, api_path)

                local ok, current = rate_limit.token_bucket(key, 10, 30)  -- 每秒10个令牌,桶容量30

                if not ok then
                    ngx.status = 429
                    ngx.say('{"error": "Rate limit exceeded"}')
                    ngx.exit(429)
                end
            }

            proxy_pass http://backend_server;
        }

        # 查看限流状态(用于监控)
        location /rate-limit/status {
            content_by_lua_block {
                local redis = require "resty.redis"
                local red = redis:new()

                red:set_timeout(1000)
                local ok, err = red:connect("127.0.0.1", 6379)
                if not ok then
                    ngx.say('{"error": "Redis connection failed"}')
                    return
                end

                -- 获取所有限流相关的key
                local keys = red:keys("rate_limit:*")
                local result = {}

                for _, key in ipairs(keys) do
                    local key_type = red:type(key)
                    if key_type == "zset" then
                        local count = red:zcard(key)
                        table.insert(result, {key = key, count = count, type = "sliding_window"})
                    elseif key_type == "hash" then
                        local data = red:hgetall(key)
                        table.insert(result, {key = key, data = data, type = "token_bucket"})
                    elseif key_type == "string" then
                        local count = red:get(key)
                        table.insert(result, {key = key, count = count, type = "fixed_window"})
                    end
                end

                red:set_keepalive(10000, 100)
                ngx.header.content_type = "application/json"
                ngx.say(require("cjson").encode(result))
            }
        }
    }
}

3. 高级限流配置示例

-- advanced_rate_limit.lua
local redis = require "resty.redis"
local cjson = require "cjson"

local _M = {}

-- 分布式限流:使用Redis集群支持
function _M.distributed_limit(key, limit, period, cluster_nodes)
    local red = redis:new()
    red:set_timeout(1000)

    -- 简单的哈希分片
    local hash = ngx.crc32_long(key)
    local node_index = hash % #cluster_nodes + 1
    local node = cluster_nodes[node_index]

    local ok, err = red:connect(node.host, node.port)
    if not ok then
        ngx.log(ngx.ERR, "Failed to connect to redis cluster: ", err)
        return false
    end

    -- 使用Lua脚本保证原子性
    local script = [[
        local key = KEYS[1]
        local limit = tonumber(ARGV[1])
        local period = tonumber(ARGV[2])

        local current = redis.call('incr', key)
        if current == 1 then
            redis.call('expire', key, period)
        end

        if current > limit then
            return 0
        end

        return limit - current
    ]]

    local remaining, err = red:eval(script, 1, key, limit, period)
    red:set_keepalive(10000, 100)

    if not remaining then
        ngx.log(ngx.ERR, "Redis eval failed: ", err)
        return false
    end

    return remaining >= 0, tonumber(remaining) or 0
end

-- 漏桶算法
function _M.leaky_bucket(key, rate, capacity)
    local now = ngx.now()
    local red = redis:new()
    red:set_timeout(1000)

    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return false
    end

    local script = [[
        local key = KEYS[1]
        local rate = tonumber(ARGV[1])  -- 流出速率(个/秒)
        local capacity = tonumber(ARGV[2])  -- 桶容量
        local now = tonumber(ARGV[3])

        local data = redis.call('hmget', key, 'water', 'last_time')
        local water = tonumber(data[1]) or 0
        local last_time = tonumber(data[2]) or now

        -- 计算漏出的水量
        local elapsed = now - last_time
        local leaked = elapsed * rate
        water = math.max(0, water - leaked)

        -- 检查桶是否已满
        if water >= capacity then
            redis.call('hmset', key, 'water', water, 'last_time', now)
            redis.call('expire', key, math.ceil(capacity / rate) * 2)
            return 0
        end

        -- 添加水(请求)
        water = water + 1
        redis.call('hmset', key, 'water', water, 'last_time', now)
        redis.call('expire', key, math.ceil(capacity / rate) * 2)

        return 1
    ]]

    local result, err = red:eval(script, 1, key, rate, capacity, now)
    red:set_keepalive(10000, 100)

    if not result then
        return false
    end

    return tonumber(result) == 1
end

return _M

4. 使用说明

配置Redis:确保Redis服务正常运行 安装Lua模块luarocks install lua-resty-redis 调整限流参数:根据实际需求调整limit和period值 监控限流状态:通过 /rate-limit/status 接口查看限流状态

5. 限流策略对比

算法 优点 缺点 适用场景
固定窗口 实现简单 边界问题,不够平滑 简单场景
滑动窗口 更平滑,更精确 实现复杂,消耗资源 精确控制
令牌桶 允许突发流量 实现复杂 API网关
漏桶 平滑流量,防止突发 响应延迟 流量整形

这个方案提供了完整的限流实现,支持多种算法,可以根据具体业务需求选择合适的限流策略。

相关推荐