Understanding Rate Limiting: Leaky Bucket and Token Bucket Algorithms with a Python Example
This article explains the principles of the leaky‑bucket and token‑bucket rate‑limiting algorithms, compares their behavior in high‑concurrency e‑commerce scenarios, and provides a complete Python implementation to illustrate how a token bucket can absorb burst traffic while keeping the system stable.
High‑concurrency e‑commerce systems often face spikes such as flash sales, and to keep services stable they typically rely on three techniques: caching, degradation, and rate limiting.
The article introduces two widely used rate‑limiting algorithms: the leaky‑bucket algorithm and the token‑bucket algorithm.
The leaky‑bucket algorithm works like a bucket that leaks water at a constant rate; incoming requests are added to the bucket, and if the arrival rate exceeds the leak rate, excess requests overflow and are dropped, enforcing a strict throughput limit.
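The behavior described above can be sketched as a bounded queue that is drained at a constant rate. This is a minimal illustration rather than code from the original article: the `LeakyBucket` class, its `add` and `leak` methods, and the idea of a "tick" as the unit of time are all assumptions made for the sketch.

```python
from collections import deque


class LeakyBucket:
    """Hypothetical leaky bucket: a bounded queue drained at a constant rate."""

    def __init__(self, capacity, leak_per_tick):
        self.capacity = capacity            # maximum number of queued requests
        self.leak_per_tick = leak_per_tick  # requests released on each tick
        self.queue = deque()

    def add(self, request):
        # A full bucket overflows: the request is dropped.
        if len(self.queue) >= self.capacity:
            return False
        self.queue.append(request)
        return True

    def leak(self):
        # Release at most `leak_per_tick` requests, oldest first.
        count = min(self.leak_per_tick, len(self.queue))
        return [self.queue.popleft() for _ in range(count)]


bucket = LeakyBucket(capacity=3, leak_per_tick=2)
# Five requests arrive at once: three are queued, two overflow.
accepted = [bucket.add(i) for i in range(5)]
first_tick = bucket.leak()   # the two oldest queued requests
second_tick = bucket.leak()  # the one remaining request
print(accepted, first_tick, second_tick)
```

However many requests arrive, `leak` never releases more than `leak_per_tick` of them per tick, which is the strict throughput limit the paragraph refers to.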
The token‑bucket algorithm refills the bucket with tokens at a fixed rate, up to the bucket's capacity; a request can proceed only if enough tokens are available. Tokens that accumulate while traffic is light can be spent all at once, so the algorithm allows occasional bursts no larger than the bucket's capacity, which makes it more flexible for handling sudden traffic spikes.
A comparison highlights that the leaky bucket enforces a rigid output rate even when the system has been idle and has spare capacity, while the token bucket controls the average rate but also permits limited bursts, making it preferable in most high‑traffic scenarios.
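To make the comparison concrete, the following sketch feeds the same arrival pattern (a quiet period followed by a burst) through both algorithms and records how many requests each lets through per tick. The two function names, the tick-based time model, and the numbers chosen are illustrative assumptions, not part of the original article.

```python
def leaky_bucket_output(arrivals, rate, queue_size):
    """Requests released per tick by a queue drained at a constant rate."""
    queued, released = 0, []
    for arriving in arrivals:
        queued = min(queue_size, queued + arriving)  # overflow is dropped
        out = min(rate, queued)                      # never more than `rate`
        queued -= out
        released.append(out)
    return released


def token_bucket_output(arrivals, rate, capacity):
    """Requests admitted per tick by a token bucket that starts empty."""
    tokens, admitted = 0, []
    for arriving in arrivals:
        tokens = min(capacity, tokens + rate)  # refill, capped at capacity
        out = min(arriving, tokens)            # one token per request
        tokens -= out
        admitted.append(out)
    return admitted


# Four idle ticks, then a burst of 10 requests, then two more idle ticks.
arrivals = [0, 0, 0, 0, 10, 0, 0]
leaky = leaky_bucket_output(arrivals, rate=2, queue_size=8)
token = token_bucket_output(arrivals, rate=2, capacity=8)
print(leaky)  # [0, 0, 0, 0, 2, 2, 2]
print(token)  # [0, 0, 0, 0, 8, 0, 0]
```

With these numbers the leaky bucket releases two requests per tick no matter how long it sat idle, whereas the token bucket spends the eight tokens saved during the idle ticks and admits eight requests of the burst immediately.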
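The article then provides a Python implementation of the token‑bucket algorithm that demonstrates token generation, consumption, and a simple multiprocessing test harness:

# encoding: utf-8
"""
author: [email protected]
time: 2020/9/9 10:43 PM
"""
import itertools
import multiprocessing
import time

# Each entry maps a test name to the number of tokens that request needs.
TEST = {
    'all': {
        'test1': {'test1': 20},
        'test2': {'test2': 50},
        'test3': {'test3': 80},
        'test4': {'test4': 100},  # a burst of 100 requests
        'test5': {'test5': 200},  # a burst of 200 requests (exceeds CAPACITY, so always limited)
        'test6': {'test6': 20},
    }
}


class TokenBucket(object):
    # rate is the number of tokens issued per second; capacity is the bucket size
    def __init__(self, rate, capacity):
        self._rate = rate
        self._capacity = capacity
        self._current_amount = 0
        self._last_consume_time = time.monotonic()

    # token_amount is the number of tokens needed to send the data
    def consume(self, token_amount):
        time.sleep(1)  # demo only: space the calls out so tokens can accumulate
        # Number of tokens issued between the previous call and this one
        now = time.monotonic()
        increment = (now - self._last_consume_time) * self._rate
        # Record the time on every call, even a rejected one, so that the
        # same interval is never credited twice
        self._last_consume_time = now
        # The number of tokens cannot exceed the bucket capacity
        self._current_amount = min(
            increment + self._current_amount, self._capacity)
        # Without enough tokens, the data cannot be sent
        if token_amount > self._current_amount:
            return False
        self._current_amount -= token_amount
        return True


def job(limiter, result_dict, total_requests=99):
    # Cycle through the test cases until total_requests calls have been made
    cases = itertools.cycle(result_dict.values())
    for result in itertools.islice(cases, total_requests):
        key, rate = next(iter(result.items()))
        if not limiter.consume(rate):
            print(key + ' rate limited')
        else:
            print(key + ' OK')


def run(limiter, result_dict):
    # Each process receives its own copy of the limiter, so the three
    # buckets are filled and drained independently of one another
    processes = [
        multiprocessing.Process(target=job, args=(limiter, result_dict))
        for _ in range(3)
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()


if __name__ == '__main__':
    result_dict = TEST["all"]
    RATE = 30
    CAPACITY = 120
    limiter = TokenBucket(RATE, CAPACITY)
    run(limiter, result_dict)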
The author notes that the example is straightforward and can be adapted to specific business scenarios where rate limiting is required.
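One possible adaptation, assuming the requests are served by threads inside a single process, is to guard one shared bucket with a lock so that all workers draw from the same pool of tokens (in the multiprocessing harness, each process works on its own copy of the bucket). The `SharedTokenBucket` class, the injectable `clock` parameter, and the choice to start with a full bucket are hypothetical and not taken from the original article.

```python
import threading
import time


class SharedTokenBucket:
    """Hypothetical thread-safe token bucket; `clock` is injectable for testing."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self._rate = rate
        self._capacity = capacity
        self._clock = clock
        self._tokens = capacity          # assumption: start with a full bucket
        self._last_refill = clock()
        self._lock = threading.Lock()

    def consume(self, amount=1):
        with self._lock:  # refill and spend must happen atomically
            now = self._clock()
            refill = (now - self._last_refill) * self._rate
            self._tokens = min(self._capacity, self._tokens + refill)
            self._last_refill = now
            if amount > self._tokens:
                return False
            self._tokens -= amount
            return True


# Freeze the clock so no tokens are added during the demonstration.
bucket = SharedTokenBucket(rate=30, capacity=10, clock=lambda: 0.0)
results = []  # list.append is thread-safe in CPython


def worker():
    for _ in range(5):
        results.append(bucket.consume())


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
granted = results.count(True)
print(granted, "of", len(results), "requests admitted")
```

Because the clock is frozen, the four threads compete for exactly the ten tokens the bucket started with, so ten of the twenty attempts succeed regardless of how the threads interleave.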
References to additional articles and GitHub repositories are provided for readers who wish to explore the algorithms further.
Aikesheng Open Source Community
The Aikesheng Open Source Community provides stable, enterprise‑grade MySQL open‑source tools and services, releases a premium open‑source component each year (1024), and continuously operates and maintains them.
