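These days, LLMs are everywhere, especially ChatGPT. Many applications are built on top of it, and if you haven't tried it yet, you should.
Building an application on top of ChatGPT may require you to make many parallel calls. Unfortunately, you're not the only one doing so. With so many applications making millions of requests every day (kudos to OpenAI's engineering team, by the way), the API often returns a "Too Many Requests" (HTTP 429) error. So we need a good way to handle this error while making many parallel calls.
In this short Python tutorial, we'll cover two important topics for calling the ChatGPT API efficiently: making multiple calls in parallel, and retrying calls when they fail.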
1. Making multiple calls in parallel
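The simplest way to make a call is synchronously: send the request and wait for the response to arrive before the program continues. We can do it like this: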
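import os
import requests

# Assumption: the API key is stored in the OPENAI_API_KEY environment variable
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {OPENAI_API_KEY}"
}

# Blocking call: the program waits here until the response arrives
response_json = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
    "model": "gpt-3.5-turbo",
    "messages": [{"role": "user", "content": "ping"}],
    "temperature": 0
}).json()

print(response_json["choices"][0]["message"]["content"])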
Pong!
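This is fine for a simple system. But if we want to make many calls to an API or another resource (such as a database) in parallel, we can make them asynchronously to get responses faster.
Running tasks asynchronously starts every operation and then waits for all of them to complete concurrently, which reduces the total waiting time.
A basic way to do this is to create a separate thread for each request, but async calls offer a better approach.
Async calls are usually more efficient because you specify exactly where your application should wait (at each await), whereas with traditional threads the operating system decides when a thread is paused, which can be suboptimal.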
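For comparison, the thread-based approach can be sketched with the standard library's concurrent.futures.ThreadPoolExecutor. This is a minimal sketch, not code from the original tutorial: slow_task is a hypothetical stand-in for a blocking call such as requests.post, and the pool size of 5 is an arbitrary choice.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(i):
    # Hypothetical stand-in for a blocking call such as requests.post(...)
    time.sleep(0.2)
    return i * i


def run_with_threads(items, max_workers=5):
    # Each item runs in a worker thread; the OS decides when threads are
    # paused and resumed, rather than the code marking explicit wait points
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in input order, not completion order
        return list(pool.map(slow_task, items))


start = time.perf_counter()
results = run_with_threads(range(5))
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f} seconds")
```

Five 0.2-second tasks finish in roughly 0.2 seconds rather than 1 second, because each one sleeps in its own thread.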
Below is an example that shows the difference between synchronous and asynchronous calls.
# Sync call
import time

def delay_print(msg):
    print(msg, end=" ")
    time.sleep(1)  # blocks the whole program for one second

def sync_print():
    for i in range(10):
        delay_print(i)

start_time = time.time()
sync_print()
print("\n", time.time() - start_time, "seconds.")
0 1 2 3 4 5 6 7 8 9
10.019574642181396 seconds.
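# Async call
import asyncio

async def delay_print_async(msg):
    print(msg, end=" ")
    await asyncio.sleep(1)  # non-blocking: lets the other coroutines run meanwhile

async def async_print():
    # Awaiting gather waits until all ten coroutines have finished
    await asyncio.gather(*[delay_print_async(i) for i in range(10)])

start_time = time.time()
# Top-level await works in Jupyter/IPython; in a plain script use asyncio.run(async_print())
await async_print()
print("\n", time.time() - start_time, "seconds.")
0 1 2 3 4 5 6 7 8 9
All ten numbers appear at once, and the whole run takes about 1 second instead of 10, because the ten one-second sleeps overlap. If the await in front of asyncio.gather is left out, async_print returns as soon as the coroutines have been scheduled, so in a notebook the timer reports only a fraction of a millisecond and the numbers are printed afterwards.
The asyncio.gather function runs all the awaitables passed to it concurrently and, once they have all finished, returns their results in the same order in which they were passed.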
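A small offline check of that ordering guarantee: the coroutines below finish in reverse order, yet gather still returns their results in the order they were passed. The work coroutine and its delays are made up for illustration, and asyncio.run() plays the role of the notebook's top-level await.

```python
import asyncio


async def work(i, delay):
    # Later items sleep less, so they finish first
    await asyncio.sleep(delay)
    return i


async def main():
    delays = [0.3, 0.2, 0.1]
    return await asyncio.gather(*[work(i, d) for i, d in enumerate(delays)])


# asyncio.run() is the script equivalent of a notebook's top-level await
results = asyncio.run(main())
print(results)  # [0, 1, 2]
```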
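Unfortunately, the requests library is synchronous, so it can't be used for async calls. Instead, you can use the aiohttp library. Here is an example of an async call with aiohttp: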
import aiohttp

async def get_completion(content):
    # Opens a new session for every call
    async with aiohttp.ClientSession() as session:
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            return response_json["choices"][0]['message']["content"]

await get_completion("Ping")
Pong!
As mentioned earlier, to run several requests concurrently we use asyncio.gather:
async def get_completion_list(content_list):
    # Start one request per prompt and wait for all of them
    return await asyncio.gather(*[get_completion(content) for content in content_list])

await get_completion_list(["ping", "pong"]*5)
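['Pong!',
 'Ping!',
 'Pong!',
 'Ping!',
 'Pong!',
 'Ping!',
 'Pong!',
 'Ping!',
 'Pong!',
 'Ping!']
This works, but making calls this way isn't ideal, because we recreate the session object on every call. We can save resources and time by reusing the same session object, like this: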
async def get_completion(content, session):
    async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": content}],
        "temperature": 0
    }) as resp:
        response_json = await resp.json()
        return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list):
    # One shared session (and connection pool) for all requests
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[get_completion(content, session) for content in content_list])

await get_completion_list(["ping", "pong"]*5)
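Easy, right? With this, you can make many calls with little effort. One problem, though, is that making an unlimited number of calls like this is usually bad practice: you may overload a system and get penalized, blocked from making further requests for a while (trust me, you will). So it's a good idea to limit how many calls run at the same time, which you can do easily with the asyncio.Semaphore class.
A Semaphore can be used as an async context manager (async with) that tracks how many async calls are currently running inside its context. When the maximum is reached, further calls block until some of the running ones finish.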
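To see the cap in action without calling the API, the sketch below records how many coroutines are inside the semaphore at the same time. The names limited_task and main are hypothetical, and asyncio.sleep stands in for the HTTP request.

```python
import asyncio


async def limited_task(i, semaphore, state):
    async with semaphore:
        # Count how many tasks are inside the semaphore right now
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.05)  # stand-in for the HTTP request
        state["active"] -= 1
        return i


async def main(n_tasks=10, max_parallel_calls=3):
    semaphore = asyncio.Semaphore(max_parallel_calls)
    state = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *[limited_task(i, semaphore, state) for i in range(n_tasks)]
    )
    return results, state["peak"]


results, peak = asyncio.run(main())
print(peak)  # 3
```

All ten tasks are started at once, but no more than three are ever inside the async with block together.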
async def get_completion(content, session, semaphore):
    # At most max_parallel_calls coroutines can be inside this block at once
    async with semaphore:
        await asyncio.sleep(1)  # holds the slot for 1 s before the request, further throttling the call rate
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore) for content in content_list])

start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
print(completion_list)
Time elapsed: 1.8094507199984946 seconds.
['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']
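One optional extra is reporting how the calls are progressing. You can do this by creating a small class that holds the progress and is shared by all the calls, like this: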
class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0

    def increment(self):
        self.done = self.done + 1

    def __repr__(self):
        return f"Done runs {self.done}/{self.total}."

async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        await asyncio.sleep(1)
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
print(completion_list)
Done runs 1/10.
Done runs 2/10.
Done runs 3/10.
Done runs 4/10.
Done runs 5/10.
Done runs 6/10.
Done runs 7/10.
Done runs 8/10.
Done runs 9/10.
Done runs 10/10.
Time elapsed: 1.755018908999773 seconds.
['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']
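The same ProgressLog pattern can be exercised offline by replacing the HTTP call with a short sleep; fake_completion and fake_completion_list below are hypothetical stand-ins for get_completion and get_completion_list. Sharing a plain counter is safe here because every coroutine runs on the single event-loop thread and increment() contains no await, so two updates can never interleave.

```python
import asyncio


class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0

    def increment(self):
        self.done = self.done + 1

    def __repr__(self):
        return f"Done runs {self.done}/{self.total}."


async def fake_completion(content, semaphore, progress_log):
    # Hypothetical stand-in for get_completion: no network, just a short sleep
    async with semaphore:
        await asyncio.sleep(0.01)
        progress_log.increment()
        print(progress_log)
        return content.capitalize() + "!"


async def fake_completion_list(content_list, max_parallel_calls):
    semaphore = asyncio.Semaphore(max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    results = await asyncio.gather(
        *[fake_completion(c, semaphore, progress_log) for c in content_list]
    )
    return results, progress_log


results, progress_log = asyncio.run(fake_completion_list(["ping", "pong"] * 5, 3))
```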
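That wraps up the part on making multiple async requests. With this, you can make many async calls, limit how many run at once, and report progress. However, there are still some problems to handle.
Requests can fail for many reasons, such as an overloaded server, a dropped connection, or a malformed request. These may raise exceptions or return unexpected responses, so we need to handle these cases and automatically retry failed calls.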
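The code so far reads response_json["choices"] directly, so an error body from the API surfaces only as a bare KeyError. A small validation step makes such failures explicit while still raising an exception that a retry decorator can react to (aiohttp's resp.raise_for_status() is another option for non-2xx statuses). CompletionError and extract_content are hypothetical names, and the sketch assumes error bodies have the form {"error": {"message": ...}}; check that against the responses you actually receive.

```python
class CompletionError(Exception):
    """Raised when a response body does not contain a usable completion."""


def extract_content(response_json):
    # Assumption: error bodies look like {"error": {"message": "..."}}
    if "error" in response_json:
        error = response_json["error"]
        message = error.get("message", "unknown error") if isinstance(error, dict) else error
        raise CompletionError(f"API returned an error: {message}")
    try:
        return response_json["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as exc:
        # Any other unexpected shape also becomes a CompletionError
        raise CompletionError(f"unexpected response: {response_json!r}") from exc
```

Inside get_completion, return extract_content(await resp.json()) would then replace the direct indexing.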
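2. Retrying calls when they fail
To handle failed calls, we'll use the tenacity library. Tenacity provides function decorators that automatically retry a function call when it raises an exception.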
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
)
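To give our calls retry behavior, we put the @retry decorator in front of the function. Without extra arguments, the function is retried immediately and indefinitely after each failure, which is a bad idea for a couple of reasons.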
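One reason is that a call may fail because the server is overloaded, in which case we should wait a while before trying again. To set the wait time, we use exponential backoff via the parameter wait=wait_random_exponential(min=min_value, max=max_value). This makes the wait grow, with random jitter, as the number of failures increases, up to max_value seconds.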
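To make the backoff idea concrete, here is a hand-rolled, stdlib-only sketch of "full jitter" exponential backoff: the window doubles after each failure, is capped at max_wait, and the actual delay is a random point inside it. It is similar in spirit to wait_random_exponential but is not tenacity's exact formula; backoff_delays is a hypothetical helper.

```python
import random


def backoff_delays(attempts, multiplier=1.0, max_wait=60.0, rng=random):
    """Hypothetical helper: one delay per failed attempt, full-jitter style."""
    delays = []
    for attempt in range(attempts):
        # The window doubles after every failure but never exceeds max_wait
        window = min(max_wait, multiplier * 2 ** attempt)
        # A random point inside the window keeps clients from retrying in lockstep
        delays.append(rng.uniform(0, window))
    return delays


print([round(d, 2) for d in backoff_delays(8, rng=random.Random(0))])
```

The random component matters when many parallel calls fail at the same moment (for example on a 429): without it, they would all retry at the same instant and hit the limit again.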
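Another optional extra is logging a message each time a retry happens, which we do by passing a function to the before_sleep parameter. Here we use print, but a better approach is to use the logging module and pass a function such as logging.error or logging.debug to this parameter.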
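A minimal sketch of such a callback, assuming it is passed as before_sleep=log_retry. It reads attempt_number, outcome and next_action.sleep, which are attributes of the RetryCallState object tenacity passes in; tenacity's built-in before_sleep_log(logger, logging.WARNING) is a ready-made alternative. The logger name and message format are arbitrary, and the fake state at the bottom only exists so the sketch runs without tenacity.

```python
import logging
from concurrent.futures import Future
from types import SimpleNamespace

logger = logging.getLogger("chatgpt_calls")


def format_retry(retry_state):
    # attempt_number, outcome and next_action.sleep come from RetryCallState
    exc = retry_state.outcome.exception()
    return (
        f"Attempt {retry_state.attempt_number} failed with "
        f"{type(exc).__name__}: {exc}; retrying in "
        f"{retry_state.next_action.sleep:.2f} s"
    )


def log_retry(retry_state):
    # Intended use: @retry(..., before_sleep=log_retry)
    logger.warning(format_retry(retry_state))


# Offline check with a fake state; tenacity would pass a real RetryCallState
failed = Future()
failed.set_exception(TimeoutError("stuck"))
fake_state = SimpleNamespace(
    attempt_number=1, outcome=failed, next_action=SimpleNamespace(sleep=0.5)
)
message = format_retry(fake_state)
log_retry(fake_state)
```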
For demonstration purposes, we'll raise random exceptions.
import random

class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0

    def increment(self):
        self.done = self.done + 1

    def __repr__(self):
        return f"Done runs {self.done}/{self.total}."

# Random exponential backoff between attempts; print each retry before sleeping
@retry(wait=wait_random_exponential(min=1, max=60), before_sleep=print)
async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        # await asyncio.sleep(1)
        # Simulate a failure on roughly 20% of attempts
        if random.random() < 0.2:
            raise Exception("Random exception")
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
print(completion_list)
<RetryCallState 133364377433616: attempt #1; slept for 0.74; last result: failed (Exception Random exception)>
<RetryCallState 133364377424496: attempt #1; slept for 0.79; last result: failed (Exception Random exception)>
Done runs 1/10.
Done runs 2/10.
Done runs 3/10.
Done runs 4/10.
Done runs 5/10.
Done runs 6/10.
Done runs 7/10.
Done runs 8/10.
Done runs 9/10.
Done runs 10/10.
Time elapsed: 1.1305301820011664 seconds.
['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']
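This makes the function wait before retrying. But a failure can also be systematic, such as the server being down or a bad payload. In that case we want a limited number of retries, which we can set with the parameter stop=stop_after_attempt(n).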
import random

class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0

    def increment(self):
        self.done = self.done + 1

    def __repr__(self):
        return f"Done runs {self.done}/{self.total}."

# Give up after 2 attempts
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(2), before_sleep=print)
async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        # await asyncio.sleep(1)
        # Simulate a failure on roughly 90% of attempts
        if random.random() < 0.9:
            raise Exception("Random exception")
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
print(completion_list)
<RetryCallState 133364608660048: attempt #1; slept for 0.1; last result: failed (Exception Random exception)>
<RetryCallState 133364377435680: attempt #1; slept for 0.71; last result: failed (Exception Random exception)>
<RetryCallState 133364377421472: attempt #1; slept for 0.17; last result: failed (Exception Random exception)>
<RetryCallState 133364377424256: attempt #1; slept for 0.37; last result: failed (Exception Random exception)>
<RetryCallState 133364377430928: attempt #1; slept for 0.87; last result: failed (Exception Random exception)>
<RetryCallState 133364377420752: attempt #1; slept for 0.42; last result: failed (Exception Random exception)>
<RetryCallState 133364377422576: attempt #1; slept for 0.47; last result: failed (Exception Random exception)>
<RetryCallState 133364377431312: attempt #1; slept for 0.11; last result: failed (Exception Random exception)>
<RetryCallState 133364377425840: attempt #1; slept for 0.69; last result: failed (Exception Random exception)>
<RetryCallState 133364377424592: attempt #1; slept for 0.89; last result: failed (Exception Random exception)>
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/tenacity/_asyncio.py in __call__(self, fn, *args, **kwargs)
49 try:
---> 50 result = await fn(*args, **kwargs)
51 except BaseException: # noqa: B902
Exception: Random exception
The above exception was the direct cause of the following exception:
RetryError Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/tenacity/__init__.py in iter(self, retry_state)
324 if self.reraise:
325 raise retry_exc.reraise()
--> 326 raise retry_exc from fut.exception()
327
328 if self.wait:
RetryError: RetryError[<Future at 0x794b5057a590 state=finished raised Exception>]
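For intuition about what the decorator does with stop=stop_after_attempt(n), here is a stdlib-only async retry loop: it awaits the call up to max_attempts times with jittered exponential backoff in between, and raises after the last failure. It is a simplified sketch, not tenacity's implementation; GaveUp plays the role of RetryError, and call_with_retries and flaky are hypothetical names.

```python
import asyncio
import random


class GaveUp(Exception):
    """Hypothetical stand-in for tenacity's RetryError."""


async def call_with_retries(make_call, max_attempts=3, multiplier=0.01, max_wait=1.0):
    """Await make_call() up to max_attempts times, backing off between tries."""
    for attempt in range(1, max_attempts + 1):
        try:
            # make_call must build a fresh coroutine each time
            return await make_call()
        except Exception as exc:
            if attempt == max_attempts:
                # Like stop_after_attempt(n): give up and chain the last error
                raise GaveUp(f"failed after {attempt} attempts") from exc
            # Jittered wait inside a window that doubles per failure, capped at max_wait
            window = min(max_wait, multiplier * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, window))


def flaky(failures):
    """Hypothetical call that fails `failures` times before succeeding."""
    state = {"calls": 0}

    async def call():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise Exception("Random exception")
        return "Pong!"

    return call, state


call, state = flaky(failures=2)
print(asyncio.run(call_with_retries(call)), state["calls"])  # Pong! 3
```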
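With this parameter set, a RetryError is raised once the maximum number of attempts is reached. However, we may prefer to keep running without an exception and simply store None as the call's return value, to deal with it later. To do this, we can use the retry_error_callback parameter with a callback that just returns None once the retries are exhausted: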
import random

class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0

    def increment(self):
        self.done = self.done + 1

    def __repr__(self):
        return f"Done runs {self.done}/{self.total}."

# After the last failed attempt, return None instead of raising RetryError
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(2), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        # await asyncio.sleep(1)
        # Simulate a failure on roughly 70% of attempts
        if random.random() < 0.7:
            raise Exception("Random exception")
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    # 1-second total timeout per request
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(1)) as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
print(completion_list)
<RetryCallState 133364377805024: attempt #1; slept for 0.22; last result: failed (Exception Random exception)>
<RetryCallState 133364377799456: attempt #1; slept for 0.53; last result: failed (Exception Random exception)>
<RetryCallState 133364377801328: attempt #1; slept for 0.24; last result: failed (Exception Random exception)>
<RetryCallState 133364377810208: attempt #1; slept for 0.38; last result: failed (Exception Random exception)>
<RetryCallState 133364377801616: attempt #1; slept for 0.54; last result: failed (Exception Random exception)>
<RetryCallState 133364377422096: attempt #1; slept for 0.59; last result: failed (Exception Random exception)>
<RetryCallState 133364377430592: attempt #1; slept for 0.07; last result: failed (Exception Random exception)>
<RetryCallState 133364377425648: attempt #1; slept for 0.05; last result: failed (Exception Random exception)>
Done runs 1/10.
Done runs 2/10.
Done runs 3/10.
Time elapsed: 2.6409040250000544 seconds.
['Pong!', 'Ping!', None, None, None, None, None, 'Ping!', None, None]
This way, calls that exhaust their retries return None instead of raising an error.
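The None values make it easy to find and re-run the failed prompts in a second pass. A minimal sketch with hypothetical helpers (failed_indices, merge_retried): the first-pass list is the output shown above, and the second pass is faked so the code runs offline; the comment shows where the real get_completion_list call would go.

```python
def failed_indices(results):
    # Positions where the call gave up and retry_error_callback returned None
    return [i for i, result in enumerate(results) if result is None]


def merge_retried(results, indices, retried):
    # Put second-pass answers back at their original positions
    merged = list(results)
    for i, value in zip(indices, retried):
        merged[i] = value
    return merged


content_list = ["ping", "pong"] * 5
first_pass = ['Pong!', 'Ping!', None, None, None, None, None, 'Ping!', None, None]
todo = failed_indices(first_pass)
# Real flow (inside async code):
#     retried = await get_completion_list([content_list[i] for i in todo], 100)
# Faked here so the sketch runs offline:
retried = ["Pong!" if content_list[i] == "ping" else "Ping!" for i in todo]
final = merge_retried(first_pass, todo, retried)
print(todo)  # [2, 3, 4, 5, 6, 8, 9]
```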
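One problem we haven't handled yet is stuck connections. This happens when we send a request and, for some reason, the host keeps the connection open but neither fails nor returns anything. To handle this, we need a timeout that aborts the call if it hasn't returned within a given time. We can do this with the timeout parameter of aiohttp's ClientSession together with the aiohttp.ClientTimeout class. If a timeout occurs, a TimeoutError is raised, which tenacity's retry decorator catches, so the function runs again automatically.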
class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0

    def increment(self):
        self.done = self.done + 1

    def __repr__(self):
        return f"Done runs {self.done}/{self.total}."

@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(10)) as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*100, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
<RetryCallState 133364375201936: attempt #1; slept for 0.57; last result: failed (TimeoutError )>
Time elapsed: 12.705538211999738 seconds.
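aiohttp.ClientTimeout(10) sets a total time budget of 10 seconds for each request made through the session. If you need the same kind of limit around an arbitrary awaitable, the standard library offers asyncio.wait_for; the sketch below simulates a stuck connection with a very long sleep (stuck_call and call_with_timeout are hypothetical names). The asyncio.TimeoutError it raises is an ordinary Exception, so a retry decorator like the one above would treat it as a failed attempt.

```python
import asyncio


async def stuck_call():
    # Simulates a connection that stays open but never answers
    await asyncio.sleep(3600)


async def call_with_timeout(coro, seconds):
    # Cancels the awaited call and raises asyncio.TimeoutError if it overruns
    return await asyncio.wait_for(coro, timeout=seconds)


async def main():
    try:
        await call_with_timeout(stuck_call(), 0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"


outcome = asyncio.run(main())
print(outcome)  # timed out
```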
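Great! We now have a robust way to run many parallel requests that are retried automatically when something fails and that return None when the failure is systematic. The final code looks like this: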
import asyncio
import os
import aiohttp
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
)

# Assumption: the API key is stored in the OPENAI_API_KEY environment variable
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {OPENAI_API_KEY}"
}

class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0

    def increment(self):
        self.done = self.done + 1

    def __repr__(self):
        return f"Done runs {self.done}/{self.total}."

# Up to 20 attempts with random exponential backoff; return None if all of them fail
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls, timeout):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    # timeout: total seconds allowed per request; a TimeoutError triggers a retry
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(timeout)) as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

# In a notebook: completion_list = await get_completion_list(["ping", "pong"]*5, 100, 10)
# In a script:   completion_list = asyncio.run(get_completion_list(["ping", "pong"]*5, 100, 10))
In summary, we implemented the following: