Python异步请求
小于 1 分钟
使用底层 urllib3 库,禁用 InsecureRequestWarning 警告:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)demo
# 定义异步函数,一次调用作为一次异步任务
async def _async_action(url) -> bool:
# 异步http客户端,发起异步请求
async with aiohttp.ClientSession() as session:
try:
async with session.post(f"{url}/v1/chat/completions", headers=headers, json=data, proxy=proxies.get("http"),
verify_ssl=False, timeout=20) as response:
if response.status == 200:
# 处理响应
print(f"{url} : ok")
return True
else:
print(f"{url} : {response.status}")
except Exception as e:
print(f"{url} : {e.__class__.__name__}")
# 定义异步事务,生成多个异步任务,并汇聚异步结果
async def async_transaction(urls, results):
# asyncio.create_task 创建一个异步任务
tasks = [asyncio.create_task(_async_action(url)) for url in urls]
# 汇聚异步任务结果
await asyncio.gather(*tasks)
for i in range(len(urls)):
# r 为任务i的返回值,即 _async_action 的返回值
r = tasks[i].result()
if r:
results.append(urls[i])
# 运行异步事务
results = [] # 异步结果存放数组
asyncio.run(async_transaction(urls, results))