「Python|aiohttp|并发与协程」将HTTP请求提速成百上千倍!一次性掌握把requests请求改成协程的通用方法

Table of Contents

场景说明

大量数据请求场景

在爬虫通过定制化接口获取数据或者后端处理大量数据请求时,由于数据量较大,所以如果单纯使用同步方式的requests请求,对单台机器的利用率不高,数据处理的速度较慢,同时如果要提速需要增加机器会导致成本提高。

在这种情况下,使用并发是一种成本比较低的方式。在并发中多进程的并发是用于处理计算逻辑较多计算量较大的计算密集型任务,数据请求属于网络IO等待时间占处理时间的大头,不属于计算密集型,而是属于IO密集型任务。

用协程解决网络请求密集的任务

处理IO密集型任务,使用多线程或协程来实现效率的提升。而由于python默认的cpython解释器有全局解释器锁GIL的限制,使得python的多线程会有效率提高,但是无法完全发挥多线程的优势。

而协程在处理IO密集型任务上相当合适,但是在写法上跟我们习惯的同步方式执行的代码写法有些差别。

解决方案

在python中,将http请求使用异步方式执行,主要是使用aiohttp库来快速实现。

我们接下来将明确把一个同步方式的requests请求有哪几个部分,然后通过将各个部分替换成异步的aiohttp请求,来展示将同步请求改成异步请求的步骤。

使用requests库发送请求

最简单的请求代码如下:

1import requests 2 3response = requests.get("http://httpbin.org/get")

当然我们在业务中会需要使用到其他参数来满足我们的业务需求,所以可以考虑请求分为三个部分:

  • 构造请求相关的参数/值
  • 发送请求(考虑复用与目标服务器的连接,会需要使用session发送请求)
  • 处理返回结果

代码结构如下:

1"""请求过程分为三个部分: 21. 构造参数 32. 发送请求 43. 处理返回结果 5""" 6 7import requests 8 9"""构造参数, 包括但不限于: 10- headers 11- url 12- parameters 13- proxy 14- post请求的body 15- ...... 16""" 17headers = { 18 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 19 'Cache-Control': 'no-cache', 20 'Pragma': 'no-cache', 21 'Proxy-Connection': 'keep-alive', 22 'Referer': 'http://httpbin.org/', 23 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36', 24 'accept': 'application/json', 25} 26url = 'http://httpbin.org/get' 27 28""" 29# 新建会话,使用会话发送请求 30# 也可以直接requests.get()进行请求 31""" 32session = requests.session() 33response = session.get(url, headers=headers, verify=False) 34 35"""处理返回结果 36主要是对返回结果的状态,返回值进行条件检查 37在满足某些条件时执行对应的处理逻辑 38""" 39print(response.status_code) 40if response.status_code == 200: 41 print(response.json()) 42

异步请求代码的结构

关于异步请求,我们只需要知道下面几点:

  • 同步函数就是用def定义的函数
  • 异步函数就是用async def定义的函数,同时函数体里面会包含await语句
  • 同步请求的逻辑是用循环语句一次接着一次进行"发起请求,等待返回,处理返回结果"的过程
  • 异步请求的逻辑是生成不同的请求任务,然后(几乎)同时发出这批请求,然后请求返回的先后顺序,依次处理各个请求的返回结果
  • 同步请求是一个def 函数传入所有要请求的数据,然后依次传给另一个def函数去执行单个请求的发送和处理逻辑
  • 异步请求是一个async def函数接收所有要请求的数据,然后将各个数据传入另一个async def函数来生成请求任务并批量执行这些请求任务。异步需要额外的一个def函数来启动这个外层的async def 函数

批量请求的同步方式和异步方式代码对照

我们分别展示一下完整的代码结构,然后解释二者的差异。 假设我们批量请求的requests代码结构如下:

1"""请求分为三个部分: 21. 构造参数 32. 发送请求 43. 处理返回结果 5 6""" 7 8import requests 9 10 11def send_single_request(session, parameters: dict): 12 # 构造参数, 包括但不限于headers, url, parameters, proxy, post请求的body 13 headers = { 14 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 15 'Cache-Control': 'no-cache', 16 'Pragma': 'no-cache', 17 'Proxy-Connection': 'keep-alive', 18 'Referer': 'http://httpbin.org/', 19 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36', 20 'accept': 'application/json', 21 } 22 url = 'http://httpbin.org/get' 23 24 # 新建会话,使用会话发送请求 25 # (也可以直接requests.get()进行请求) 26 response = session.get(url, params=parameters, headers=headers) 27 28 # 处理返回结果 29 print(response.status_code) 30 if response.status_code == 200: 31 json_response = response.json() 32 print(json_response) 33 return json_response["args"] 34 35 36def send_batch_requests(batch_data): 37 all_result = [] 38 session = requests.session() 39 40 for parameters in batch_data: 41 request_result = send_single_request(session, parameters) 42 all_result.append(request_result) 43 44 return all_result 45 46 47# 业务模块在有请求需求时调用接口的业务逻辑 48def appliaction_logic_to_call_requests(): 49 batch_data_to_request = [{"num": num} for num in range(1, 11)] 50 all_result = send_batch_requests(batch_data_to_request) 51 # process the result 52 53 54appliaction_logic_to_call_requests()

则改成批量的aiohttp异步请求的方式,代码结构如下:

1"""请求分为三个部分: 21. 构造参数 32. 发送请求 43. 处理返回结果 5 6""" 7 8import aiohttp 9import asyncio 10import json 11 12 13async def send_single_request(session, parameters: dict): 14 # 构造参数, 包括但不限于headers, url, parameters, proxy, post请求的body 15 headers = { 16 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 17 'Cache-Control': 'no-cache', 18 'Pragma': 'no-cache', 19 'Proxy-Connection': 'keep-alive', 20 'Referer': 'http://httpbin.org/', 21 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36', 22 'accept': 'application/json', 23 } 24 url = 'http://httpbin.org/get' 25 26 # 新建会话,使用会话发送请求 27 # (也可以直接requests.get()进行请求) 28 async with session.get(url, params=parameters, headers=headers) as response: 29 raw_response = await response.read() 30 31 # 处理返回结果 32 print(response.status) 33 if response.status == 200: 34 json_response = json.loads(raw_response) 35 print(json_response) 36 return json_response["args"] 37 38 39async def send_batch_requests(batch_data): 40 all_result = [] 41 async with aiohttp.ClientSession() as session: 42 # # 我更喜欢使用列表推导式来生成任务,简单明了 43 # tasks = [asyncio.create_task(send_single_request(session, parameters)) for parameters in batch_data] 44 45 tasks = [] 46 for parameters in batch_data: 47 single_task = asyncio.create_task(send_single_request(session, parameters)) 48 tasks.append(single_task) 49 all_result = await asyncio.gather(*tasks) 50 51 return all_result 52 53"""异步方式需要增加一个额外的函数给外部调用 54这样对外部调用逻辑来说,批量请求是同步还是异步是隐藏的细节(封装) 55""" 56def logic_to_start_async_tasks(batch_data): 57 all_result = asyncio.run(send_batch_requests(batch_data)) 58 return all_result 59 60 61# 业务模块在有请求需求时调用接口的业务逻辑 62def appliaction_logic_to_call_requests(): 63 batch_data_to_request = [{"num": num} for num in range(1, 11)] 64 all_result = logic_to_start_async_tasks(batch_data_to_request) 65 # process the result 66 67 68appliaction_logic_to_call_requests()

代码结构差异

用版本管理工具查看两个版本的代码,差异如下: 红色高亮代码为旧版本代码,即同步方式的requests结构代码 绿色高亮代码为新版本代码,即异步方式的aiohttp结构代码

导包差异


在这里插入图片描述

send_single_request差异


在这里插入图片描述

send_batch_requests差异


在这里插入图片描述

外部业务模块调用差异


在这里插入图片描述

差异说明

我们可以看到异步的的代码结构主要差异如下:

  • 具体进行异步请求的函数需要使用async def进行定义
  • async def的函数体必须包含await语句:
    • 批量创建和调度任务的函数send_batch_requestsawait用于等待任务的执行和返回:await asyncio.gather(*tasks)
    • 具体发送请求和处理返回结果的函数send_single_requestawait用于等待外部网络服务器返回awiat session.get()以及读取返回值await response.read()
    • send_single_request中也可以对response调用json()方法,但是也要使用await,(例:json_response = await response.json()),可以自行选择使用哪种用法。
  • session创建用的函数不同:
    • requestsrequests.session()创建一个可以复用的新会话
    • aiohttpaiohttp.ClientSession()创建一个可以复用的新会话
    • aiohttp的session不一定要搭配上下文管理器使用,也可以用session = aiohttp.ClientSession()创建session;使用上下文可以更好避免session没有关闭的问题
  • 可能存在一些属性或方法的名称有差异:
    • requests中的response状态码的属性名为status_code
    • aiohttp中的response状态码的属性名为status
  • 异步方法需要增加一个同步函数来启动异步创建和执行任务的逻辑
    • 为了让外部调用完全没有感知,我们在实际业务中会使用send_batch_requests作为同步函数的名称,异步函数可以改为async def create_and_execute_tasks,这样对于外部来说,在同步和异步两个版本中调用的API名称没有变化,都是调用的send_batch_requests方法

只要注意以上这些差异,就可以熟练地在异步和同步方式之间转换。

异步方式返回结果无序的解决方案

当我们使用同步的方式进行请求的时候,由于是完成上一个请求才会执行下一个请求,所以返回结果是有序的,如下:
在这里插入图片描述

而当我们使用异步的方式进行请求时,由于请求是直接接连发出的,不会等待上一个请求完成返回并处理后才发出下一个请求,并且请求返回的次序是不固定的(尽管我们是有序发出的请求),所以处理完的结果可能是无序的,如下:
在这里插入图片描述

由于我们在使用中需要知道哪个返回结果是对应哪个请求数据的,所以返回结果无序是需要解决的。解决的方法也很简单,异步请求的方法返回一个唯一标志,然后外部管理异步任务的异步方法根据返回的唯一标志进行排序或者其他处理

方式一:按请求顺序排序(请求顺序作为返回数据标识)

代码变动如下:
在这里插入图片描述

在这里插入图片描述
这样一来,最后返回的结果就是有序的:
在这里插入图片描述

方式二:自定义返回数据标识,返回一个键值对

使用数据请求顺序来说可以让返回结果有序,但从业务实际应用来说,这种方式应用场景较少。 因为实际数据使用都是通过数据内容来定位数据,而不是数据的顺序来定位数据。

举一个具体例子:

我们要请求10个用户的数据,由于对每一个用户可能是走同一套逻辑,所以这种情况可以是批量请求,然后按顺序把"用户ID"和"根据用户ID请求到的用户数据"传入后续的处理逻辑中。

但事实上用户数据的不同信息可能是由不同请求去获取的,比如用户信息包括地址和手机,而地址和手机分别存在不同的数据库表中,或者由不同的API获取,所以我们可以是发出get/user/addressget/user/telephone两个请求,然后返回结果是{"user_id": "xxx", "telephone": "xxxx"}{"user_id": "xxx", "address": "xxxxxx"},这种时候我们是知道要请求哪些信息项的,比如info_items = ["address", "telephone"]

所以可以有两种写法,如下:

1"""写法一:主要还是用顺序作为数据标识以及排序依据 2然后根据item在info_items的顺序与在结果的顺序一一对应的特点来返回最终结果 3""" 4info_items = ["address", "telephone"] 5 6tasks = [] 7for index, item in enumerate(info_items): 8 API = f"get/user/{item}" 9 session = aiohttp.ClientSession() 10 single_task = asyncio.create_task(send_single_request(session, item, identity=index)) 11 tasks.append(single_task) 12all_result = await asyncio.gather(*tasks) 13all_result = sorted(all_result, key=lambda result: int(result[0])) 14all_result = [result[1] for result in all_result] 15 16user_info = { 17 info_item[index]: all_result[index] for index in range(len(info_items)) 18} 19return user_info 20 21"""方式二: 直接用item作为identity 22然后直接用identity和返回结果生成键值对 23""" 24info_items = ["address", "telephone"] 25 26tasks = [] 27for item in info_items: 28 API = f"get/user/{item}" 29 session = aiohttp.ClientSession() 30 single_task = asyncio.create_task(send_single_request(session, item, identity=item)) 31 tasks.append(single_task) 32all_result = await asyncio.gather(*tasks) 33 34user_info = { 35 result[0]: result[1] for result in all_result 36} 37return user_info

个人推荐第二种方式,编写上更符合python的风格,阅读上也更清晰明了

源代码

完整的异步代码结构如下:

1"""请求分为三个部分: 21. 构造参数 32. 发送请求 43. 处理返回结果 5 6""" 7 8import aiohttp 9import asyncio 10import json 11 12 13async def send_single_request(session, parameters: dict, identity): 14 # 构造参数, 包括但不限于headers, url, parameters, proxy, post请求的body 15 headers = { 16 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 17 'Cache-Control': 'no-cache', 18 'Pragma': 'no-cache', 19 'Proxy-Connection': 'keep-alive', 20 'Referer': 'http://httpbin.org/', 21 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36', 22 'accept': 'application/json', 23 } 24 url = 'http://httpbin.org/get' 25 26 # 新建会话,使用会话发送请求 27 # (也可以直接requests.get()进行请求) 28 async with session.get(url, params=parameters, headers=headers) as response: 29 raw_response = await response.read() 30 31 # 处理返回结果 32 # print(response.status) 33 if response.status == 200: 34 json_response = json.loads(raw_response) 35 # json_response = await response.json() 36 # print(json_response) 37 # return identity, json_response 38 return identity, {"args": json_response["args"], "url": json_response["url"]} 39 40 41async def create_and_execute_tasks(batch_data): 42 all_result = {} 43 async with aiohttp.ClientSession() as session: 44 # # 我更喜欢使用列表推导式来生成任务,简单明了 45 # tasks = [asyncio.create_task(send_single_request(session, parameters)) for parameters in batch_data] 46 47 tasks = [] 48 for index, parameters in enumerate(batch_data): 49 single_task = asyncio.create_task(send_single_request(session, parameters, identity=index)) 50 tasks.append(single_task) 51 identity_and_data_pairs = await asyncio.gather(*tasks) 52 all_result = {pair[0]: pair[1] for pair in identity_and_data_pairs} 53 54 return all_result 55 56 57"""异步方式需要增加一个额外的函数给外部调用 58这样对外部调用逻辑来说,批量请求是同步还是异步是隐藏的细节(封装) 59""" 60 61 62def send_batch_requests(batch_data): 63 all_result = asyncio.run(create_and_execute_tasks(batch_data)) 64 return all_result 65 66 67# 业务模块在有请求需求时调用接口的业务逻辑 68def appliaction_logic_to_call_requests(): 69 batch_data_to_request = [{"num": num} for num in range(1, 11)] 70 all_result = send_batch_requests(batch_data_to_request) 71 return all_result 72 # process the result 73 74 75print("异步方式返回结果") 76print(appliaction_logic_to_call_requests())

在实际业务开发中,根据具体的业务需求修改send_single_request以及create_and_execute_tasks即可,比如:

  • send_single_request中get请求方式改为post请求方式,并且增删使用的参数
  • create_and_execute_tasks从batch_data单个数据parameters中生成identity
  • ......

掌握了上面这些,使用python写异步的http请求,就手到擒来了(●ˇ∀ˇ●)

好书推荐:

好课推荐:

写文不易,如果对你有帮助的话,来一波点赞、收藏、关注吧~👇

Mastodon