Skip to content

从Golang goroutine看Python异步:历程、用法与终极对比

约 6372 字大约 21 分钟

Python语言特性

2025-12-23

下午和同事review代码时,看到Python异步并发的场景,不禁怀念起 Golang 中用go关键字启动异步的优雅与简洁。那 Python 中是否也能实现如此优雅的异步调用? 带着这个疑问,我翻阅了不少资料与文档,整理成这篇文章。接下来,我们就从 Python 异步的版本进化史说起,梳理其核心用法,再与 Golang goroutine 进行全方位对比,帮你彻底搞懂 Python 异步协程。

本文较长,可收藏细看。

一、Python异步的“进化之路”

Python的异步能力并非一蹴而就,而是经历了多版本迭代,从早期的“曲线救国”到如今的“原生支持”,核心是为了解决高并发IO场景的效率问题。

1. 早期探索:yield生成器模拟(2.x ~ 3.3)

在Python 3.4之前,并没有官方的异步框架,开发者只能通过**生成器(yield/yield from)**模拟协程的“暂停/恢复”逻辑。

原理是利用生成器的迭代特性,手动控制函数执行流程,但这种方式需要开发者自己处理调度逻辑,代码繁琐且易出错,只能作为小众方案使用。

2. 首次标准化:asyncio纳入标准库(3.4)

2014年发布的Python 3.4,正式将asyncio库纳入标准库,标志着Python官方支持异步编程。

但此时的语法仍不友好,需要通过@asyncio.coroutine装饰器定义协程,用yield from实现暂停等待,比如:


import asyncio

@asyncio.coroutine
def hello():
    yield from asyncio.sleep(1)  # 模拟IO等待
    print("Hello")

loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()

虽然有了官方框架,但装饰器+yield from的组合学习成本高,难以推广。

3. 语法革命:async/await语法糖(3.5)

2015年Python 3.5的发布,带来了异步编程的“里程碑”——引入async defawait关键字,彻底简化了协程的定义和使用。

原来的装饰器和yield from被逐步替代,代码可读性大幅提升,这也是我们现在使用的核心语法,比如:


import asyncio

async def hello():  # 直接用async def定义协程
    await asyncio.sleep(1)  # 用await暂停等待
    print("Hello")

loop = asyncio.get_event_loop()
loop.run_until_complete(hello())

4. 持续优化:简化启动与增强功能(3.7+)

后续版本持续优化异步体验,降低上手门槛:

  • 3.7:新增asyncio.run()函数,一键启动事件循环,替代了之前繁琐的get_event_loop()+run_until_complete()组合;

  • 3.8asyncio.create_task()成为顶层函数,简化了任务的创建;同时支持在异步函数中使用await表达式作为赋值语句的右侧;

  • 3.10:引入TaskGroup(实验性,3.11正式稳定),更优雅地管理批量任务,支持自动取消和异常处理;

  • 3.11:针对IO密集型场景的吞吐量需求,大幅优化asyncio底层调度逻辑,协程切换速度提升3倍以上,显著缩小了与Golang在高并发IO场景下的性能差距;

  • 3.12asyncio.TaskGroup功能进一步增强,支持嵌套使用,可更灵活地管理分层任务;优化异步迭代器性能,降低async for循环的开销;新增asyncio.to_thread()函数,可更便捷地将同步函数包装为异步任务,减少同步/异步混用的适配成本;新增asyncio.eager_task_factory()asyncio.create_eager_task_factory()函数,提供“急切执行”的任务工厂实现,让任务创建后立即执行(无需等待事件循环下一轮调度),显著降低任务启动延迟,适配低延迟并发场景;

  • 3.13(预览特性):重点优化异步IO的底层实现,提升大并发场景下的任务调度吞吐量;计划增强与多进程的协同能力,简化“异步+多进程”混合编程的复杂度;同时优化异步异常的捕获与堆栈信息,便于问题排查。

二、Python异步协程核心用法

经过多版本迭代,Python异步协程的核心用法已非常稳定,掌握以下4个关键点,就能应对大部分高并发IO场景。

1. 核心概念速览

概念作用通俗理解
async def定义协程函数,调用后返回协程对象(不立即执行)“异步任务的模板”
await挂起协程,等待异步操作完成后恢复执行,仅能在async def内使用“暂停当前任务,去做其他事,做完再回来”
事件循环(Event Loop)管理协程的执行、切换和调度,是异步编程的核心“任务调度员”
Task将协程包装为可并发执行的任务,由事件循环管理“被调度员管理的具体任务”

2. 最简并发示例(类比Golang go关键字)

Golang用go func()启动并发任务,Python则用asyncio.create_task()asyncio.gather()实现,示例如下:

import asyncio

# 定义协程函数(异步任务)
async def task_func(name):
    print(f"任务{name}开始执行")
    await asyncio.sleep(1)  # 模拟IO操作(非阻塞)
    print(f"任务{name}执行完成")
    return f"任务{name}结果"

# 主协程(异步程序入口)
async def main():
    # 方式1:用create_task创建任务(类比go 关键字)
    task1 = asyncio.create_task(task_func("A"))
    task2 = asyncio.create_task(task_func("B"))
    
    # 等待任务完成,获取结果
    result1 = await task1
    result2 = await task2
    print(f"最终结果:{result1}, {result2}")

    # 方式2:用gather批量并发(更简洁,适合多任务)
    # results = await asyncio.gather(
    #     task_func("A"),
    #     task_func("B"),
    #     task_func("C")
    # )
    # print(f"批量结果:{results}")

# 方式3:更优写法(Python 3.11+):用TaskGroup管理批量任务(推荐)
# 优势:自动管理任务生命周期,支持异常自动传播、批量取消,比gather更优雅
async def main_taskgroup():
    async with asyncio.TaskGroup() as tg:
        # 批量创建任务,无需手动收集
        task1 = tg.create_task(task_func("A"))
        task2 = tg.create_task(task_func("B"))
        task3 = tg.create_task(task_func("C"))
    # TaskGroup退出时,所有任务已完成
    print(f"TaskGroup批量结果:{task1.result()}, {task2.result()}, {task3.result()}")

# 启动异步程序
if __name__ == "__main__":
    asyncio.run(main())
    # 启动TaskGroup版本
    # asyncio.run(main_taskgroup())

输出结果(并发执行,总耗时≈1秒):


任务A开始执行
任务B开始执行
任务A执行完成
任务B执行完成
最终结果:任务A结果, 任务B结果

3. 实用场景:异步HTTP请求

Python异步最适合IO密集型场景(如网络请求、文件读写),以异步HTTP请求为例(需安装aiohttp库:pip install aiohttp):



import asyncio
import aiohttp

async def fetch_url(url, session):
    async with session.get(url) as resp:  # 异步上下文管理器
        return await resp.text()[:100]  # 仅返回前100个字符

async def main():
    urls = [
        "https://www.baidu.com",
        "https://www.github.com",
        "https://www.python.org"
    ]
    
    # 异步创建HTTP会话,批量请求(原有写法:gather)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(url, session) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, result in zip(urls, results):
            print(f"\n{url} 响应片段:{result}")

# 更优写法(Python 3.11+):用TaskGroup管理HTTP请求
# 优势:1. 无需手动收集tasks列表;2. 支持异常自动捕获,可批量取消任务;3. 代码更简洁
async def main_taskgroup():
    urls = [
        "https://www.baidu.com",
        "https://www.github.com",
        "https://www.python.org"
    ]
    
    async with aiohttp.ClientSession() as session, asyncio.TaskGroup() as tg:
        # 批量创建任务,用字典存储任务与URL的映射(便于获取结果)
        task_map = {tg.create_task(fetch_url(url, session)): url for url in urls}
    
    # 遍历任务获取结果
    for task, url in task_map.items():
        print(f"\n{url} 响应片段:{task.result()[:100]}")

if __name__ == "__main__":
    asyncio.run(main())
    # 启动TaskGroup版本
    # asyncio.run(main_taskgroup())

三、Python异步 vs Golang goroutine:全方位对比

很多开发者会拿两者对比,核心结论先摆出来:两者都是轻量级并发方案,只是基于不同设计理念,适配不同场景需求,各有优劣。具体对比如下:

对比维度Python 异步协程Golang goroutine
启动方式需先定义async函数,再通过create_task/gather启动,步骤相对规范,需遵循异步语法约束直接用go 函数()启动,简洁直观,无需额外语法约束,上手门槛低
调度模型单线程事件循环(用户态调度),需显式用await触发切换,调度逻辑可预测性强MPG模型(内核态+用户态结合调度),隐式切换,无需手动干预,调度灵活性高
阻塞处理若在异步函数中使用同步阻塞操作(如requests、time.sleep),会阻塞整个事件循环,需配套使用异步库(如aiohttp、asyncio.sleep),对库的选型有一定要求即使单个goroutine阻塞(如IO、time.Sleep),调度器会自动切换到其他goroutine,不影响整体并发,对开发者阻塞处理的要求较低
并发粒度更适配IO密集型场景,单线程无法直接利用多核,CPU密集型需结合多进程协同,需额外处理进程间通信适配IO密集型和CPU密集型多种场景,可通过GOMAXPROCS控制多核利用,并发适配范围更广
启动成本协程占用内存小(≈几KB),可创建百万级协程,满足绝大多数高并发IO场景需求goroutine占用内存极小(≈2KB),可创建千万级goroutine,在超大规模并发场景下更具优势
生态支持同步库生态极为丰富,异步库逐步完善,依托现有生态可快速实现异步改造,部分场景需封装异步逻辑原生围绕高并发设计,标准库中异步相关工具(如channel、sync包)完善,高并发场景下生态适配性强
学习成本需理解事件循环、可等待对象等概念,区分同步/异步库差异,需掌握的概念较多,学习曲线稍陡核心只需掌握go关键字和channel通信,核心概念简洁,学习曲线平缓,上手速度快

核心差异的本质原因

两者的差异核心源于语言设计初心与生态基础

  • Golang从语言层面原生支持并发,调度器深度集成内核;

  • Python的异步是基于现有生态的渐进式优化,依托单线程事件循环实现,受限于GIL(全局解释器锁)无法直接利用多核,但可与丰富的同步生态兼容,适合增量改造现有项目;

四、Python异步避坑指南(新手必看)

很多开发者用不好Python异步,核心是踩了“同步/异步混用”“误解并发本质”的坑,总结4个常见问题:

1. 坑1:同步阻塞操作阻塞事件循环

错误示例:在async函数中用time.sleep(1)(同步阻塞),会导致整个事件循环卡住,所有协程都无法执行;

解决方案:用asyncio.sleep(1)替代;HTTP请求用aiohttp替代requests,数据库操作用aiomysql替代pymysql

2. 坑2:await滥用或误用

错误示例:在普通函数中使用await,或await非可等待对象(如普通整数、字符串);

解决方案:await仅能在async函数内使用,且只能跟“可等待对象”(协程、Task、Future)。

3. 坑3:误解“并发”=“并行”

Python异步是“单线程并发”,同一时间只有一个任务在执行,切换是“伪并行”;若需要利用多核,需结合multiprocessing库,用多进程+异步的组合方案。

4. 坑4:忽略任务异常处理

错误示例:未捕获Task的异常,会导致程序崩溃;

解决方案:用try/except捕获await的异常,或用task.add_done_callback()处理任务完成后的异常。

五、Python处理CPU密集型任务的方案(异步+多进程协同)

Python异步协程本身基于单线程,无法直接利用多核资源,处理CPU密集型任务(如大规模计算、数据加密解密等)时效率较低。核心解决方案是**“异步协程+多进程”协同**:用多进程利用多核,每个进程内用异步协程处理IO操作,兼顾CPU利用率和IO效率。

1. 核心实现方案

常用两种实现方式,根据场景选择:

  • 方案一:asyncio + concurrent.futures.ProcessPoolExecutor:适合简单场景,将CPU密集型任务提交到进程池,异步等待结果返回;

  • 方案二:multiprocessing + asyncio:适合复杂场景,手动创建多进程,每个进程启动独立事件循环,通过进程间通信(IPC)协同任务。

2. 方案一:asyncio + ProcessPoolExecutor(简单场景示例)

利用concurrent.futures.ProcessPoolExecutor创建进程池,可通过两种方式实现异步等待CPU密集型任务:一种是asyncio.wrap_future()手动包装(兼容性好),另一种是asyncio.run_in_executor()简化写法(更简洁)。以下分场景演示两种写法的实现:

场景1:批量无依赖CPU密集型任务

子场景1.1:用wrap_future手动包装

import asyncio
import concurrent.futures
import math

# 1. 定义CPU密集型同步函数(示例:大规模素数判断)
def is_prime(n):
    """判断一个数是否为素数(CPU密集型操作)"""
    if n <= 1:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True

def cpu_intensive_task(numbers):
    """批量处理素数判断(CPU密集型)"""
    return [num for num in numbers if is_prime(num)]

# 2. 异步主函数:提交CPU密集型任务到进程池
async def main_wrap_future():
    # 待处理的大规模数字列表(模拟CPU密集型任务)
    task_data = [
        [1000000007, 1000000009, 1000000021, 1000000033] * 5,  # 任务1
        [2000000003, 2000000009, 2000000027, 2000000039] * 5,  # 任务2
        [3000000007, 3000000011, 3000000013, 3000000023] * 5   # 任务3
    ]
    
    # 创建进程池(进程数建议等于CPU核心数)
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        # 批量提交任务到进程池,手动用wrap_future包装为异步可等待对象
        tasks = [
            asyncio.wrap_future(executor.submit(cpu_intensive_task, data))
            for data in task_data
        ]
        
        # 异步等待所有CPU密集型任务完成
        results = await asyncio.gather(*tasks)
        
        # 输出结果
        for i, result in enumerate(results, 1):
            print(f"任务{i}完成,找到素数:{result}")

if __name__ == "__main__":
    asyncio.run(main_wrap_future())
子场景1.2:用run_in_executor简化写法(推荐)

核心优势:无需手动调用wrap_future()run_in_executor直接返回可await的对象,代码更简洁,是Python 3.5+的推荐写法。




import asyncio
import concurrent.futures
import math

# 复用CPU密集型同步函数(与子场景1.1一致)
def is_prime(n):
    if n <= 1:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True

def cpu_intensive_task(numbers):
    return [num for num in numbers if is_prime(num)]

# 异步主函数:用run_in_executor简化实现(原有写法)
async def main_run_in_executor():
    task_data = [
        [1000000007, 1000000009, 1000000021, 1000000033] * 5,
        [2000000003, 2000000009, 2000000027, 2000000039] * 5,
        [3000000007, 3000000011, 3000000013, 3000000023] * 5
    ]
    
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        # 获取当前运行的事件循环(3.7+推荐用get_running_loop,比get_event_loop更安全)
        loop = asyncio.get_running_loop()
        
        # 批量提交任务:run_in_executor直接封装“进程池+函数+参数”,返回可await对象
        tasks = [
            loop.run_in_executor(executor, cpu_intensive_task, data)
            for data in task_data
        ]
        
        results = await asyncio.gather(*tasks)
        for i, result in enumerate(results, 1):
            print(f"任务{i}完成,找到素数:{result}")

# 更优写法(Python 3.11+):用TaskGroup+run_in_executor管理批量CPU密集型任务
# 优势:1. 支持异常自动捕获,某一个任务失败不影响其他任务(可通过try/except控制);2. 代码更简洁,无需手动收集tasks;3. 支持批量取消任务
async def main_run_in_executor_taskgroup():
    task_data = [
        [1000000007, 1000000009, 1000000021, 1000000033] * 5,
        [2000000003, 2000000009, 2000000027, 2000000039] * 5,
        [3000000007, 3000000011, 3000000013, 3000000023] * 5
    ]
    
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        loop = asyncio.get_running_loop()
        async with asyncio.TaskGroup() as tg:
            # 用列表存储任务,便于后续获取结果(按顺序)
            tasks = [
                tg.create_task(loop.run_in_executor(executor, cpu_intensive_task, data))
                for data in task_data
            ]
        # TaskGroup退出时,所有任务已完成
        for i, task in enumerate(tasks, 1):
            print(f"任务{i}完成,找到素数:{task.result()}")

if __name__ == "__main__":
    asyncio.run(main_run_in_executor())
    # 启动TaskGroup版本
    # asyncio.run(main_run_in_executor_taskgroup())

场景2:多个异步任务依次依赖返回结果

子场景2.1:用wrap_future实现

核心逻辑:前一个任务的输出作为后一个任务的输入,通过await依次获取任务结果,实现递进式执行。示例中设计3个依赖任务:任务1获取素数列表 → 任务2计算素数平方和 → 任务3基于平方和计算最终结果。



import asyncio
import concurrent.futures
import math

# 1. 定义3个递进式CPU密集型函数(存在依赖关系)
def get_primes(numbers):
    """任务1:获取指定数字列表中的素数(基础数据准备)"""    def is_prime(n):
        if n <= 1:
            return False
        if n == 2:
            return True
        if n % 2 == 0:
            return False
        for i in range(3, int(math.sqrt(n)) + 1, 2):
            if n % i == 0:
                return False
        return True
    return [num for num in numbers if is_prime(num)]

def calc_square_sum(primes):
    """任务2:计算素数列表的平方和(依赖任务1的结果)"""    return sum(num*num for num in primes)

def calc_final_result(square_sum, factor=2):
    """任务3:基于平方和计算最终结果(依赖任务2的结果)"""    return square_sum * factor  # 示例逻辑:平方和乘以系数

# 2. 异步主函数:用wrap_future实现依赖任务
async def main_dependent_wrap_future():
    # 初始数据(将传递给第一个任务)
    initial_numbers = [1000000007, 1000000009, 1000000021, 1000000033, 5000000003]
    
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        # 步骤1:执行任务1,获取素数列表(无前置依赖)
        task1 = asyncio.wrap_future(executor.submit(get_primes, initial_numbers))
        primes = await task1
        print(f"任务1完成,获取到素数:{primes}")
        
        # 步骤2:执行任务2,传入任务1的结果计算平方和(依赖任务1)
        task2 = asyncio.wrap_future(executor.submit(calc_square_sum, primes))
        square_sum = await task2
        print(f"任务2完成,素数平方和:{square_sum}")
        
        # 步骤3:执行任务3,传入任务2的结果计算最终值(依赖任务2)
        task3 = asyncio.wrap_future(executor.submit(calc_final_result, square_sum))
        final_result = await task3
        print(f"任务3完成,最终结果:{final_result}")

if __name__ == "__main__":
    asyncio.run(main_dependent_wrap_future())
子场景2.2:用run_in_executor简化实现(素数计算依赖场景)

延续子场景2.1的素数计算场景,演示依赖任务的简化实现:任务1获取素数列表 → 任务2计算素数平方和 → 任务3基于平方和计算最终结果,用run_in_executor简化异步等待逻辑。


import asyncio
import concurrent.futures
import math

# 1. 定义3个递进式CPU密集型函数(存在依赖关系,与子场景2.1一致)
def get_primes(numbers):
    """任务1:获取指定数字列表中的素数(基础数据准备,CPU密集型)"""    def is_prime(n):
        if n <= 1:
            return False
        if n == 2:
            return True
        if n % 2 == 0:
            return False
        for i in range(3, int(math.sqrt(n)) + 1, 2):
            if n % i == 0:
                return False
        return True
    return [num for num in numbers if is_prime(num)]

def calc_square_sum(primes):
    """任务2:计算素数列表的平方和(依赖任务1结果,CPU密集型)"""    return sum(num*num for num in primes)

def calc_final_result(square_sum, factor=2):
    """任务3:基于平方和计算最终结果(依赖任务2结果,CPU密集型)"""    return square_sum * factor  # 示例逻辑:平方和乘以系数

# 异步主函数:用run_in_executor实现依赖任务(简化写法)
async def main_dependent_run_in_executor():
    # 初始数据(将传递给第一个任务)
    initial_numbers = [1000000007, 1000000009, 1000000021, 1000000033, 5000000003]
    
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        loop = asyncio.get_running_loop()
        
        # 步骤1:执行任务1,获取素数列表(无前置依赖)
        primes = await loop.run_in_executor(
            executor, get_primes, initial_numbers
        )
        print(f"任务1完成,获取到素数:{primes}")
        
        # 步骤2:执行任务2,传入任务1的结果计算平方和(依赖任务1)
        square_sum = await loop.run_in_executor(
            executor, calc_square_sum, primes
        )
        print(f"任务2完成,素数平方和:{square_sum}")
        
        # 步骤3:执行任务3,传入任务2的结果计算最终值(依赖任务2)
        final_result = await loop.run_in_executor(
            executor, calc_final_result, square_sum
        )
        print(f"任务3完成,最终结果:{final_result}")

# 更优写法:增加异常处理+任务超时控制(生产环境必备)
# 优势:1. 避免单个任务阻塞导致整体卡死;2. 异常可捕获,程序更健壮;3. 符合生产环境容错要求
async def main_dependent_robust(timeout=10):
    initial_numbers = [1000000007, 1000000009, 1000000021, 1000000033, 5000000003]
    
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        loop = asyncio.get_running_loop()
        try:
            # 步骤1:获取素数(设置超时,避免无限等待)
            primes = await asyncio.wait_for(
                loop.run_in_executor(executor, get_primes, initial_numbers),
                timeout=timeout
            )
            print(f"任务1完成,获取到素数:{primes}")
            
            # 步骤2:计算平方和
            square_sum = await asyncio.wait_for(
                loop.run_in_executor(executor, calc_square_sum, primes),
                timeout=timeout
            )
            print(f"任务2完成,素数平方和:{square_sum}")
            
            # 步骤3:计算最终结果
            final_result = await asyncio.wait_for(
                loop.run_in_executor(executor, calc_final_result, square_sum),
                timeout=timeout
            )
            print(f"任务3完成,最终结果:{final_result}")
            return final_result
        except asyncio.TimeoutError:
            print(f"错误:任务执行超时(超过{timeout}秒)")
            return None
        except Exception as e:
            print(f"错误:任务执行失败 - {str(e)}")
            return None

# 启动执行
if __name__ == "__main__":
    # 运行简化版本
    # asyncio.run(main_dependent_run_in_executor())
    # 运行健壮版本(推荐生产环境使用)
    asyncio.run(main_dependent_robust())

依赖场景说明

  1. 执行顺序严格遵循“任务1→任务2→任务3”,前一个任务未完成时,后一个任务不会启动,通过await保证依赖顺序;
  2. 与子场景2.1(wrap_future写法)核心差异:run_in_executor省去了executor.submit()+asyncio.wrap_future()的手动包装步骤,直接将“进程池+函数+参数”传入即可获得可await对象,代码更精简、可读性更高;
  3. 适用场景:需要递进式计算的CPU密集型任务(如“数据筛选→数据计算→结果推导”);
  4. 输出示例: 任务1完成,获取到素数:[1000000007, 1000000009, 1000000021, 1000000033, 5000000003] 任务2完成,素数平方和:2800000048000000244 任务3完成,最终结果:5600000096000000488

`

3. 方案二:multiprocessing + asyncio(复杂场景示例)

手动创建多进程,每个进程启动独立的事件循环,通过multiprocessing.Queue传递任务和结果,适合需要精细控制进程生命周期的场景。



import asyncio
import multiprocessing
import math

# 1. 进程内执行的异步函数:处理从队列接收的CPU密集型任务
async def process_worker(task_queue, result_queue):
    """每个进程的工作函数:从队列取任务,处理后将结果存入队列"""
    def is_prime(n):
        if n <= 1:
            return False
        if n == 2:
            return True
        if n % 2 == 0:
            return False
        for i in range(3, int(math.sqrt(n)) + 1, 2):
            if n % i == 0:
                return False
        return True
    
    while True:
        # 从队列获取任务(None表示结束信号)
        task = await asyncio.to_thread(task_queue.get)
        if task is None:
            break
        task_id, numbers = task
        # 处理CPU密集型任务
        primes = [num for num in numbers if is_prime(num)]
        # 存入结果队列
        await asyncio.to_thread(result_queue.put, (task_id, primes))

# 2. 启动进程并运行事件循环
def start_process(task_queue, result_queue):
    """启动进程内的事件循环"""
    asyncio.run(process_worker(task_queue, result_queue))

# 3. 主进程:分发任务、收集结果
async def main():
    # 创建进程间通信队列
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    
    # 启动4个进程
    processes = []
    for _ in range(4):
        p = multiprocessing.Process(
            target=start_process,
            args=(task_queue, result_queue)
        )
        p.start()
        processes.append(p)
    
    # 分发任务(任务ID + 待处理数字列表)
    task_data = [
        (1, [1000000007, 1000000009, 1000000021]),
        (2, [2000000003, 2000000009, 2000000027]),
        (3, [3000000007, 3000000011, 3000000013]),
        (4, [4000000007, 4000000009, 4000000043])
    ]
    for task in task_data:
        task_queue.put(task)
    
    # 发送结束信号(每个进程1个None)
    for _ in range(4):
        task_queue.put(None)
    
    # 收集结果
    results = {}
    while len(results) < len(task_data):
        task_id, primes = await asyncio.to_thread(result_queue.get)
        results[task_id] = primes
    
    # 输出结果
    for task_id in sorted(results.keys()):
        print(f"任务{task_id}完成,找到素数:{results[task_id]}")
    
    # 等待所有进程结束
    for p in processes:
        p.join()

# 更优写法:用multiprocessing.Manager优化进程间通信(适合复杂场景)
# 优势:1. Manager.Queue支持更多数据类型,且可在多线程+多进程混合场景使用;2. 支持设置队列大小,避免内存溢出;3. 可通过Manager共享其他数据结构(如字典)
async def main_optimized():
    # 用Manager创建进程间共享队列(设置最大容量,避免无限堆积)
    with multiprocessing.Manager() as manager:
        task_queue = manager.Queue(maxsize=10)  # 最大容量10,超过则阻塞put操作
        result_queue = manager.Queue(maxsize=10)
        
        # 启动4个进程
        processes = []
        for _ in range(4):
            p = multiprocessing.Process(
                target=start_process,
                args=(task_queue, result_queue)
            )
            p.start()
            processes.append(p)
        
        # 分发任务(带异常处理,避免队列满导致阻塞)
        task_data = [
            (1, [1000000007, 1000000009, 1000000021]),
            (2, [2000000003, 2000000009, 2000000027]),
            (3, [3000000007, 3000000011, 3000000013]),
            (4, [4000000007, 4000000009, 4000000043])
        ]
        for task in task_data:
            try:
                task_queue.put(task, block=True, timeout=5)  # 5秒超时,避免无限等待
            except multiprocessing.queues.Full:
                print(f"警告:任务队列已满,无法放入任务{task[0]}")
        
        # 发送结束信号
        for _ in range(4):
            task_queue.put(None, block=True, timeout=5)
        
        # 收集结果
        results = {}
        while len(results) < len(task_data):
            try:
                task_id, primes = await asyncio.to_thread(
                    result_queue.get, block=True, timeout=10
                )
                results[task_id] = primes
            except multiprocessing.queues.Empty:
                print(f"警告:结果队列超时为空,当前已收集{len(results)}个结果")
                break
        
        # 输出结果
        for task_id in sorted(results.keys()):
            print(f"任务{task_id}完成,找到素数:{results[task_id]}")
        
        # 等待所有进程结束,强制终止超时进程
        for p in processes:
            p.join(timeout=5)
            if p.is_alive():
                print(f"警告:进程{p.pid}超时未结束,强制终止")
                p.terminate()

if __name__ == "__main__":
    asyncio.run(main())
    # 启动优化版本
    # asyncio.run(main_optimized())

关键说明

  1. 进程数建议设置为CPU核心数(如4核CPU设为4),避免进程切换开销;
  2. asyncio.to_thread()(Python 3.9+)可便捷地将同步的队列操作(如queue.get())包装为异步任务,避免阻塞事件循环;
  3. 复杂场景下,也可使用multiprocessing.Manager创建共享内存或字典,实现更灵活的进程间数据共享。

六、总结:该选Python异步还是Golang goroutine?

没有绝对的优劣,只有适合的场景:

  • 若你是Python开发者,需要处理高并发IO场景(如爬虫、API服务),且不想切换语言:优先用Python异步协程,配合aiohttp、aiomysql等库,能大幅提升效率;

  • 若你需要开发高并发中间件、微服务,或涉及CPU密集型场景:优先选Golang,goroutine的调度优势和生态完善度,能让并发编程更轻松;

若你追求“快速上手、低心智负担”:Golang的goroutine更具优势,一行代码启动并发,无需过多关注底层调度;Python异步则需要掌握更多概念,适合愿意深入理解异步原理的开发者;

最后,记住核心原则:Python异步适合“IO密集型+Python生态依赖 +增量改造 ”的场景, 处理CPU密集型需搭配多进程; Golang适合“高并发 多场景适配+快速上手+全新项目开发 ”的场景。根据自己的技术栈、项目需求选择即可。

如果觉得本文对你有帮助,欢迎转发分享。你在使用Python异步或Golang goroutine时遇到过哪些问题?评论区聊聊!