Rate Limiter Implementation (Token Bucket)

Check whether a request from a specific user (identified by IP address) is allowed to access the service, i.e. whether that IP still has tokens left in its bucket.


   ...
   ...
   
	allow := common.GetIpRateLimiterInstance().AllowRequest(ReadUserIP(r))

	if !allow {
		go monitor.GetInstance().APIReport("AllowRequest", constant.ResponseStatusFail,
			constant.ErrTooManyRequest.Error(), start)
		return nil, constant.ErrTooManyRequest
	}
	...
	...
	

Implementation :

import (
	"sync"

	"golang.org/x/time/rate"
)

// IPRateLimiter hands out one token-bucket rate.Limiter per client IP.
// Limiters are kept in an LRU cache; every access to that cache made by the
// methods of this type is performed while holding mu.
type IPRateLimiter struct {
	// mu serialises all access to ipCache.
	mu      *sync.RWMutex
	// apiRate is the refill rate given to every newly created limiter.
	apiRate rate.Limit
	// bToken is the burst size given to every newly created limiter.
	bToken  int
	// ipCache is the LRU cache mapping an IP to its limiter; it is created
	// with a capacity of maxIpList entries.
	ipCache *Cache
}

// Package-level state of the IP rate limiter.
var (
	// maxIpList is the capacity requested for the LRU cache of per-IP limiters.
	maxIpList = 1000000

	// limiterInstance is the shared limiter returned by
	// GetIpRateLimiterInstance; it stays nil until first requested.
	limiterInstance *IPRateLimiter
)

// NewIPRateLimiter builds an IPRateLimiter whose per-IP limiters refill at
// apiRate tokens per second and allow a burst of twice that amount (the
// original author's note attributes the factor of two to there being two APIs
// behind the limiter).
//
// The backing cache comes from NewCacheInstance, which returns a shared
// singleton: every IPRateLimiter created here uses the same cache.
func NewIPRateLimiter(apiRate int64) *IPRateLimiter {
	perSecond := rate.Limit(apiRate)
	burst := 2 * int(perSecond)

	limiter := &IPRateLimiter{
		mu:      new(sync.RWMutex),
		apiRate: perSecond,
		bToken:  burst,
		ipCache: NewCacheInstance(maxIpList),
	}
	return limiter
}

// limiterOnce guards the one-time lazy construction of limiterInstance.
var limiterOnce sync.Once

// GetIpRateLimiterInstance returns the shared IPRateLimiter, creating it on
// first use with the rate from config.GetApiRateLimit() (10 requests per
// second by default).
//
// Initialisation goes through sync.Once so that concurrent first callers
// cannot race on limiterInstance or build two limiters.
func GetIpRateLimiterInstance() *IPRateLimiter {
	limiterOnce.Do(func() {
		// Respect an instance that was installed before the first call.
		if limiterInstance == nil {
			limiterInstance = NewIPRateLimiter(config.GetApiRateLimit())
		}
	})
	return limiterInstance
}

// AddIP returns the limiter cached for ip. When none is cached it creates a
// new limiter with the configured rate and burst, stores it in the cache and
// returns it. The whole get-or-create sequence runs under the write lock.
func (i *IPRateLimiter) AddIP(ip string) *rate.Limiter {
	i.mu.Lock()
	defer i.mu.Unlock()

	existing := i.ipCache.Get(ip)
	if existing != nil {
		// Somebody already registered this IP; reuse its bucket.
		return existing
	}

	created := rate.NewLimiter(i.apiRate, i.bToken)
	i.ipCache.Put(ip, created)
	return created
}

// DeleteIP drops the limiter cached for ip, if there is one. The removal is
// done while holding the write lock.
func (i *IPRateLimiter) DeleteIP(ip string) {
	i.mu.Lock()
	defer i.mu.Unlock()

	i.ipCache.Delete(ip)
}

// GetLimiter returns the limiter for ip, creating one if it does not exist.
//
// AddIP already performs the lookup and the creation atomically under the
// lock, so delegating to it avoids a second lock/unlock round-trip and a
// redundant cache lookup on every miss.
func (i *IPRateLimiter) GetLimiter(ip string) *rate.Limiter {
	return i.AddIP(ip)
}

// AllowRequest reports whether a request from ip may proceed right now,
// according to that IP's limiter (created on demand). It returns false when
// the limiter refuses the request.
func (i *IPRateLimiter) AllowRequest(ip string) bool {
	return i.GetLimiter(ip).Allow()
}

# FYI :
------

config.GetApiRateLimit() 

// GetApiRateLimit returns the configured API rate limit. A non-positive
// configured value is overwritten in conf with constant.DefaultApiRate before
// being returned.
func GetApiRateLimit() int64 {
	if conf.ApiRateLimit > 0 {
		return conf.ApiRateLimit
	}

	// Fall back to the default and remember it in the configuration.
	conf.ApiRateLimit = constant.DefaultApiRate
	return conf.ApiRateLimit
}

DefaultApiRate   = 10

Additionally, a local LRU cache stores these token buckets per user. [This could also be moved to Redis, but that is not critical or urgent, since it is acceptable to lose the counts if a crash happens.]

import (
	"golang.org/x/time/rate"
	"sync"
)

// Cache is an LRU cache of rate limiters keyed by string, built from a map
// plus a doubly linked list. Its methods take no lock themselves, so callers
// must serialise access.
type Cache struct {
	// Head and Tail are sentinel nodes that hold no entry. New and recently
	// used nodes are linked right after Head; the node before Tail is the
	// one Put evicts.
	Head, Tail *Node
	// content maps a key to its node in the list.
	content    map[string]*Node
	// Capacity is the entry count at which Put evicts before inserting.
	Capacity   int
}

// Node is one element of the Cache's doubly linked list.
type Node struct {
	// Prev and Next link the node to its neighbours in the list.
	Prev, Next *Node
	// Key is the map key under which the node is stored in the Cache.
	Key        string
	// Value is the cached limiter; the sentinel nodes leave it nil.
	Value      *rate.Limiter
}

// Singleton state used by NewCacheInstance.
var (
	// cacheInstance is the shared cache, built on the first NewCacheInstance call.
	cacheInstance *Cache

	// once makes sure cacheInstance is built a single time.
	once sync.Once
)

// NewCacheInstance returns the shared Cache. The cache is constructed on the
// first call only, with that call's capacity; later calls return the same
// instance and their capacity argument is ignored.
func NewCacheInstance(capacity int) *Cache {
	build := func() {
		cacheInstance = Constructor(capacity)
	}
	once.Do(build)
	return cacheInstance
}
// Constructor builds an empty Cache with the given capacity. The list starts
// out as two sentinel nodes, Head and Tail, linked directly to each other.
func Constructor(capacity int) *Cache {
	// Zero-valued nodes: no neighbours, empty key, nil limiter.
	head, tail := &Node{}, &Node{}
	head.Next, tail.Prev = tail, head

	return &Cache{
		Head:     head,
		Tail:     tail,
		content:  map[string]*Node{},
		Capacity: capacity,
	}
}

// Get returns the limiter stored under key, or nil when the key is not
// cached. A hit moves the entry to the front of the list, marking it as the
// most recently used.
func (c *Cache) Get(key string) *rate.Limiter {
	node, found := c.content[key]
	if !found {
		return nil
	}

	// Unlink and re-link right after Head to refresh the entry's recency.
	c.remove(node)
	c.insert(node)
	return node.Value
}

// Put stores value under key as the most recently used entry, replacing any
// entry already stored under that key.
//
// Before inserting, least recently used entries (the nodes just before Tail)
// are evicted until there is room for one more. A non-positive Capacity
// disables eviction; previously a Capacity of zero made Put unlink the Head
// sentinel and panic on a nil pointer.
func (c *Cache) Put(key string, value *rate.Limiter) {
	if node, ok := c.content[key]; ok {
		c.remove(node)
	}

	// Capacity > 0 together with len >= Capacity guarantees the list holds
	// at least one real node, so Tail.Prev is never the Head sentinel here.
	// Looping also shrinks the cache if Capacity was lowered after entries
	// were added.
	for c.Capacity > 0 && len(c.content) >= c.Capacity {
		c.remove(c.Tail.Prev)
	}

	c.insert(&Node{
		Key:   key,
		Value: value,
	})
}

// Delete removes the entry stored under key. Unknown keys are ignored.
func (c *Cache) Delete(key string) {
	node, found := c.content[key]
	if !found {
		return
	}
	c.remove(node)
}

// remove drops node from the key map and unlinks it from the list by joining
// its two neighbours. The node's own Prev/Next pointers are left untouched.
func (c *Cache) remove(node *Node) {
	before, after := node.Prev, node.Next

	delete(c.content, node.Key)
	before.Next = after
	after.Prev = before
}

// insert registers node in the key map and links it directly after Head,
// i.e. at the most recently used position.
func (c *Cache) insert(node *Node) {
	c.content[node.Key] = node

	first := c.Head.Next
	node.Prev, node.Next = c.Head, first
	c.Head.Next = node
	first.Prev = node
}

Comments (0)