Python 并发编程:asyncio vs threading vs multiprocessing

张开发
2026/4/15 23:01:14 15 分钟阅读

分享文章

Python 并发编程:asyncio vs threading vs multiprocessing
Python 并发编程asyncio vs threading vs multiprocessing核心结论asyncio适合 I/O 密集型任务内存开销小单线程并发threading适合 I/O 密集型任务多线程并发需注意 GIL 限制multiprocessing适合 CPU 密集型任务多进程并发无 GIL 限制性能对比I/O 密集型任务 asyncio threading multiprocessingCPU 密集型任务 multiprocessing threading ≈ asyncio一、并发编程基础1.1 并发与并行的区别并发多个任务交替执行宏观上同时进行并行多个任务同时执行微观上同时进行Python 中的并发模型线程threading多线程并发受 GIL 限制进程multiprocessing多进程并行无 GIL 限制协程asyncio单线程并发基于事件循环1.2 GIL全局解释器锁的影响GILPython 解释器的全局锁同一时刻只能有一个线程执行 Python 字节码影响多线程在 CPU 密集型任务中无法真正并行I/O 密集型任务中线程会释放 GIL因此仍有并发优势多进程不受 GIL 影响可实现真正并行二、asyncio 详解2.1 基本概念协程可暂停执行的函数通过async def定义事件循环管理协程的执行处理 I/O 操作Future表示异步操作的结果TaskFuture 的子类用于执行协程2.2 代码示例import asyncio import time async def fetch_data(url, delay): 模拟网络请求 print(f开始获取 {url} 的数据) await asyncio.sleep(delay) print(f完成获取 {url} 的数据) return f{url} 的数据 async def main(): 主协程 start_time time.time() # 并发执行多个协程 tasks [ fetch_data(https://api.example.com/data1, 2), fetch_data(https://api.example.com/data2, 3), fetch_data(https://api.example.com/data3, 1) ] results await asyncio.gather(*tasks) print(f所有请求完成结果: {results}) print(f总耗时: {time.time() - start_time:.2f} 秒) if __name__ __main__: asyncio.run(main())2.3 性能分析优点内存开销小无需线程/进程切换适合高并发 I/O 操作编程模型清晰避免回调地狱缺点需使用异步库不能直接调用同步函数不适合 CPU 密集型任务学习曲线较陡峭三、threading 详解3.1 基本概念线程轻量级进程共享内存空间Thread 类创建和管理线程Lock线程同步原语防止资源竞争ThreadPoolExecutor线程池管理线程生命周期3.2 代码示例import threading import time from concurrent.futures import ThreadPoolExecutor def fetch_data(url, delay): 模拟网络请求 print(f开始获取 {url} 的数据) time.sleep(delay) print(f完成获取 {url} 的数据) return f{url} 的数据 def main(): 主线程 start_time time.time() # 使用线程池 with ThreadPoolExecutor(max_workers3) as executor: futures [ executor.submit(fetch_data, https://api.example.com/data1, 2), executor.submit(fetch_data, https://api.example.com/data2, 3), executor.submit(fetch_data, https://api.example.com/data3, 1) ] results [future.result() for future in futures] print(f所有请求完成结果: {results}) print(f总耗时: {time.time() - start_time:.2f} 秒) if __name__ __main__: main()3.3 性能分析优点适合 I/O 密集型任务编程模型简单易于理解可直接调用同步函数缺点受 GIL 限制CPU 密集型任务性能受限线程切换开销较大需注意线程安全问题四、multiprocessing 详解4.1 基本概念进程独立的执行环境有自己的内存空间Process 类创建和管理进程Queue进程间通信机制Pool进程池管理进程生命周期4.2 代码示例import multiprocessing import time from concurrent.futures import ProcessPoolExecutor def compute_intensive_task(n): 模拟 CPU 密集型任务 print(f开始计算任务 {n}) result 0 for i in range(10**7): result i print(f完成计算任务 {n}) return result def main(): 主进程 start_time time.time() # 使用进程池 with ProcessPoolExecutor(max_workers3) as executor: futures [ executor.submit(compute_intensive_task, 1), executor.submit(compute_intensive_task, 2), executor.submit(compute_intensive_task, 3) ] results [future.result() for future in futures] print(f所有计算完成结果: {results}) print(f总耗时: {time.time() - start_time:.2f} 秒) if __name__ __main__: main()4.3 性能分析优点无 GIL 限制适合 CPU 密集型任务真正的并行执行进程间相互独立安全性高缺点内存开销大每个进程有独立内存空间进程间通信开销较大启动和管理开销较大五、性能对比实验5.1 I/O 密集型任务对比import asyncio import threading import multiprocessing import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor # 模拟 I/O 密集型任务 def io_task(delay): time.sleep(delay) return delay async def async_io_task(delay): await asyncio.sleep(delay) return delay # 测试 I/O 密集型任务 def test_io_performance(): tasks [1] * 10 # 10个任务每个任务延迟1秒 # 同步执行 start time.time() for task in tasks: io_task(task) sync_time time.time() - start print(f同步执行时间: {sync_time:.2f} 秒) # asyncio 执行 async def async_main(): start time.time() await asyncio.gather(*[async_io_task(task) for task in tasks]) return time.time() - start async_time asyncio.run(async_main()) print(fasyncio 执行时间: {async_time:.2f} 秒) # threading 执行 start time.time() with ThreadPoolExecutor(max_workers10) as executor: executor.map(io_task, tasks) thread_time time.time() - start print(fthreading 执行时间: {thread_time:.2f} 秒) # multiprocessing 执行 start time.time() with ProcessPoolExecutor(max_workers10) as executor: executor.map(io_task, tasks) process_time time.time() - start print(fmultiprocessing 执行时间: {process_time:.2f} 秒) if __name__ __main__: test_io_performance()5.2 CPU 密集型任务对比import threading import multiprocessing import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor # 模拟 CPU 密集型任务 def cpu_task(n): result 0 for i in range(n): result i return result # 测试 CPU 密集型任务 def test_cpu_performance(): tasks [10**7] * 4 # 4个任务每个任务计算10^7次 # 同步执行 start time.time() for task in tasks: cpu_task(task) sync_time time.time() - start print(f同步执行时间: {sync_time:.2f} 秒) # threading 执行 start time.time() with ThreadPoolExecutor(max_workers4) as executor: executor.map(cpu_task, tasks) thread_time time.time() - start print(fthreading 执行时间: {thread_time:.2f} 秒) # multiprocessing 执行 start time.time() with ProcessPoolExecutor(max_workers4) as executor: executor.map(cpu_task, tasks) process_time time.time() - start print(fmultiprocessing 执行时间: {process_time:.2f} 秒) if __name__ __main__: test_cpu_performance()5.3 实验结果分析任务类型同步执行asynciothreadingmultiprocessingI/O 密集型10个任务每个1秒10.0秒~1.0秒~1.0秒~1.0秒CPU 密集型4个任务每个10^7次计算4.0秒~4.0秒~4.0秒~1.0秒结论I/O 密集型任务asyncio 和 threading 性能接近multiprocessing 略慢进程启动开销CPU 密集型任务multiprocessing 性能显著优于 threading 和 asyncio无 GIL 限制六、最佳实践建议6.1 选择合适的并发模型asyncio适合网络请求、文件 I/O、数据库操作等 I/O 密集型任务场景Web 服务器、爬虫、API 调用threading适合I/O 密集型任务特别是需要调用同步库的场景场景传统 I/O 操作、GUI 应用multiprocessing适合CPU 密集型任务如数据处理、模型训练场景科学计算、图像处理、机器学习6.2 性能优化技巧asyncio使用异步库aiohttp、aiomysql 等避免在协程中执行阻塞操作合理设置事件循环threading使用线程池管理线程减少线程间通信和同步操作注意 GIL 影响multiprocessing使用进程池管理进程减少进程间通信开销合理设置进程数通常为 CPU 核心数6.3 代码质量保证错误处理在并发代码中妥善处理异常资源管理确保所有资源正确释放测试编写并发测试用例验证正确性和性能监控监控并发任务的执行状态和资源使用七、总结Python 提供了三种主要的并发编程模型asyncio、threading 和 multiprocessing。每种模型都有其适用场景和优缺点asyncio单线程协程适合 I/O 密集型任务内存开销小性能优异threading多线程并发适合 I/O 密集型任务编程模型简单但受 GIL 限制multiprocessing多进程并行适合 CPU 密集型任务无 GIL 限制但内存开销大在实际应用中应根据任务类型、性能要求和代码复杂度选择合适的并发模型。对于复杂系统也可以结合使用多种并发模型充分发挥各自的优势。技术演进的内在逻辑Python 的并发模型从 threading 到 multiprocessing再到 asyncio反映了对性能和编程体验的不断追求。每种模型都解决了特定场景下的问题共同构成了 Python 强大的并发编程生态。

更多文章