【Go】-限流器的四种实现方法

news/2024/12/23 21:56:14 标签: golang, 开发语言, 后端

目录

关于限流和限流器

固定窗口限流器

滑动窗口限流器

漏桶限流器

令牌桶限流器

总结


关于限流和限流器

        限流(Rate Limiting)是一种控制资源使用率的机制,通常用于防止系统过载和滥用。

        限流器(Rate Limiter)是一种实现限流的组件或服务。以下是限流器的一些主要用途和原因:

  1. 防止系统过载:在高流量的情况下,如网络服务或API,限流可以确保系统不会因为过多的并发请求而崩溃。通过限制每个用户或客户端的请求频率,系统可以维持稳定的性能。

  2. 资源分配:在多用户环境中,限流可以确保所有用户都公平地使用资源,防止某些用户占用过多资源而影响其他用户。

  3. 成本控制:对于云服务和API提供商来说,限流可以帮助控制成本。通过限制免费层级用户的使用量,可以鼓励用户升级到付费服务。

  4. 提高用户体验:如果一个服务因为过载而变得缓慢或不可用,限流可以提高用户体验,因为它可以保证服务在高负载下的响应速度。

  5. 防止滥用:限流可以防止恶意用户或自动化脚本滥用服务,例如防止暴力破解密码或发送垃圾邮件。

  6. 服务降级:在某些情况下,系统可能会故意降低服务质量,以保护关键功能的正常运行。限流是实现服务降级的一种方式。

  7. 合规性:某些行业法规可能要求服务提供商限制数据传输速率,以符合隐私和安全标准。

  8. 缓存友好:限流可以减少对缓存系统的冲击,因为缓存系统可能无法处理非常高的请求率。

81ea9992faabfd618afa1453cb28d4a


固定窗口限流器

        这种限流器一时间为周期,用一个计数器配合定时器,限制周期内访问的次数。

4f56e37260c0de9cc877e8d17bdb590

type FixedWindowRateLimiter struct {
     windowSize time.Duration
     limit      uint64
     counter    uint64
     ticker     *time.Ticker
     stop       chan struct{}
     status     bool
 }
 ​
 func NewFixedWindowRateLimiter(windowSize time.Duration, limit uint64) *FixedWindowRateLimiter {
     now := uint64(time.Now().UnixNano())
     return &FixedWindowRateLimiter{
        windowSize: windowSize,
        limit:      limit,
        start:      now,
        stop:       make(chan struct{}),
        status:     false,
     }
 }
  • windowSize限流器周期

  • limit最大访问次数限制

  • counter计数器

  • ticker计时器

  • stop关闭信号管道

  • status限流器状态

启动和关闭:

        Start启动定时器用go协程处理周期更新和收到关闭信号,Close向关闭信号管道发送关闭信号。

 func (fwrl *FixedWindowRateLimiter) Start() {
     fwrl.ticker = time.NewTicker(fwrl.windowSize)
     fwrl.status = true
     go func() {
         for {
             select {
             case <-fwrl.ticker.C:
                 atomic.StoreUint64(&fwrl.counter, 0)
             case <-fwrl.stop:
                 fwrl.ticker.Stop()
                 fwrl.status = false
                 return
             }
         }
     }()
 }
 ​
 func (fwrl *FixedWindowRateLimiter) Close() {
     close(fwrl.stop)
 }

实现的方法:

 // 请求一次访问
 func (fwrl *FixedWindowRateLimiter) Do() bool {
     if !fwrl.status {
         return false
     }
     currentCounter := atomic.LoadUint64(&fwrl.counter)
     if currentCounter >= fwrl.limit {
         return false
     }
     atomic.AddUint64(&fwrl.counter, 1)
     return true
 }
 ​
 // 剩余可访问次数
 func (fwrl *FixedWindowRateLimiter) Check() uint64 {
     if !fwrl.status {
         return 0
     }
     return fwrl.limit - atomic.LoadUint64(&fwrl.counter)
 }
 ​
 // 更新并重启限流器
 func (fwrl *FixedWindowRateLimiter) Update(windowSize time.Duration, limit uint64) {
     fwrl.windowSize = windowSize
     fwrl.limit = limit
     if fwrl.status {
         fwrl.ticker.Stop()
         fwrl.Start()
     }
 }

关于原子操作:

        在代码中使用了atomic包的原子操作,目的是为了保证高并发读写下限流器的数据准确性。atomic包的实现在sre/internal/runtime/atomic,本质上使用汇编语言保证了操作的原子性。

        例如swapUint64函数中,调用.Xchg64的代码如下:

TEXT ·Xchg64(SB), NOSPLIT, $0-24
     MOVD    ptr+0(FP), R0
     MOVD    new+8(FP), R1
 #ifndef GOARM64_LSE
     MOVBU   internal∕cpu·ARM64+const_offsetARM64HasATOMICS(SB), R4
     CBZ     R4, load_store_loop
 #endif
     SWPALD  R1, (R0), R2
     MOVD    R2, ret+16(FP)
     RET
 #ifndef GOARM64_LSE
 load_store_loop:
     LDAXR   (R0), R2
     STLXR   R1, (R0), R3
     CBNZ    R3, load_store_loop
     MOVD    R2, ret+16(FP)
     RET
 #endif

计数器限流的严重问题: 这个算法虽然简单,但是有一个十分致命的问题,那就是临界问题,我们看下图:

img

        从上图中我们可以看到,假设有一个恶意用户,他在0:59时,瞬间发送了100个请求,并且1:00又瞬间发送了100个请求,那么其实这个用户在 1秒里面,瞬间发送了200个请求。

        我们刚才规定的是1分钟最多100个请求(规划的吞吐量),也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求, 可以瞬间超过我们的速率限制。

        用户有可能通过算法的这个漏洞,瞬间压垮我们的应用。


滑动窗口限流器

        跟固定窗口限流器差不多,只不过粒度更小了。

ed2ba998132a08adbb203869895441c

 type SlidingWindowRateLimiter struct {
     windowSize time.Duration
     subWindow  time.Duration
     limit      uint64
     counters   []uint64
     start      uint64
     ticker     *time.Ticker
     subticker  *time.Ticker
     stop       chan struct{}
     status     bool
 }
 ​
 func NewSlidingWindowRateLimiter(windowSize, subWindow time.Duration, limit uint64) (*SlidingWindowRateLimiter, error) {
     if windowSize <= subWindow {
         return nil, errors.New("wrong size")
     }
     numSubWindows := int(windowSize / subWindow)
     return &SlidingWindowRateLimiter{
         windowSize: windowSize,
         subWindow:  subWindow,
         limit:      limit,
         counters:   make([]uint64, numSubWindows),
         start:      uint64(time.Now().UnixNano()),
         stop:       make(chan struct{}),
     }, nil
 }
  • windowSize限流器周期

  • subWindow滑动窗口周期

  • limit最大访问次数限制

  • counters滑动窗口计数器切片

  • start周期开启时间

  • ticker周期计时器

  • subticker窗口计时器

  • stop关闭信号管道

  • status限流器状态

启动和关闭:

        滑窗是小粒度的固定窗口,计算index并且重置计数

 func (swrl *SlidingWindowRateLimiter) Start() {
     swrl.ticker = time.NewTicker(swrl.windowSize)
     swrl.subticker = time.NewTicker(swrl.subWindow)
     swrl.status = true
     go func() {
         for {
             select {
             case <-swrl.subticker.C:
                 now := uint64(time.Now().UnixNano())
                 index := int((now - swrl.start) / uint64(swrl.subWindow))
                 atomic.StoreUint64(&swrl.counters[index%len(swrl.counters)], 0)
             case <-swrl.ticker.C:
                 swrl.start = uint64(time.Now().UnixNano())
             case <-swrl.stop:
                 swrl.ticker.Stop()
                 swrl.status = false
                 return
             }
         }
     }()
 }
 ​
 func (swrl *SlidingWindowRateLimiter) Close() {
     close(swrl.stop)
 }

其他方法:

        这里Do和Check都是要计算周期内每个滑窗总和

func (swrl *SlidingWindowRateLimiter) Do() bool {
     if !fwrl.status {
         return false
     }
     now := uint64(time.Now().UnixNano())
     startIndex := int((now - swrl.start) / uint64(swrl.subWindow))
     endIndex := int((now - swrl.start + uint64(swrl.windowSize)) / uint64(swrl.subWindow))
     // 计算周期内每个滑窗总和是否大于limit
     sum := uint64(0)
     for i := startIndex; i <= endIndex; i++ {
         index := i
         if index >= len(swrl.counters) {
             index -= len(swrl.counters)
         }
         sum += atomic.LoadUint64(&swrl.counters[index])
     }
     if sum >= swrl.limit {
         return false
     }
     // 增加当前子窗口的计数
     currentIndex := int((now - swrl.start) / uint64(swrl.subWindow))
     atomic.AddUint64(&swrl.counters[currentIndex], 1)
     return true
 }
 ​
 ​
 func (swrl *SlidingWindowRateLimiter) Check() uint64 {
     if !fwrl.status {
         return 0
     }
     now := uint64(time.Now().UnixNano())
     startIndex := int((now - swrl.start) / uint64(swrl.subWindow))
     endIndex := int((now - swrl.start + uint64(swrl.windowSize)) / uint64(swrl.subWindow))
     sum := uint64(0)
     for i := startIndex; i <= endIndex; i++ {
         index := i
         if index >= len(swrl.counters) {
             index -= len(swrl.counters)
         }
         sum += atomic.LoadUint64(&swrl.counters[index])
     }
     return swrl.limit - sum
 }
 ​
 func (swrl *SlidingWindowRateLimiter) Update(windowSize time.Duration, subWindow time.Duration, limit uint64) {
     swrl.windowSize = windowSize
     swrl.limit = limit
     swrl.subWindow = subWindow
     if swrl.status {
         swrl.ticker.Stop()
         numSubWindows := int(windowSize / swrl.subWindow)
         swrl.counters = make([]uint64, numSubWindows)
         swrl.Start()
     }
 }

漏桶限流器

        漏桶,可以想象成一个木桶下面钻了一个小孔,把水倒进来就是请求访问,水漏出去就是允许请求。

        理论上小孔流出水的速率是不变的,也就是允许请求的速率是不变的,这就是漏桶的特点,你可以随便倒水,但是流出水速率恒定,实现按了平稳的访问速率。

a93dc412009e69880b771bf38aac777

 type LeakyBucket struct {
     capacity  uint
     remaining uint
     ticker    *time.Ticker
     reset     time.Time
     rate      time.Duration
     mutex     sync.Mutex
     stop       chan struct{}
     status    bool
 }
 ​
 func NewLeakyBucket(capacity uint, rate time.Duration) (*LeakyBucket, error) {
     return &LeakyBucket{
         capacity:   capacity,
         remaining:  capacity,
         rate:       rate,
         status:     false,
     }
 }
  • capacity桶的容量

  • remaining剩余的容量

  • ticker计时器

  • reset重置的时间

  • rate漏桶的速率

  • mutes互斥锁

  • stop关闭信号管道

  • status桶的状态

启动和关闭:

 func (lb *LeakyBucket) Start() {
     lb.ticker = time.NewTicker(swrl.rate)
     lb.status = true
     lb.reset = time.Now().Add(rate)
     go func() {
         for {
             select {
             case <-lb.ticker.C:
                 lb.mutex.lock()
                 if lb.remaining < lb.capacity {
                     lb.remaining += 1
                 }
                 lb.mutex.unlock()
             case <-lb.stop:
                 lb.ticker.Stop()
                 lb.status = false
                 return
             }
         }
     }()
 }
 ​
 func (lb *LeakyBucket) Close() {
     close(lb.stop)
 }

其他方法:

// 返回容量
 func (lb *LeakyBucket) Capacity() uint {
     return lb.capacity
 }
 ​
 // 桶里剩余容量
 func (lb *LeakyBucket) Remaining() uint {
     return lb.remaining
 }
 ​
 // 重置桶
 func (lb *LeakyBucket) Reset() time.Time {
     lb.remaining = lb.capacity
     // 更新reset时间为一个rate后,这样就不用加锁了
     lb.reset = time.Now().Add(rate)
     return lb.reset
 }
 ​
 // 往桶里加请求
 func (lb *LeakyBucket) Add(amount uint) (bool, error) {
     lb.mutex.Lock()
     defer lb.mutex.Unlock()
     // 时间在重置前那就重置后再取
     if time.Now().After(lb.reset) {
         lb.reset = time.Now().Add(lb.rate)
         lb.remaining = lb.capacity
     }
     if amount > lb.remaining {
         return false, errors.New("too many")
     }
     lb.remaining -= amount
     return true, nil
 }

漏桶的问题:

        漏桶的出水速度固定,也就是请求放行速度是固定的,因此漏桶不能有效应对突发流量,但是能起到平滑突发流量(整流)的作用。

        漏桶出口的速度固定,不能灵活的应对后端能力提升,比如,通过动态扩容,后端流量从1000QPS提升到1WQPS,漏桶没有办法。


令牌桶限流器

        令牌桶算法以一个设定的速率产生令牌并放入令牌桶,每次用户请求都得申请令牌,如果令牌不足,则拒绝请求。 ​ 令牌桶算法中新请求到来时会从桶里拿走一个令牌,如果桶内没有令牌可拿,就拒绝服务。当然,令牌的数量也是有上限的。令牌的数量与时间和发放速率强相关,时间流逝的时间越长,会不断往桶里加入越多的令牌,如果令牌发放的速度比申请速度快,令牌桶会放满令牌,直到令牌占满整个令牌桶。

a93dc412009e69880b771bf38aac777

type TookenBucket struct {
     capacity  uint
     tooken    uint
     ticker    *time.Ticker
     reset     time.Time
     rate      time.Duration
     mutex     sync.Mutex
     stop       chan struct{}
     status    bool
 }
 ​
 func NewTookenBucket(capacity uint, rate time.Duration) (*TookenBucket, error) {
     return &TookenBucket{
         capacity:   capacity,
         tooken:     0,
         rate:       rate,
         status:     false,
     }
 }
  • capacity桶的容量

  • tooken剩余的tooken

  • ticker计时器

  • reset重置的时间

  • rate漏桶的速率

  • mutes互斥锁

  • stop关闭信号管道

  • status桶的状态

启动和关闭:

 func (lb *TookenBucket) Start() {
     tb.ticker = time.NewTicker(swrl.rate)
     tb.status = true
     tb.reset = time.Now().Add(rate)
     go func() {
         for {
             select {
             case <-tb.ticker.C:
                 tb.mutex.lock()
                 if tb.tooken < lb.capacity {
                     tb.tooken += 1
                 }
                 tb.mutex.unlock()
             case <-tb.stop:
                 tb.ticker.Stop()
                 tb.status = false
                 return
             }
         }
     }()
 }
 ​
 func (tb *TookenBucket) Close() {
     close(tb.stop)
 }

其他方法:

 // 返回容量
 func (tb *TookenBucket) Capacity() uint {
     return tb.capacity
 }
 ​
 // 桶里tooken
 func (tb *TookenBucket) Cheak() uint {
     return tb.tooken
 }
 ​
 // 重置桶
 func (tb *TookenBucket) Reset() time.Time {
     tb.tooken = 0
     // 更新reset时间为一个rate后,这样就不用加锁了
     tb.reset = time.Now().Add(rate)
     return tb.reset
 }
 ​
 // 往桶里取令牌
 func (tb *TookenBucket) Took(amount uint) (bool, error) {
     tb.mutex.Lock()
     defer tb.mutex.Unlock()
     // 时间在重置前那就重置后再取
     if time.Now().After(tb.reset) {
         tb.reset = time.Now().Add(tb.rate)
         tb.token = 0
         return false, nil
     }
     if amount > tb.tooken {
         return false, errors.New("too many")
     }
     tb.tooken -= amount
     return true, nil
 }

总结

        以上就是四种限流方法实现,一般在高并发实战中,采用漏桶+令牌桶。


http://www.niftyadmin.cn/n/5797050.html

相关文章

LeetCode169. 多数元素(2024冬季每日一题 39)

给定一个大小为 n 的数组 nums &#xff0c;返回其中的多数元素。多数元素是指在数组中出现次数 大于 ⌊ n/2 ⌋ 的元素。 你可以假设数组是非空的&#xff0c;并且给定的数组总是存在多数元素。 示例 1&#xff1a; 输入&#xff1a;nums [3,2,3] 输出&#xff1a;3 示例 2…

RFdiffusion get_torsions函数解读

函数功能 get_torsions 函数根据输入的原子坐标(xyz_in)和氨基酸序列(seq),计算一组主链和侧链的扭转角(torsions)。同时生成备用扭转角(torsions_alt),用于表示可以镜像翻转的几何结构,并返回掩码(tors_mask)和是否平面化(tors_planar)的信息。 输入参数 xyz…

Spark-Streaming集成Kafka

Spark Streaming集成Kafka是生产上最多的方式&#xff0c;其中集成Kafka 0.10是较为简单的&#xff0c;即&#xff1a;Kafka分区和Spark分区之间是1:1的对应关系&#xff0c;以及对偏移量和元数据的访问。与高版本的Kafka Consumer API 集成时做了一些调整&#xff0c;下面我们…

深入理解 Java 中的 ArrayList 和 List:泛型与动态数组

深入理解 Java 中的 ArrayList 和 List&#xff1a;泛型与动态数组 在 Java 编程中&#xff0c;ArrayList 和 List 是最常用的集合类之一。它们帮助我们管理动态数据&#xff0c;支持按索引访问、增加、删除元素等操作。尤其在使用泛型时&#xff0c;理解它们之间的关系及应用…

未来将要被淘汰的编程语言

COBOL - 这是一种非常古老的语言&#xff0c;主要用于大型企业系统和政府机构。随着老一代IT工作人员的退休&#xff0c;COBOL程序员变得越来越少。Fortran - 最初用于科学和工程计算&#xff0c;Fortran在特定领域仍然有其应用&#xff0c;但随着更现代的语言&#xff08;如Py…

C++中处理对象的状态变化

在C中&#xff0c;处理对象的状态变化通常涉及多个方面&#xff0c;包括封装、观察者模式、状态模式、事件系统等。以下是几种常见的方法和策略&#xff1a; 1. 封装 (Encapsulation) 封装是面向对象编程的基本原则之一&#xff0c;它通过将对象的状态&#xff08;属性&#x…

如何调用yolov8的模型(restful和c++)

文章目录 方法一、通过RESTful API调用(推荐)第一步:部署yolo8服务端第二步:java中调用api方法二、JNI调用(本地调用)第一步:编写c/c++封装代码第二步:生成jni头文件和动态库第三步:在java中调用jni函数其他资料导出模型实际应用的语句1.静态图片分类推理2.静态图片目…

门户文件在线预览如何实现?

1.在线预览方案优劣介绍 1、在线预览方案客户&#xff0c;现在有3个方案&#xff1a; a、Aspose组件,收费是2999美元&#xff0c;折合人民币20000左右,具体可以上官网看看&#xff1a;这个在线预览插件的直接获取的pdf流, b、通过JACOB实现Office文档转换为PDF&#xff0c;Ite…