在Python中实现客户端限制API访问速率
限速算法
交易所或网站一般会限制每个API的访问速率,在服务器端限流主要有漏桶和令牌桶两种实现方式,参考限流算法之漏桶与令牌桶 。具体实现也有许多现成工具,此处不赘述。
不过据我观察与测试,OKEx API的限流方式既不是漏桶也不是令牌桶,经搜索发现其应是滑动窗口算法。以其历史资金费API 为例,文档里写着:
限速: 10次/2s
限速规则:IP +instrumentID
经验证其规则如下:
∀ t ∈ R, 在t到t+2之间请求数 ≤ 10
说中文就是:在时间轴上任意2秒内,请求数小于等于10个。比如当t ∈ [0, 2]
,10个请求可以全在[0, 0.1]
内发出,此后从t=0.1
到t=2
的所有请求都会被拒绝。假设第一次请求发生在t=0
,第二次请求在t=0.1
,则第11次请求必须在t=2
之后,第12次必须在t=2.1
以后。因为允许并发请求,这显然不是漏桶算法。由于在t ∈ [0.1, 2]
不再补充令牌,这应该也不是令牌桶算法。
如果客户端根据漏桶算法限制API访问速率,则需要以0.2秒的间隔发送请求,这虽然符合了OKEx的要求,但无法并发,牺牲了效率。
若是遵从令牌桶算法,桶的容量应为10个令牌,因为最多同时可以发10个请求,若如此令牌添加速率就成了问题。假设每x秒添加一个令牌,且在t=0
一下子发送了10个请求,要是x < 2
,会出现2秒内发送了11次请求,被服务器拒绝。要是x ≥ 2
,则无法达到平均0.2秒一次的访问速率。如果规定x = 0.2
,则令牌桶容量必须为1,变成和漏桶一样,无法并发。
不过反过来,要是一种算法满足OKEx限速的要求,则其必然满足令牌桶算法的要求,假设桶的容量为10而令牌添加速率为每0.2秒一个,因为服务器在任意2秒内至少可以处理10次请求。
实现方式
关于如何在Python里进行客户端限流,网上已有许多文章,不过没有一个是我满意的。大部分都提到了Semaphore,其中很多只是用Semaphore限制了并发连接数, 并没有限制请求速率。而触及限制请求速率的,其实现非常粗糙,在asyncio框架下,大致如下:
sem = asyncio.Semaphore(10)
async def example(query):
async with sem:
await request(query)
await asyncio.sleep(2)
这种实现,虽然保证了2秒内不多于10次请求,但每次请求都要额外多等2秒,即便一共只有1个请求。敏锐的读者可能已经察觉到这里有优化空间,如果改成
Semaphore(5)
和sleep(1)
,同样是10次/2s,但每个协程只需要等1秒。如果改成Semaphore(2)
和sleep(0.4)
,每个协程只需等0.4秒。
如果是Semaphore(1)
和sleep(0.2)
,则只需等0.2秒。但这样还哪有并发,要asyncio干什么?完全变成了漏桶模式。
经过摸索,在我的OKEx程序中,访问速率限制由以下定制类实现,继承自asyncio.Semaphore
。
class REST_Semaphore(asyncio.Semaphore):
"""A custom semaphore to be used with REST API with velocity limit under asyncio
"""
def __init__(self, value: int, interval: int):
"""控制REST API访问速率
:param value: API limit
:param interval: Reset interval
"""
super().__init__(value)
# Queue of inquiry timestamps
self._inquiries = collections.deque(maxlen=value)
self._loop = asyncio.get_event_loop()
self._interval = interval
def __repr__(self):
return f'API velocity: {self._inquiries.maxlen} inquiries/{self._interval}s'
async def acquire(self):
await super().acquire()
if self._inquiries:
timelapse = time.monotonic() - self._inquiries.popleft()
# Wait until interval has passed since the first inquiry in queue returned.
if timelapse < self._interval:
await asyncio.sleep(self._interval - timelapse)
return True
def release(self):
self._inquiries.append(time.monotonic())
super().release()
其思路为,在self._value
从最大并发数降到0前,super().acquire
不阻塞。假设value = 10
,且同时有11个协程调用self.acquire
,在第11个协程调用时self._value = 0
,直到第1个请求返回,在self.release
里把self._value
加1且把时间戳加入self._inquiries
队列。
随后第11个协程退出super().acquire
,调用self._inquiries.popleft
获取第1个请求的时间戳,等到第1个请求返回的2秒后再发送请求。
在release
里而不是acquire
里self._inquiries.append
是因为如果记录的是发送时间,第1个请求传输到服务器的时间有可能比第11个请求的传输时间长,这样服务器接收到第11个请求的时间可能在第1个的2秒内。
有了这个类后,可以给每个API对应的方法安排一个类属性,分别控制不同API的访问速率。比如在publicAPI
类里添加一个类属性等于REST_Semaphore(10, 2)
,在get_historical_funding_rate
里以async with
调用,以下代码不再发生超速。否则即便把50改成11也会报错。
okex = await OKExAPI()
for _ in range(50):
tasks.append(asyncio.create_task(okex.publicAPI.get_historical_funding_rate('BTC-USDT-SWAP')))
res = await gather(*tasks)
在multiprocessing
下,一样可以限制多进程的并发数及访问速率,只需在创建进程时把p_Semaphore
实例传入,以上下文管理器调用。实现如下,多线程类似。
class p_Semaphore(ContextManager):
"""A custom semaphore to be used with REST API with velocity limit by processes
"""
def __init__(self, value: int, interval: int):
"""控制REST API并发连接
:param value: API limit
:param interval: Reset interval
"""
self._interval = interval
self._sem = multiprocessing.Semaphore(value)
# Queue of inquiry timestamps
self._inquiries = multiprocessing.Queue()
def __enter__(self):
self._sem.acquire()
if self._inquiries.qsize():
timelapse = time.monotonic() - self._inquiries.get()
# Wait until interval has passed since the first inquiry in queue returned.
if timelapse < self._interval:
time.sleep(self._interval - timelapse)
return True
def __exit__(self, *args):
self._inquiries.put(time.monotonic())
self._sem.release()
总结
要限制并发连接数和访问速率,只需在每次调用API时以上下文管理器调用以上Semaphore实例。创建实例时value
参数为最大并发数,interval
为刷新间隔。
以上实现遵从滑动窗口算法,且完全兼容令牌桶算法,只需让value
等于令牌桶容量,令value / interval
等于令牌填充速度。