原文地址:https://www.douyacun.com/article/f8f4e2bb806a6176e9c61dcaa0a4a6cc
背景:
需求:
todo:
time/rate 是go提供的官方限流工具
原理: 令牌桶,以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务
我首先想到的方案初始化定时任务,每隔多长时间往桶里放一令牌。但是相比之下,time/rate的实现方式就是更优雅了
关键变量:
已知变量:
简化公式:
当前可用令牌数,tokens(当前可用令牌数) := (now (当前时间) - last(上一次消耗令牌的时间) ).Seconds() * limit(令牌/每秒)
完整代码是:
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now
}
// Avoid making delta overflow below when last is very old.
// 这里限制令牌数不超过burst,lim.tokens 是上一次结余
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
if elapsed > maxElapsed {
elapsed = maxElapsed
}
// Calculate the new number of tokens, due to time that passed.
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta // 上一次结余 + 时间增量
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
比起定时器来说是不是很优雅?这里有个比较绕的点:limit 的单位,n/s, 每秒多少个令牌
// Every converts a minimum time interval between events to a Limit.
func Every(interval time.Duration) Limit {
if interval <= 0 {
return Inf
}
return 1 / Limit(interval.Seconds())
}
计算可用tokens的公式就更好理解了,用 tokens(当前可用令牌数) := (now (当前时间) - last(上一次消耗令牌的时间) ).Seconds() * limit(令牌/每秒)
time/rate 只是给出了最基础的实现,绝大部分的需求
其实 time/rate 的成本是非常低的,所以我们只需要多层map结构就可以满足, 每次初始化一个 rate.Limiter 就可以满足
type ApiKeyRateLimit interface {
Load()
GetLimiter(api, key string) (limiter *Limiter, ok bool)
}
// API - UserId - rate.Limiter
type ApiKeyRateLimitImpl struct {
hub map[string]map[string]*rate.Limiter
}
动态加载配置,可以使用etcd订阅配置,我实现的是定时从数据库里捞配置
type ApiAccountRateLimit struct {
ctx context.Context
lim rate.ApiKeyRateLimit
}
func NewApiAccountRateLimit(ctx context.Context, limit rate.ApiKeyRateLimit) *ApiAccountRateLimit {
r := &ApiAccountRateLimit{
ctx: ctx,
lim: limit,
}
go r.load()
return r
}
func (h *ApiAccountRateLimit) Hook() gin.HandlerFunc {
return func(ctx *gin.Context) {
path := ctx.Request.URL.Path
acctId, ok := ctx.Get(consts.OpenapiAccountId)
if !ok {
ctx.JSON(http.StatusOK, gin.H{"code": http.StatusUnauthorized, "message": http.StatusText(http.StatusUnauthorized)})
ctx.Abort()
return
}
limiter, ok := h.lim.GetLimiter(path, acctId.(string))
if !ok {
ctx.JSON(http.StatusOK, gin.H{"code": http.StatusUnauthorized, "message": http.StatusText(http.StatusUnauthorized)})
ctx.Abort()
}
// 响应头:X-RateLimit-* 这里time/rat没有放出Advance,需要改一下
_, _, tokens := limiter.Advance(time.Now())
burst := limiter.Burst()
ctx.Header("X-RateLimit-Limit", strconv.Itoa(burst))
ctx.Header("X-RateLimit-Remaining", strconv.Itoa(int(tokens)))
if !limiter.Allow() {
ctx.JSON(http.StatusOK, gin.H{"code": http.StatusTooManyRequests, "message": http.StatusText(http.StatusTooManyRequests)})
ctx.Abort()
return
}
ctx.Next()
}
}
// 定时加载配置
func (h *ApiAccountRateLimit) load() {
ticker := time.NewTicker(5 * time.Minute)
for {
select {
case <-h.ctx.Done():
logger.Errorf("quit: RateLimit load config")
case <-ticker.C:
h.lim.Load()
}
}
}