从Golang goroutine看Python异步:历程、用法与终极对比
下午和同事review代码时,看到Python异步并发的场景,不禁怀念起 Golang 中用go关键字启动异步的优雅与简洁。那 Python 中是否也能实现如此优雅的异步调用? 带着这个疑问,我翻阅了不少资料与文档,整理成这篇文章。接下来,我们就从 Python 异步的版本进化史说起,梳理其核心用法,再与 Golang goroutine 进行全方位对比,帮你彻底搞懂 Python 异步协程。
本文较长,可收藏细看。
一、Python异步的“进化之路”
Python的异步能力并非一蹴而就,而是经历了多版本迭代,从早期的“曲线救国”到如今的“原生支持”,核心是为了解决高并发IO场景的效率问题。
1. 早期探索:yield生成器模拟(2.x ~ 3.3)
在Python 3.4之前,并没有官方的异步框架,开发者只能通过**生成器(yield/yield from)**模拟协程的“暂停/恢复”逻辑。
原理是利用生成器的迭代特性,手动控制函数执行流程,但这种方式需要开发者自己处理调度逻辑,代码繁琐且易出错,只能作为小众方案使用。
2. 首次标准化:asyncio纳入标准库(3.4)
2014年发布的Python 3.4,正式将asyncio库纳入标准库,标志着Python官方支持异步编程。
但此时的语法仍不友好,需要通过@asyncio.coroutine装饰器定义协程,用yield from实现暂停等待,比如:
import asyncio
@asyncio.coroutine
def hello():
yield from asyncio.sleep(1) # 模拟IO等待
print("Hello")
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()虽然有了官方框架,但装饰器+yield from的组合学习成本高,难以推广。
3. 语法革命:async/await语法糖(3.5)
2015年Python 3.5的发布,带来了异步编程的“里程碑”——引入async def和await关键字,彻底简化了协程的定义和使用。
原来的装饰器和yield from被逐步替代,代码可读性大幅提升,这也是我们现在使用的核心语法,比如:
import asyncio
async def hello(): # 直接用async def定义协程
await asyncio.sleep(1) # 用await暂停等待
print("Hello")
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())4. 持续优化:简化启动与增强功能(3.7+)
后续版本持续优化异步体验,降低上手门槛:
3.7:新增
asyncio.run()函数,一键启动事件循环,替代了之前繁琐的get_event_loop()+run_until_complete()组合;3.8:
asyncio.create_task()成为顶层函数,简化了任务的创建;同时支持在异步函数中使用await表达式作为赋值语句的右侧;3.10:引入
TaskGroup(实验性,3.11正式稳定),更优雅地管理批量任务,支持自动取消和异常处理;3.11:针对IO密集型场景的吞吐量需求,大幅优化asyncio底层调度逻辑,协程切换速度提升3倍以上,显著缩小了与Golang在高并发IO场景下的性能差距;
3.12:
asyncio.TaskGroup功能进一步增强,支持嵌套使用,可更灵活地管理分层任务;优化异步迭代器性能,降低async for循环的开销;新增asyncio.to_thread()函数,可更便捷地将同步函数包装为异步任务,减少同步/异步混用的适配成本;新增asyncio.eager_task_factory()与asyncio.create_eager_task_factory()函数,提供“急切执行”的任务工厂实现,让任务创建后立即执行(无需等待事件循环下一轮调度),显著降低任务启动延迟,适配低延迟并发场景;3.13(预览特性):重点优化异步IO的底层实现,提升大并发场景下的任务调度吞吐量;计划增强与多进程的协同能力,简化“异步+多进程”混合编程的复杂度;同时优化异步异常的捕获与堆栈信息,便于问题排查。
二、Python异步协程核心用法
经过多版本迭代,Python异步协程的核心用法已非常稳定,掌握以下4个关键点,就能应对大部分高并发IO场景。
1. 核心概念速览
| 概念 | 作用 | 通俗理解 |
|---|---|---|
| async def | 定义协程函数,调用后返回协程对象(不立即执行) | “异步任务的模板” |
| await | 挂起协程,等待异步操作完成后恢复执行,仅能在async def内使用 | “暂停当前任务,去做其他事,做完再回来” |
| 事件循环(Event Loop) | 管理协程的执行、切换和调度,是异步编程的核心 | “任务调度员” |
| Task | 将协程包装为可并发执行的任务,由事件循环管理 | “被调度员管理的具体任务” |
2. 最简并发示例(类比Golang go关键字)
Golang用go func()启动并发任务,Python则用asyncio.create_task()或asyncio.gather()实现,示例如下:
import asyncio
# 定义协程函数(异步任务)
async def task_func(name):
print(f"任务{name}开始执行")
await asyncio.sleep(1) # 模拟IO操作(非阻塞)
print(f"任务{name}执行完成")
return f"任务{name}结果"
# 主协程(异步程序入口)
async def main():
# 方式1:用create_task创建任务(类比go 关键字)
task1 = asyncio.create_task(task_func("A"))
task2 = asyncio.create_task(task_func("B"))
# 等待任务完成,获取结果
result1 = await task1
result2 = await task2
print(f"最终结果:{result1}, {result2}")
# 方式2:用gather批量并发(更简洁,适合多任务)
# results = await asyncio.gather(
# task_func("A"),
# task_func("B"),
# task_func("C")
# )
# print(f"批量结果:{results}")
# 方式3:更优写法(Python 3.11+):用TaskGroup管理批量任务(推荐)
# 优势:自动管理任务生命周期,支持异常自动传播、批量取消,比gather更优雅
async def main_taskgroup():
async with asyncio.TaskGroup() as tg:
# 批量创建任务,无需手动收集
task1 = tg.create_task(task_func("A"))
task2 = tg.create_task(task_func("B"))
task3 = tg.create_task(task_func("C"))
# TaskGroup退出时,所有任务已完成
print(f"TaskGroup批量结果:{task1.result()}, {task2.result()}, {task3.result()}")
# 启动异步程序
if __name__ == "__main__":
asyncio.run(main())
# 启动TaskGroup版本
# asyncio.run(main_taskgroup())输出结果(并发执行,总耗时≈1秒):
任务A开始执行
任务B开始执行
任务A执行完成
任务B执行完成
最终结果:任务A结果, 任务B结果3. 实用场景:异步HTTP请求
Python异步最适合IO密集型场景(如网络请求、文件读写),以异步HTTP请求为例(需安装aiohttp库:pip install aiohttp):
import asyncio
import aiohttp
async def fetch_url(url, session):
async with session.get(url) as resp: # 异步上下文管理器
return await resp.text()[:100] # 仅返回前100个字符
async def main():
urls = [
"https://www.baidu.com",
"https://www.github.com",
"https://www.python.org"
]
# 异步创建HTTP会话,批量请求(原有写法:gather)
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(url, session) for url in urls]
results = await asyncio.gather(*tasks)
for url, result in zip(urls, results):
print(f"\n{url} 响应片段:{result}")
# 更优写法(Python 3.11+):用TaskGroup管理HTTP请求
# 优势:1. 无需手动收集tasks列表;2. 支持异常自动捕获,可批量取消任务;3. 代码更简洁
async def main_taskgroup():
urls = [
"https://www.baidu.com",
"https://www.github.com",
"https://www.python.org"
]
async with aiohttp.ClientSession() as session, asyncio.TaskGroup() as tg:
# 批量创建任务,用字典存储任务与URL的映射(便于获取结果)
task_map = {tg.create_task(fetch_url(url, session)): url for url in urls}
# 遍历任务获取结果
for task, url in task_map.items():
print(f"\n{url} 响应片段:{task.result()[:100]}")
if __name__ == "__main__":
asyncio.run(main())
# 启动TaskGroup版本
# asyncio.run(main_taskgroup())三、Python异步 vs Golang goroutine:全方位对比
很多开发者会拿两者对比,核心结论先摆出来:两者都是轻量级并发方案,只是基于不同设计理念,适配不同场景需求,各有优劣。具体对比如下:
| 对比维度 | Python 异步协程 | Golang goroutine |
|---|---|---|
| 启动方式 | 需先定义async函数,再通过create_task/gather启动,步骤相对规范,需遵循异步语法约束 | 直接用go 函数()启动,简洁直观,无需额外语法约束,上手门槛低 |
| 调度模型 | 单线程事件循环(用户态调度),需显式用await触发切换,调度逻辑可预测性强 | MPG模型(内核态+用户态结合调度),隐式切换,无需手动干预,调度灵活性高 |
| 阻塞处理 | 若在异步函数中使用同步阻塞操作(如requests、time.sleep),会阻塞整个事件循环,需配套使用异步库(如aiohttp、asyncio.sleep),对库的选型有一定要求 | 即使单个goroutine阻塞(如IO、time.Sleep),调度器会自动切换到其他goroutine,不影响整体并发,对开发者阻塞处理的要求较低 |
| 并发粒度 | 更适配IO密集型场景,单线程无法直接利用多核,CPU密集型需结合多进程协同,需额外处理进程间通信 | 适配IO密集型和CPU密集型多种场景,可通过GOMAXPROCS控制多核利用,并发适配范围更广 |
| 启动成本 | 协程占用内存小(≈几KB),可创建百万级协程,满足绝大多数高并发IO场景需求 | goroutine占用内存极小(≈2KB),可创建千万级goroutine,在超大规模并发场景下更具优势 |
| 生态支持 | 同步库生态极为丰富,异步库逐步完善,依托现有生态可快速实现异步改造,部分场景需封装异步逻辑 | 原生围绕高并发设计,标准库中异步相关工具(如channel、sync包)完善,高并发场景下生态适配性强 |
| 学习成本 | 需理解事件循环、可等待对象等概念,区分同步/异步库差异,需掌握的概念较多,学习曲线稍陡 | 核心只需掌握go关键字和channel通信,核心概念简洁,学习曲线平缓,上手速度快 |
核心差异的本质原因
两者的差异核心源于语言设计初心与生态基础:
Golang从语言层面原生支持并发,调度器深度集成内核;
Python的异步是基于现有生态的渐进式优化,依托单线程事件循环实现,受限于GIL(全局解释器锁)无法直接利用多核,但可与丰富的同步生态兼容,适合增量改造现有项目;
四、Python异步避坑指南(新手必看)
很多开发者用不好Python异步,核心是踩了“同步/异步混用”“误解并发本质”的坑,总结4个常见问题:
1. 坑1:同步阻塞操作阻塞事件循环
错误示例:在async函数中用time.sleep(1)(同步阻塞),会导致整个事件循环卡住,所有协程都无法执行;
解决方案:用asyncio.sleep(1)替代;HTTP请求用aiohttp替代requests,数据库操作用aiomysql替代pymysql。
2. 坑2:await滥用或误用
错误示例:在普通函数中使用await,或await非可等待对象(如普通整数、字符串);
解决方案:await仅能在async函数内使用,且只能跟“可等待对象”(协程、Task、Future)。
3. 坑3:误解“并发”=“并行”
Python异步是“单线程并发”,同一时间只有一个任务在执行,切换是“伪并行”;若需要利用多核,需结合multiprocessing库,用多进程+异步的组合方案。
4. 坑4:忽略任务异常处理
错误示例:未捕获Task的异常,会导致程序崩溃;
解决方案:用try/except捕获await的异常,或用task.add_done_callback()处理任务完成后的异常。
五、Python处理CPU密集型任务的方案(异步+多进程协同)
Python异步协程本身基于单线程,无法直接利用多核资源,处理CPU密集型任务(如大规模计算、数据加密解密等)时效率较低。核心解决方案是**“异步协程+多进程”协同**:用多进程利用多核,每个进程内用异步协程处理IO操作,兼顾CPU利用率和IO效率。
1. 核心实现方案
常用两种实现方式,根据场景选择:
方案一:asyncio + concurrent.futures.ProcessPoolExecutor:适合简单场景,将CPU密集型任务提交到进程池,异步等待结果返回;
方案二:multiprocessing + asyncio:适合复杂场景,手动创建多进程,每个进程启动独立事件循环,通过进程间通信(IPC)协同任务。
2. 方案一:asyncio + ProcessPoolExecutor(简单场景示例)
利用concurrent.futures.ProcessPoolExecutor创建进程池,可通过两种方式实现异步等待CPU密集型任务:一种是asyncio.wrap_future()手动包装(兼容性好),另一种是asyncio.run_in_executor()简化写法(更简洁)。以下分场景演示两种写法的实现:
场景1:批量无依赖CPU密集型任务
子场景1.1:用wrap_future手动包装
import asyncio
import concurrent.futures
import math
# 1. 定义CPU密集型同步函数(示例:大规模素数判断)
def is_prime(n):
"""判断一个数是否为素数(CPU密集型操作)"""
if n <= 1:
return False
if n == 2:
return True
if n % 2 == 0:
return False
for i in range(3, int(math.sqrt(n)) + 1, 2):
if n % i == 0:
return False
return True
def cpu_intensive_task(numbers):
"""批量处理素数判断(CPU密集型)"""
return [num for num in numbers if is_prime(num)]
# 2. 异步主函数:提交CPU密集型任务到进程池
async def main_wrap_future():
# 待处理的大规模数字列表(模拟CPU密集型任务)
task_data = [
[1000000007, 1000000009, 1000000021, 1000000033] * 5, # 任务1
[2000000003, 2000000009, 2000000027, 2000000039] * 5, # 任务2
[3000000007, 3000000011, 3000000013, 3000000023] * 5 # 任务3
]
# 创建进程池(进程数建议等于CPU核心数)
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
# 批量提交任务到进程池,手动用wrap_future包装为异步可等待对象
tasks = [
asyncio.wrap_future(executor.submit(cpu_intensive_task, data))
for data in task_data
]
# 异步等待所有CPU密集型任务完成
results = await asyncio.gather(*tasks)
# 输出结果
for i, result in enumerate(results, 1):
print(f"任务{i}完成,找到素数:{result}")
if __name__ == "__main__":
asyncio.run(main_wrap_future())子场景1.2:用run_in_executor简化写法(推荐)
核心优势:无需手动调用wrap_future(),run_in_executor直接返回可await的对象,代码更简洁,是Python 3.5+的推荐写法。
import asyncio
import concurrent.futures
import math
# 复用CPU密集型同步函数(与子场景1.1一致)
def is_prime(n):
if n <= 1:
return False
if n == 2:
return True
if n % 2 == 0:
return False
for i in range(3, int(math.sqrt(n)) + 1, 2):
if n % i == 0:
return False
return True
def cpu_intensive_task(numbers):
return [num for num in numbers if is_prime(num)]
# 异步主函数:用run_in_executor简化实现(原有写法)
async def main_run_in_executor():
task_data = [
[1000000007, 1000000009, 1000000021, 1000000033] * 5,
[2000000003, 2000000009, 2000000027, 2000000039] * 5,
[3000000007, 3000000011, 3000000013, 3000000023] * 5
]
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
# 获取当前运行的事件循环(3.7+推荐用get_running_loop,比get_event_loop更安全)
loop = asyncio.get_running_loop()
# 批量提交任务:run_in_executor直接封装“进程池+函数+参数”,返回可await对象
tasks = [
loop.run_in_executor(executor, cpu_intensive_task, data)
for data in task_data
]
results = await asyncio.gather(*tasks)
for i, result in enumerate(results, 1):
print(f"任务{i}完成,找到素数:{result}")
# 更优写法(Python 3.11+):用TaskGroup+run_in_executor管理批量CPU密集型任务
# 优势:1. 支持异常自动捕获,某一个任务失败不影响其他任务(可通过try/except控制);2. 代码更简洁,无需手动收集tasks;3. 支持批量取消任务
async def main_run_in_executor_taskgroup():
task_data = [
[1000000007, 1000000009, 1000000021, 1000000033] * 5,
[2000000003, 2000000009, 2000000027, 2000000039] * 5,
[3000000007, 3000000011, 3000000013, 3000000023] * 5
]
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
loop = asyncio.get_running_loop()
async with asyncio.TaskGroup() as tg:
# 用列表存储任务,便于后续获取结果(按顺序)
tasks = [
tg.create_task(loop.run_in_executor(executor, cpu_intensive_task, data))
for data in task_data
]
# TaskGroup退出时,所有任务已完成
for i, task in enumerate(tasks, 1):
print(f"任务{i}完成,找到素数:{task.result()}")
if __name__ == "__main__":
asyncio.run(main_run_in_executor())
# 启动TaskGroup版本
# asyncio.run(main_run_in_executor_taskgroup())场景2:多个异步任务依次依赖返回结果
子场景2.1:用wrap_future实现
核心逻辑:前一个任务的输出作为后一个任务的输入,通过await依次获取任务结果,实现递进式执行。示例中设计3个依赖任务:任务1获取素数列表 → 任务2计算素数平方和 → 任务3基于平方和计算最终结果。
import asyncio
import concurrent.futures
import math
# 1. 定义3个递进式CPU密集型函数(存在依赖关系)
def get_primes(numbers):
"""任务1:获取指定数字列表中的素数(基础数据准备)""" def is_prime(n):
if n <= 1:
return False
if n == 2:
return True
if n % 2 == 0:
return False
for i in range(3, int(math.sqrt(n)) + 1, 2):
if n % i == 0:
return False
return True
return [num for num in numbers if is_prime(num)]
def calc_square_sum(primes):
"""任务2:计算素数列表的平方和(依赖任务1的结果)""" return sum(num*num for num in primes)
def calc_final_result(square_sum, factor=2):
"""任务3:基于平方和计算最终结果(依赖任务2的结果)""" return square_sum * factor # 示例逻辑:平方和乘以系数
# 2. 异步主函数:用wrap_future实现依赖任务
async def main_dependent_wrap_future():
# 初始数据(将传递给第一个任务)
initial_numbers = [1000000007, 1000000009, 1000000021, 1000000033, 5000000003]
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
# 步骤1:执行任务1,获取素数列表(无前置依赖)
task1 = asyncio.wrap_future(executor.submit(get_primes, initial_numbers))
primes = await task1
print(f"任务1完成,获取到素数:{primes}")
# 步骤2:执行任务2,传入任务1的结果计算平方和(依赖任务1)
task2 = asyncio.wrap_future(executor.submit(calc_square_sum, primes))
square_sum = await task2
print(f"任务2完成,素数平方和:{square_sum}")
# 步骤3:执行任务3,传入任务2的结果计算最终值(依赖任务2)
task3 = asyncio.wrap_future(executor.submit(calc_final_result, square_sum))
final_result = await task3
print(f"任务3完成,最终结果:{final_result}")
if __name__ == "__main__":
asyncio.run(main_dependent_wrap_future())子场景2.2:用run_in_executor简化实现(素数计算依赖场景)
延续子场景2.1的素数计算场景,演示依赖任务的简化实现:任务1获取素数列表 → 任务2计算素数平方和 → 任务3基于平方和计算最终结果,用run_in_executor简化异步等待逻辑。
import asyncio
import concurrent.futures
import math
# 1. 定义3个递进式CPU密集型函数(存在依赖关系,与子场景2.1一致)
def get_primes(numbers):
"""任务1:获取指定数字列表中的素数(基础数据准备,CPU密集型)""" def is_prime(n):
if n <= 1:
return False
if n == 2:
return True
if n % 2 == 0:
return False
for i in range(3, int(math.sqrt(n)) + 1, 2):
if n % i == 0:
return False
return True
return [num for num in numbers if is_prime(num)]
def calc_square_sum(primes):
"""任务2:计算素数列表的平方和(依赖任务1结果,CPU密集型)""" return sum(num*num for num in primes)
def calc_final_result(square_sum, factor=2):
"""任务3:基于平方和计算最终结果(依赖任务2结果,CPU密集型)""" return square_sum * factor # 示例逻辑:平方和乘以系数
# 异步主函数:用run_in_executor实现依赖任务(简化写法)
async def main_dependent_run_in_executor():
# 初始数据(将传递给第一个任务)
initial_numbers = [1000000007, 1000000009, 1000000021, 1000000033, 5000000003]
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
loop = asyncio.get_running_loop()
# 步骤1:执行任务1,获取素数列表(无前置依赖)
primes = await loop.run_in_executor(
executor, get_primes, initial_numbers
)
print(f"任务1完成,获取到素数:{primes}")
# 步骤2:执行任务2,传入任务1的结果计算平方和(依赖任务1)
square_sum = await loop.run_in_executor(
executor, calc_square_sum, primes
)
print(f"任务2完成,素数平方和:{square_sum}")
# 步骤3:执行任务3,传入任务2的结果计算最终值(依赖任务2)
final_result = await loop.run_in_executor(
executor, calc_final_result, square_sum
)
print(f"任务3完成,最终结果:{final_result}")
# 更优写法:增加异常处理+任务超时控制(生产环境必备)
# 优势:1. 避免单个任务阻塞导致整体卡死;2. 异常可捕获,程序更健壮;3. 符合生产环境容错要求
async def main_dependent_robust(timeout=10):
initial_numbers = [1000000007, 1000000009, 1000000021, 1000000033, 5000000003]
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
loop = asyncio.get_running_loop()
try:
# 步骤1:获取素数(设置超时,避免无限等待)
primes = await asyncio.wait_for(
loop.run_in_executor(executor, get_primes, initial_numbers),
timeout=timeout
)
print(f"任务1完成,获取到素数:{primes}")
# 步骤2:计算平方和
square_sum = await asyncio.wait_for(
loop.run_in_executor(executor, calc_square_sum, primes),
timeout=timeout
)
print(f"任务2完成,素数平方和:{square_sum}")
# 步骤3:计算最终结果
final_result = await asyncio.wait_for(
loop.run_in_executor(executor, calc_final_result, square_sum),
timeout=timeout
)
print(f"任务3完成,最终结果:{final_result}")
return final_result
except asyncio.TimeoutError:
print(f"错误:任务执行超时(超过{timeout}秒)")
return None
except Exception as e:
print(f"错误:任务执行失败 - {str(e)}")
return None
# 启动执行
if __name__ == "__main__":
# 运行简化版本
# asyncio.run(main_dependent_run_in_executor())
# 运行健壮版本(推荐生产环境使用)
asyncio.run(main_dependent_robust())依赖场景说明:
- 执行顺序严格遵循“任务1→任务2→任务3”,前一个任务未完成时,后一个任务不会启动,通过await保证依赖顺序;
- 与子场景2.1(wrap_future写法)核心差异:
run_in_executor省去了executor.submit()+asyncio.wrap_future()的手动包装步骤,直接将“进程池+函数+参数”传入即可获得可await对象,代码更精简、可读性更高; - 适用场景:需要递进式计算的CPU密集型任务(如“数据筛选→数据计算→结果推导”);
- 输出示例:
任务1完成,获取到素数:[1000000007, 1000000009, 1000000021, 1000000033, 5000000003] 任务2完成,素数平方和:2800000048000000244 任务3完成,最终结果:5600000096000000488
`
3. 方案二:multiprocessing + asyncio(复杂场景示例)
手动创建多进程,每个进程启动独立的事件循环,通过multiprocessing.Queue传递任务和结果,适合需要精细控制进程生命周期的场景。
import asyncio
import multiprocessing
import math
# 1. 进程内执行的异步函数:处理从队列接收的CPU密集型任务
async def process_worker(task_queue, result_queue):
"""每个进程的工作函数:从队列取任务,处理后将结果存入队列"""
def is_prime(n):
if n <= 1:
return False
if n == 2:
return True
if n % 2 == 0:
return False
for i in range(3, int(math.sqrt(n)) + 1, 2):
if n % i == 0:
return False
return True
while True:
# 从队列获取任务(None表示结束信号)
task = await asyncio.to_thread(task_queue.get)
if task is None:
break
task_id, numbers = task
# 处理CPU密集型任务
primes = [num for num in numbers if is_prime(num)]
# 存入结果队列
await asyncio.to_thread(result_queue.put, (task_id, primes))
# 2. 启动进程并运行事件循环
def start_process(task_queue, result_queue):
"""启动进程内的事件循环"""
asyncio.run(process_worker(task_queue, result_queue))
# 3. 主进程:分发任务、收集结果
async def main():
# 创建进程间通信队列
task_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
# 启动4个进程
processes = []
for _ in range(4):
p = multiprocessing.Process(
target=start_process,
args=(task_queue, result_queue)
)
p.start()
processes.append(p)
# 分发任务(任务ID + 待处理数字列表)
task_data = [
(1, [1000000007, 1000000009, 1000000021]),
(2, [2000000003, 2000000009, 2000000027]),
(3, [3000000007, 3000000011, 3000000013]),
(4, [4000000007, 4000000009, 4000000043])
]
for task in task_data:
task_queue.put(task)
# 发送结束信号(每个进程1个None)
for _ in range(4):
task_queue.put(None)
# 收集结果
results = {}
while len(results) < len(task_data):
task_id, primes = await asyncio.to_thread(result_queue.get)
results[task_id] = primes
# 输出结果
for task_id in sorted(results.keys()):
print(f"任务{task_id}完成,找到素数:{results[task_id]}")
# 等待所有进程结束
for p in processes:
p.join()
# 更优写法:用multiprocessing.Manager优化进程间通信(适合复杂场景)
# 优势:1. Manager.Queue支持更多数据类型,且可在多线程+多进程混合场景使用;2. 支持设置队列大小,避免内存溢出;3. 可通过Manager共享其他数据结构(如字典)
async def main_optimized():
# 用Manager创建进程间共享队列(设置最大容量,避免无限堆积)
with multiprocessing.Manager() as manager:
task_queue = manager.Queue(maxsize=10) # 最大容量10,超过则阻塞put操作
result_queue = manager.Queue(maxsize=10)
# 启动4个进程
processes = []
for _ in range(4):
p = multiprocessing.Process(
target=start_process,
args=(task_queue, result_queue)
)
p.start()
processes.append(p)
# 分发任务(带异常处理,避免队列满导致阻塞)
task_data = [
(1, [1000000007, 1000000009, 1000000021]),
(2, [2000000003, 2000000009, 2000000027]),
(3, [3000000007, 3000000011, 3000000013]),
(4, [4000000007, 4000000009, 4000000043])
]
for task in task_data:
try:
task_queue.put(task, block=True, timeout=5) # 5秒超时,避免无限等待
except multiprocessing.queues.Full:
print(f"警告:任务队列已满,无法放入任务{task[0]}")
# 发送结束信号
for _ in range(4):
task_queue.put(None, block=True, timeout=5)
# 收集结果
results = {}
while len(results) < len(task_data):
try:
task_id, primes = await asyncio.to_thread(
result_queue.get, block=True, timeout=10
)
results[task_id] = primes
except multiprocessing.queues.Empty:
print(f"警告:结果队列超时为空,当前已收集{len(results)}个结果")
break
# 输出结果
for task_id in sorted(results.keys()):
print(f"任务{task_id}完成,找到素数:{results[task_id]}")
# 等待所有进程结束,强制终止超时进程
for p in processes:
p.join(timeout=5)
if p.is_alive():
print(f"警告:进程{p.pid}超时未结束,强制终止")
p.terminate()
if __name__ == "__main__":
asyncio.run(main())
# 启动优化版本
# asyncio.run(main_optimized())关键说明:
- 进程数建议设置为CPU核心数(如4核CPU设为4),避免进程切换开销;
asyncio.to_thread()(Python 3.9+)可便捷地将同步的队列操作(如queue.get())包装为异步任务,避免阻塞事件循环;- 复杂场景下,也可使用
multiprocessing.Manager创建共享内存或字典,实现更灵活的进程间数据共享。
六、总结:该选Python异步还是Golang goroutine?
没有绝对的优劣,只有适合的场景:
若你是Python开发者,需要处理高并发IO场景(如爬虫、API服务),且不想切换语言:优先用Python异步协程,配合aiohttp、aiomysql等库,能大幅提升效率;
若你需要开发高并发中间件、微服务,或涉及CPU密集型场景:优先选Golang,goroutine的调度优势和生态完善度,能让并发编程更轻松;
若你追求“快速上手、低心智负担”:Golang的goroutine更具优势,一行代码启动并发,无需过多关注底层调度;Python异步则需要掌握更多概念,适合愿意深入理解异步原理的开发者;
最后,记住核心原则:Python异步适合“IO密集型+Python生态依赖 +增量改造 ”的场景, 处理CPU密集型需搭配多进程; Golang适合“高并发 多场景适配+快速上手+全新项目开发 ”的场景。根据自己的技术栈、项目需求选择即可。
如果觉得本文对你有帮助,欢迎转发分享。你在使用Python异步或Golang goroutine时遇到过哪些问题?评论区聊聊!