在这个小Python教程中,我们将涵盖这两个重要的主题,以有效地对ChatGPT API进行调用:
1. 并行执行多个调用
import requests
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {OPENAI_API_KEY}"
response_json = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "ping"}],
"temperature": 0
# Sync call
import time
def delay_print(msg):
print(msg, end=" ")
def sync_print():
for i in range(10):
start_time = time.time()
print("\n", time.time() - start_time, "seconds.")
0 1 2 3 4 5 6 7 8 9
10.019574642181396 seconds.
#Async Call
import asyncio
async def delay_print_async(msg):
print(msg, end=" ")
await asyncio.sleep(1)
async def async_print():
asyncio.gather(*[delay_print_async(i) for i in range(10)])
start_time = time.time()
await async_print()
print("\n", time.time() - start_time, "seconds.")
0.0002448558807373047 seconds.
0 1 2 3 4 5 6 7 8 9
import aiohttp
async def get_completion(content):
async with aiohttp.ClientSession() as session:
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": content}],
"temperature": 0
}) as resp:
response_json = await resp.json()
return response_json["choices"][0]['message']["content"]
await get_completion("Ping")
async def get_completion_list(content_list):
return await asyncio.gather(*[get_completion(content) for content in content_list])
await get_completion_list(["ping", "pong"]*5)
async def get_completion(content, session):
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": content}],
"temperature": 0
}) as resp:
response_json = await resp.json()
return response_json["choices"][0]['message']["content"]
async def get_completion_list(content_list):
async with aiohttp.ClientSession() as session:
return await asyncio.gather(*[get_completion(content, session) for content in content_list])
await get_completion_list(["ping", "pong"]*5)
async def get_completion(content, session, semaphore):
async with semaphore:
await asyncio.sleep(1)
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": content}],
"temperature": 0
}) as resp:
response_json = await resp.json()
return response_json["choices"][0]['message']["content"]
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
async with aiohttp.ClientSession() as session:
return await asyncio.gather(*[get_completion(content, session, semaphore) for content in content_list])
start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
Time elapsed: 1.8094507199984946 seconds.
['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']
class ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0
def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"Done runs {self.done}/{self.total}."
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
await asyncio.sleep(1)
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": content}],
"temperature": 0
}) as resp:
response_json = await resp.json()
return response_json["choices"][0]['message']["content"]
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession() as session:
return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])
start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
Done runs 1/10.
Done runs 2/10.
Done runs 3/10.
Done runs 4/10.
Done runs 5/10.
Done runs 6/10.
Done runs 7/10.
Done runs 8/10.
Done runs 9/10.
Done runs 10/10.
Time elapsed: 1.755018908999773 seconds.
['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']
2. 重试调用以防失败
from tenacity import (
一个原因是我们的函数调用可能因为服务器过载而失败,这就需要我们在再次尝试之前等待一段时间。为了指定等待时间,我们将使用指数退避的方法,使用参数wait=wait_random_exponential(min=min_value, max=max_value)。这样会让等待时间随着函数失败的次数增加而增加。
import random
class ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0
def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"Done runs {self.done}/{self.total}."
@retry(wait=wait_random_exponential(min=1, max=60), before_sleep=print)
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
#await asyncio.sleep(1)
if random.random() < 0.2:
raise Exception("Random exception")
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": content}],
"temperature": 0
}) as resp:
response_json = await resp.json()
return response_json["choices"][0]['message']["content"]
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession() as session:
return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])
start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
<RetryCallState 133364377433616: attempt #1; slept for 0.74; last result: failed (Exception Random exception)>
<RetryCallState 133364377424496: attempt #1; slept for 0.79; last result: failed (Exception Random exception)>
Done runs 1/10.
Done runs 2/10.
Done runs 3/10.
Done runs 4/10.
Done runs 5/10.
Done runs 6/10.
Done runs 7/10.
Done runs 8/10.
Done runs 9/10.
Done runs 10/10.
Time elapsed: 1.1305301820011664 seconds.
['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']
import random
class ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0
def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"Done runs {self.done}/{self.total}."
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(2), before_sleep=print)
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
#await asyncio.sleep(1)
if random.random() < 0.9:
raise Exception("Random exception")
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": content}],
"temperature": 0
}) as resp:
response_json = await resp.json()
return response_json["choices"][0]['message']["content"]
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession() as session:
return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])
start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
<RetryCallState 133364608660048: attempt #1; slept for 0.1; last result: failed (Exception Random exception)>
<RetryCallState 133364377435680: attempt #1; slept for 0.71; last result: failed (Exception Random exception)>
<RetryCallState 133364377421472: attempt #1; slept for 0.17; last result: failed (Exception Random exception)>
<RetryCallState 133364377424256: attempt #1; slept for 0.37; last result: failed (Exception Random exception)>
<RetryCallState 133364377430928: attempt #1; slept for 0.87; last result: failed (Exception Random exception)>
<RetryCallState 133364377420752: attempt #1; slept for 0.42; last result: failed (Exception Random exception)>
<RetryCallState 133364377422576: attempt #1; slept for 0.47; last result: failed (Exception Random exception)>
<RetryCallState 133364377431312: attempt #1; slept for 0.11; last result: failed (Exception Random exception)>
<RetryCallState 133364377425840: attempt #1; slept for 0.69; last result: failed (Exception Random exception)>
<RetryCallState 133364377424592: attempt #1; slept for 0.89; last result: failed (Exception Random exception)>
Exception Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/tenacity/_asyncio.py in __call__(self, fn, *args, **kwargs)
49 try:
---> 50 result = await fn(*args, **kwargs)
51 except BaseException: # noqa: B902
5 frames
Exception: Random exception
The above exception was the direct cause of the following exception:
RetryError Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/tenacity/__init__.py in iter(self, retry_state)
324 if self.reraise:
325 raise retry_exc.reraise()
--> 326 raise retry_exc from fut.exception()
328 if self.wait:
RetryError: RetryError[<Future at 0x794b5057a590 state=finished raised Exception>]
import random
class ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0
def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"Done runs {self.done}/{self.total}."
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(2), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
#await asyncio.sleep(1)
if random.random() < 0.7:
raise Exception("Random exception")
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": content}],
"temperature": 0
}) as resp:
response_json = await resp.json()
return response_json["choices"][0]['message']["content"]
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(1)) as session:
return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])
start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
<RetryCallState 133364377805024: attempt #1; slept for 0.22; last result: failed (Exception Random exception)>
<RetryCallState 133364377799456: attempt #1; slept for 0.53; last result: failed (Exception Random exception)>
<RetryCallState 133364377801328: attempt #1; slept for 0.24; last result: failed (Exception Random exception)>
<RetryCallState 133364377810208: attempt #1; slept for 0.38; last result: failed (Exception Random exception)>
<RetryCallState 133364377801616: attempt #1; slept for 0.54; last result: failed (Exception Random exception)>
<RetryCallState 133364377422096: attempt #1; slept for 0.59; last result: failed (Exception Random exception)>
<RetryCallState 133364377430592: attempt #1; slept for 0.07; last result: failed (Exception Random exception)>
<RetryCallState 133364377425648: attempt #1; slept for 0.05; last result: failed (Exception Random exception)>
Done runs 1/10.
Done runs 2/10.
Done runs 3/10.
Time elapsed: 2.6409040250000544 seconds.
['Pong!', 'Ping!', None, None, None, None, None, 'Ping!', None, None]
class ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0
def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"Done runs {self.done}/{self.total}."
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": content}],
"temperature": 0
}) as resp:
response_json = await resp.json()
return response_json["choices"][0]['message']["content"]
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(10)) as session:
return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])
start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*100, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
<RetryCallState 133364375201936: attempt #1; slept for 0.57; last result: failed (TimeoutError )>
Time elapsed: 12.705538211999738 seconds.
import asyncio
import aiohttp
from tenacity import (
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {OPENAI_API_KEY}"
class ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0
def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"Done runs {self.done}/{self.total}."
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": content}],
"temperature": 0
}) as resp:
response_json = await resp.json()
return response_json["choices"][0]['message']["content"]
async def get_completion_list(content_list, max_parallel_calls, timeout):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(timeout)) as session:
return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])