目录
关于限流和限流器
固定窗口限流器
滑动窗口限流器
漏桶限流器
令牌桶限流器
总结
关于限流和限流器
限流(Rate Limiting)是一种控制资源使用率的机制,通常用于防止系统过载和滥用。
限流器(Rate Limiter)是一种实现限流的组件或服务。以下是限流器的一些主要用途和原因:
-
防止系统过载:在高流量的情况下,如网络服务或API,限流可以确保系统不会因为过多的并发请求而崩溃。通过限制每个用户或客户端的请求频率,系统可以维持稳定的性能。
-
资源分配:在多用户环境中,限流可以确保所有用户都公平地使用资源,防止某些用户占用过多资源而影响其他用户。
-
成本控制:对于云服务和API提供商来说,限流可以帮助控制成本。通过限制免费层级用户的使用量,可以鼓励用户升级到付费服务。
-
提高用户体验:如果一个服务因为过载而变得缓慢或不可用,限流可以提高用户体验,因为它可以保证服务在高负载下的响应速度。
-
防止滥用:限流可以防止恶意用户或自动化脚本滥用服务,例如防止暴力破解密码或发送垃圾邮件。
-
服务降级:在某些情况下,系统可能会故意降低服务质量,以保护关键功能的正常运行。限流是实现服务降级的一种方式。
-
合规性:某些行业法规可能要求服务提供商限制数据传输速率,以符合隐私和安全标准。
-
缓存友好:限流可以减少对缓存系统的冲击,因为缓存系统可能无法处理非常高的请求率。
固定窗口限流器
这种限流器一时间为周期,用一个计数器配合定时器,限制周期内访问的次数。
type FixedWindowRateLimiter struct {
windowSize time.Duration
limit uint64
counter uint64
ticker *time.Ticker
stop chan struct{}
status bool
}
func NewFixedWindowRateLimiter(windowSize time.Duration, limit uint64) *FixedWindowRateLimiter {
now := uint64(time.Now().UnixNano())
return &FixedWindowRateLimiter{
windowSize: windowSize,
limit: limit,
start: now,
stop: make(chan struct{}),
status: false,
}
}
-
windowSize
限流器周期 -
limit
最大访问次数限制 -
counter
计数器 -
ticker
计时器 -
stop
关闭信号管道 -
status
限流器状态
启动和关闭:
Start启动定时器用go协程处理周期更新和收到关闭信号,Close向关闭信号管道发送关闭信号。
func (fwrl *FixedWindowRateLimiter) Start() {
fwrl.ticker = time.NewTicker(fwrl.windowSize)
fwrl.status = true
go func() {
for {
select {
case <-fwrl.ticker.C:
atomic.StoreUint64(&fwrl.counter, 0)
case <-fwrl.stop:
fwrl.ticker.Stop()
fwrl.status = false
return
}
}
}()
}
func (fwrl *FixedWindowRateLimiter) Close() {
close(fwrl.stop)
}
实现的方法:
// 请求一次访问
func (fwrl *FixedWindowRateLimiter) Do() bool {
if !fwrl.status {
return false
}
currentCounter := atomic.LoadUint64(&fwrl.counter)
if currentCounter >= fwrl.limit {
return false
}
atomic.AddUint64(&fwrl.counter, 1)
return true
}
// 剩余可访问次数
func (fwrl *FixedWindowRateLimiter) Check() uint64 {
if !fwrl.status {
return 0
}
return fwrl.limit - atomic.LoadUint64(&fwrl.counter)
}
// 更新并重启限流器
func (fwrl *FixedWindowRateLimiter) Update(windowSize time.Duration, limit uint64) {
fwrl.windowSize = windowSize
fwrl.limit = limit
if fwrl.status {
fwrl.ticker.Stop()
fwrl.Start()
}
}
关于原子操作:
在代码中使用了atomic包的原子操作,目的是为了保证高并发读写下限流器的数据准确性。atomic包的实现在sre/internal/runtime/atomic,本质上使用汇编语言保证了操作的原子性。
例如swapUint64函数中,调用.Xchg64的代码如下:
TEXT ·Xchg64(SB), NOSPLIT, $0-24
MOVD ptr+0(FP), R0
MOVD new+8(FP), R1
#ifndef GOARM64_LSE
MOVBU internal∕cpu·ARM64+const_offsetARM64HasATOMICS(SB), R4
CBZ R4, load_store_loop
#endif
SWPALD R1, (R0), R2
MOVD R2, ret+16(FP)
RET
#ifndef GOARM64_LSE
load_store_loop:
LDAXR (R0), R2
STLXR R1, (R0), R3
CBNZ R3, load_store_loop
MOVD R2, ret+16(FP)
RET
#endif
计数器限流的严重问题: 这个算法虽然简单,但是有一个十分致命的问题,那就是临界问题,我们看下图:
从上图中我们可以看到,假设有一个恶意用户,他在0:59时,瞬间发送了100个请求,并且1:00又瞬间发送了100个请求,那么其实这个用户在 1秒里面,瞬间发送了200个请求。
我们刚才规定的是1分钟最多100个请求(规划的吞吐量),也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求, 可以瞬间超过我们的速率限制。
用户有可能通过算法的这个漏洞,瞬间压垮我们的应用。
滑动窗口限流器
跟固定窗口限流器差不多,只不过粒度更小了。
type SlidingWindowRateLimiter struct {
windowSize time.Duration
subWindow time.Duration
limit uint64
counters []uint64
start uint64
ticker *time.Ticker
subticker *time.Ticker
stop chan struct{}
status bool
}
func NewSlidingWindowRateLimiter(windowSize, subWindow time.Duration, limit uint64) (*SlidingWindowRateLimiter, error) {
if windowSize <= subWindow {
return nil, errors.New("wrong size")
}
numSubWindows := int(windowSize / subWindow)
return &SlidingWindowRateLimiter{
windowSize: windowSize,
subWindow: subWindow,
limit: limit,
counters: make([]uint64, numSubWindows),
start: uint64(time.Now().UnixNano()),
stop: make(chan struct{}),
}, nil
}
-
windowSize
限流器周期 -
subWindow
滑动窗口周期 -
limit
最大访问次数限制 -
counters
滑动窗口计数器切片 -
start
周期开启时间 -
ticker
周期计时器 -
subticker
窗口计时器 -
stop
关闭信号管道 -
status
限流器状态
启动和关闭:
滑窗是小粒度的固定窗口,计算index并且重置计数
func (swrl *SlidingWindowRateLimiter) Start() {
swrl.ticker = time.NewTicker(swrl.windowSize)
swrl.subticker = time.NewTicker(swrl.subWindow)
swrl.status = true
go func() {
for {
select {
case <-swrl.subticker.C:
now := uint64(time.Now().UnixNano())
index := int((now - swrl.start) / uint64(swrl.subWindow))
atomic.StoreUint64(&swrl.counters[index%len(swrl.counters)], 0)
case <-swrl.ticker.C:
swrl.start = uint64(time.Now().UnixNano())
case <-swrl.stop:
swrl.ticker.Stop()
swrl.status = false
return
}
}
}()
}
func (swrl *SlidingWindowRateLimiter) Close() {
close(swrl.stop)
}
其他方法:
这里Do和Check都是要计算周期内每个滑窗总和
func (swrl *SlidingWindowRateLimiter) Do() bool {
if !fwrl.status {
return false
}
now := uint64(time.Now().UnixNano())
startIndex := int((now - swrl.start) / uint64(swrl.subWindow))
endIndex := int((now - swrl.start + uint64(swrl.windowSize)) / uint64(swrl.subWindow))
// 计算周期内每个滑窗总和是否大于limit
sum := uint64(0)
for i := startIndex; i <= endIndex; i++ {
index := i
if index >= len(swrl.counters) {
index -= len(swrl.counters)
}
sum += atomic.LoadUint64(&swrl.counters[index])
}
if sum >= swrl.limit {
return false
}
// 增加当前子窗口的计数
currentIndex := int((now - swrl.start) / uint64(swrl.subWindow))
atomic.AddUint64(&swrl.counters[currentIndex], 1)
return true
}
func (swrl *SlidingWindowRateLimiter) Check() uint64 {
if !fwrl.status {
return 0
}
now := uint64(time.Now().UnixNano())
startIndex := int((now - swrl.start) / uint64(swrl.subWindow))
endIndex := int((now - swrl.start + uint64(swrl.windowSize)) / uint64(swrl.subWindow))
sum := uint64(0)
for i := startIndex; i <= endIndex; i++ {
index := i
if index >= len(swrl.counters) {
index -= len(swrl.counters)
}
sum += atomic.LoadUint64(&swrl.counters[index])
}
return swrl.limit - sum
}
func (swrl *SlidingWindowRateLimiter) Update(windowSize time.Duration, subWindow time.Duration, limit uint64) {
swrl.windowSize = windowSize
swrl.limit = limit
swrl.subWindow = subWindow
if swrl.status {
swrl.ticker.Stop()
numSubWindows := int(windowSize / swrl.subWindow)
swrl.counters = make([]uint64, numSubWindows)
swrl.Start()
}
}
漏桶限流器
漏桶,可以想象成一个木桶下面钻了一个小孔,把水倒进来就是请求访问,水漏出去就是允许请求。
理论上小孔流出水的速率是不变的,也就是允许请求的速率是不变的,这就是漏桶的特点,你可以随便倒水,但是流出水速率恒定,实现按了平稳的访问速率。
type LeakyBucket struct {
capacity uint
remaining uint
ticker *time.Ticker
reset time.Time
rate time.Duration
mutex sync.Mutex
stop chan struct{}
status bool
}
func NewLeakyBucket(capacity uint, rate time.Duration) (*LeakyBucket, error) {
return &LeakyBucket{
capacity: capacity,
remaining: capacity,
rate: rate,
status: false,
}
}
-
capacity
桶的容量 -
remaining
剩余的容量 -
ticker
计时器 -
reset
重置的时间 -
rate
漏桶的速率 -
mutes
互斥锁 -
stop
关闭信号管道 -
status
桶的状态
启动和关闭:
func (lb *LeakyBucket) Start() {
lb.ticker = time.NewTicker(swrl.rate)
lb.status = true
lb.reset = time.Now().Add(rate)
go func() {
for {
select {
case <-lb.ticker.C:
lb.mutex.lock()
if lb.remaining < lb.capacity {
lb.remaining += 1
}
lb.mutex.unlock()
case <-lb.stop:
lb.ticker.Stop()
lb.status = false
return
}
}
}()
}
func (lb *LeakyBucket) Close() {
close(lb.stop)
}
其他方法:
// 返回容量
func (lb *LeakyBucket) Capacity() uint {
return lb.capacity
}
// 桶里剩余容量
func (lb *LeakyBucket) Remaining() uint {
return lb.remaining
}
// 重置桶
func (lb *LeakyBucket) Reset() time.Time {
lb.remaining = lb.capacity
// 更新reset时间为一个rate后,这样就不用加锁了
lb.reset = time.Now().Add(rate)
return lb.reset
}
// 往桶里加请求
func (lb *LeakyBucket) Add(amount uint) (bool, error) {
lb.mutex.Lock()
defer lb.mutex.Unlock()
// 时间在重置前那就重置后再取
if time.Now().After(lb.reset) {
lb.reset = time.Now().Add(lb.rate)
lb.remaining = lb.capacity
}
if amount > lb.remaining {
return false, errors.New("too many")
}
lb.remaining -= amount
return true, nil
}
漏桶的问题:
漏桶的出水速度固定,也就是请求放行速度是固定的,因此漏桶不能有效应对突发流量,但是能起到平滑突发流量(整流)的作用。
漏桶出口的速度固定,不能灵活的应对后端能力提升,比如,通过动态扩容,后端流量从1000QPS提升到1WQPS,漏桶没有办法。
令牌桶限流器
令牌桶算法以一个设定的速率产生令牌并放入令牌桶,每次用户请求都得申请令牌,如果令牌不足,则拒绝请求。 令牌桶算法中新请求到来时会从桶里拿走一个令牌,如果桶内没有令牌可拿,就拒绝服务。当然,令牌的数量也是有上限的。令牌的数量与时间和发放速率强相关,时间流逝的时间越长,会不断往桶里加入越多的令牌,如果令牌发放的速度比申请速度快,令牌桶会放满令牌,直到令牌占满整个令牌桶。
type TookenBucket struct {
capacity uint
tooken uint
ticker *time.Ticker
reset time.Time
rate time.Duration
mutex sync.Mutex
stop chan struct{}
status bool
}
func NewTookenBucket(capacity uint, rate time.Duration) (*TookenBucket, error) {
return &TookenBucket{
capacity: capacity,
tooken: 0,
rate: rate,
status: false,
}
}
-
capacity
桶的容量 -
tooken
剩余的tooken -
ticker
计时器 -
reset
重置的时间 -
rate
漏桶的速率 -
mutes
互斥锁 -
stop
关闭信号管道 -
status
桶的状态
启动和关闭:
func (lb *TookenBucket) Start() {
tb.ticker = time.NewTicker(swrl.rate)
tb.status = true
tb.reset = time.Now().Add(rate)
go func() {
for {
select {
case <-tb.ticker.C:
tb.mutex.lock()
if tb.tooken < lb.capacity {
tb.tooken += 1
}
tb.mutex.unlock()
case <-tb.stop:
tb.ticker.Stop()
tb.status = false
return
}
}
}()
}
func (tb *TookenBucket) Close() {
close(tb.stop)
}
其他方法:
// 返回容量
func (tb *TookenBucket) Capacity() uint {
return tb.capacity
}
// 桶里tooken
func (tb *TookenBucket) Cheak() uint {
return tb.tooken
}
// 重置桶
func (tb *TookenBucket) Reset() time.Time {
tb.tooken = 0
// 更新reset时间为一个rate后,这样就不用加锁了
tb.reset = time.Now().Add(rate)
return tb.reset
}
// 往桶里取令牌
func (tb *TookenBucket) Took(amount uint) (bool, error) {
tb.mutex.Lock()
defer tb.mutex.Unlock()
// 时间在重置前那就重置后再取
if time.Now().After(tb.reset) {
tb.reset = time.Now().Add(tb.rate)
tb.token = 0
return false, nil
}
if amount > tb.tooken {
return false, errors.New("too many")
}
tb.tooken -= amount
return true, nil
}
总结
以上就是四种限流方法实现,一般在高并发实战中,采用漏桶+令牌桶。