Python中一种轻松实现并发编程的方法
原文地址:https://rednafi.github.io/digressions/python/2020/04/21/python-concurrent-futures.html
文章较长,建议收藏后再看。
使用Python编写并发代码时并不流畅,我们需要一些额外的思考,你手中的任务是I/O密集类型还是CPU密集型。此外,由于全局解释锁的存在,进一步增加了编写真正的并发代码的难度。
在Python中编写并发代码常常这样做:
如果任务是I/O密集的,可以使用标准库的
threading
模块,如果任务是CPU密集的,则multiprocessing
更合适。threading
和multiprocessing
的API提供了很多功能,但他们都是很低级别的代码,在我们业务核心逻辑上增加了额外的复杂性。
Python标准库还包含一个名为concurrent.futrures
的模块。在Python3.2中添加了该模块,为开发人员提供了更高级别的异步任务接口。它是在threading
和multiprocessing
模块之上的通用抽象层,提供了使用线程池和进程池运行任务的一些接口。当你只需要并发的运行一段完整的代码,而不想使用threading
和multiprocessing
增加额外的逻辑,破坏代码的完整性时,这个模块是一个好的解决方案。
# concurrent.futures 剖析
根据官方文档,
The concurrent.futures module provides a high-level interface for asynchronously executing callables.
这意味着你可以使用更高阶别的接口来使用线程和进程。该模块提供了一个名为Executor
的抽象类,你不能直接实例化它,需要使用它提供的两个子类之一来跑任务。
Executor (Abstract Base Class)
│
├── ThreadPoolExecutor
│ │A concrete subclass of the Executor class to
│ │manage I/O bound tasks with threading underneath
│
├── ProcessPoolExecutor
│ │A concrete subclass of the Executor class to
│ │manage CPU bound tasks with multiprocessing underneath
2
3
4
5
6
7
8
9
在模块内,这两个类与服务池交互并管理工作单元(workers)。Future
类用于管理工作单元的结果。使用工作池时,应用程序会创建适当的Executor
类(ThreadPoolExecutor
或ProcessPoolExecutor
)的实例,然后提交给他们任务运行。提交后,将返回一个Future
类的实例。当需要任务结果时,可以使用该Future
对象进行阻塞,直到拿到任务结果为止。
# Executor Objects
由于ThreadPoolExecutor
和ProcessPoolExecutor
有相同的API接口,我们主要来看下它们提供的两个方法。
# submit(func, args, *kwargs)
func
可调度对象(执行函数),执行参数args
, kwargs
。返回一个可调用的 Future
对象。
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())
2
3
# map(func, *iterables, timeout=None, chunksize=1)
除了下边部分其他同submit
:
- 可迭代对象数据是立即生成的,不像其他迭代器是惰性的。
func
是异步执行的,并且可以同时进行对func
的多次调用。
返回的迭代器调用__next__()
方法时,将会引发concurrent.futures.TimeoutError
。从开始调用到timeout
时间后,结果将不获取。timeout
可以是int
或float
,如果未指定或者是None,则等待时间没有限制。
如果func
调用引发异常,则从迭代器中检索其结果时将抛出该异常。
使用ProcessPoolExecutor
时,此方法可将迭代项分为多个块,将其作为单独的任务块提交给池。这些块的大小可以通过chunksize
来设置。相比非常长的并发列表,使用chunksize
进行分块提交执行可以显著提高性能。
使用ThreadPoolExecutor
时,chunksize
无效。
# 运行并发任务的通用方法
我的许多脚本包含类似这样的代码:
for task in get_tasks():
perform(task)
2
get_tasks
返回一个可迭代对象,该迭代对象包含有特定功能任务的参数相关信息。perform
函数依次运行,每次只能运行一个。由于逻辑是顺序执行的,因此很容易理解。当任务数量少或单个任务的执行时间少、复杂度较低时,没有什么大问题。但是当任务数量巨大或单个任务很耗时时,整端代码性能就变得非常低下。
根据一般经验,ThreadPoolExecutor
在主要受I/O限制的任务时使用,例如发起多个http请求、将大量文件保存到磁盘等。ProcessPoolExecutor
在主要受CPU限制的任务中应用。如大运算量的任务、对大量图片处理应用和一次处理多个文本文件等。
# 使用 Executor.submit
运行任务
当你有许多任务时,可一次性执行它们,然后等待它们全部完成,再收集结果。
import concurrent.futures
with concurrent.futures.Executor() as executor:
futures = {executor.submit(perform, task) for task in get_tasks()}
for fut in concurrent.futures.as_completed(futures):
print(f"The outcome is {fut.result()}")
2
3
4
5
6
7
8
这里你首先创建一个执行器,该执行器在单独的进程或线程中执行所有任务。使用with
语句将创建一个上下文管理器,该管理器可确保在完成后通过隐士调用executor.shutdown()
函数来清理掉所有线程或进程。
在真实的代码中,你需要将Executor
替换为ThreadPoolExecutor
或ProcessPoolExecutor
这些可调用的类。然后使用推导式来开始所有的任务,使用executor.submit()
来调度每一个任务开始执行。每个任务执行后,会生成一个future
对象。一旦所有的任务都完成了,concurrent.futures.as_completed()
方法便可获取所有任务的结果。executor.result()
返回给你任务执行函数的结果值或抛出任务失败异常。
executor.submit()
方法是异步来调用执行任务的,并不包含任务的任何上下文信息。所以如果你想同时获取每个任务的信息,你需要自己处理。
import concurrent.futures
with concurrent.futures.Executor() as executor:
futures = {executor.submit(perform, task): task for task in get_tasks()}
for fut in concurrent.futures.as_completed(futures):
original_task = futures[fut]
print(f"The result of {original_task} is {fut.result()}")
2
3
4
5
6
7
8
9
注意,futures
是一个任务结果对应任务的一个字典结构。
# 使用Executor.map
来运行任务
按照预定的顺序收集结果的另一种方式是使用executor.map()
方法:
import concurrent.futures
with concurrent.futures.Executor() as executor:
for arg, res in zip(get_tasks(), executor.map(perform, get_tasks())):
print(f"The result of {arg} is {res}")
2
3
4
5
6
注意,map
方法会一次性执行所有任务,不会像正常迭代器那样有惰性。如果任务在执行的时候有问题,它会立即抛出,不再继续执行。
在Python3.5+中,executor.map()
可以设置一个参数chunksize
。当我们使用ProcessPoolExecutor
时,会使用该参数设置的值来分批次的执行任务。每次执行chunksize
个任务,该参数默认为1。当使用ThreadPoolExecutor
时,参数无效。
# 真实的例子
在开始例子之前,我们先写一个简单的装饰器,来帮助我们更好的查看执行时间。
import time
from functools import wraps
def timeit(method):
@wraps(method)
def wrapper(*args, **kwargs):
start_time = time.time()
result = method(*args, **kwargs)
end_time = time.time()
print(f"{method.__name__} => {(end_time-start_time)*1000} ms")
return result
return wrapper
2
3
4
5
6
7
8
9
10
11
12
13
14
这个装饰器可以这样用:
@timeit
def func(n):
return list(range(n))
2
3
它会打印执行函数的名字和执行时间。
# 使用多线程下载和保存文件
首先,让我们从这堆URL中下载一些pdf文件,并将它们保存到我们的磁盘。这是一个I/O类型的任务,我们使用ThreadPoolExecutor
类来操作。开始之前我们先看下顺序执行的代码:
from pathlib import Path
import urllib.request
def download_one(url):
"""
Downloads the specified URL and saves it to disk
"""
req = urllib.request.urlopen(url)
fullpath = Path(url)
fname = fullpath.name
ext = fullpath.suffix
if not ext:
raise RuntimeError("URL does not contain an extension")
with open(fname, "wb") as handle:
while True:
chunk = req.read(1024)
if not chunk:
break
handle.write(chunk)
msg = f"Finished downloading {fname}"
return msg
@timeit
def download_all(urls):
return [download_one(url) for url in urls]
if __name__ == "__main__":
urls = (
"http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf",
)
results = download_all(urls)
for result in results:
print(result)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
>>> download_all => 22850.6863117218 ms
... Finished downloading f1040.pdf
... Finished downloading f1040a.pdf
... Finished downloading f1040ez.pdf
... Finished downloading f1040es.pdf
... Finished downloading f1040sb.pdf
2
3
4
5
6
在上边的代码块中,我定义了两个方法。download_one
方法负责从url下载pdf文件并保存到磁盘。它会检查url中的文件是否有扩展名,如果没有,它会抛出RunTimeError
异常。如果有扩展名,它会下载文件,并保存到磁盘。第二个方法download_all
仅仅遍历这些URL并依次对URL应用download_one
方法。这个顺序执行的代码大概花费了22.8s的时间。现在让我们来看下多线程版本的代码:
from pathlib import Path
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
def download_one(url):
"""
Downloads the specified URL and saves it to disk
"""
req = urllib.request.urlopen(url)
fullpath = Path(url)
fname = fullpath.name
ext = fullpath.suffix
if not ext:
raise RuntimeError("URL does not contain an extension")
with open(fname, "wb") as handle:
while True:
chunk = req.read(1024)
if not chunk:
break
handle.write(chunk)
msg = f"Finished downloading {fname}"
return msg
@timeit
def download_all(urls):
"""
Create a thread pool and download specified urls
"""
with ThreadPoolExecutor(max_workers=13) as executor:
return executor.map(download_one, urls, timeout=60)
if __name__ == "__main__":
urls = (
"http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf",
)
results = download_all(urls)
for result in results:
print(result)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
>>> download_all => 5042.651653289795 ms
... Finished downloading f1040.pdf
... Finished downloading f1040a.pdf
... Finished downloading f1040ez.pdf
... Finished downloading f1040es.pdf
... Finished downloading f1040sb.pdf
2
3
4
5
6
这个并发版本只花费了顺序版本大约1/4的时间。timeout
参数是说在这个时间之后,就不能再获取任务结果。max_works
参数是启动多少个工作线程,一般为2*multiprocessing.cpu_count()+1
。我的机器是6和12线程的,所以我选择启动13个线程。
你也可以尝试使用
ProcessPoolExecutor
类相同的接口方法来跑上边的代码。但是因为任务本身的特性,还是使用线程的性能稍好。
# 使用多进程运行CPU密集型任务
下边的示例,一个CPU密集型哈希函数。核心方法是顺序多次执行CPU密集型的哈希算法,另一个方法是多次执行这个核心方法。让我们来看下代码:
import hashlib
def hash_one(n):
"""A somewhat CPU-intensive task."""
for i in range(1, n):
hashlib.pbkdf2_hmac("sha256", b"password", b"salt", i * 10000)
return "done"
@timeit
def hash_all(n):
"""Function that does hashing in serial."""
for i in range(n):
hsh = hash_one(n)
return "done"
if __name__ == "__main__":
hash_all(20)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
>>> hash_all => 18317.330598831177 ms
分析hash_one
和hash_all
方法,会发现一个共同点,他们都有一个CPU密集的for循环代码。它大概执行花费了18s,让我们来看下使用ProcessPoolExecutor
版本:
import hashlib
from concurrent.futures import ProcessPoolExecutor
def hash_one(n):
"""A somewhat CPU-intensive task."""
for i in range(1, n):
hashlib.pbkdf2_hmac("sha256", b"password", b"salt", i * 10000)
return "done"
@timeit
def hash_all(n):
"""Function that does hashing in serial."""
with ProcessPoolExecutor(max_workers=10) as executor:
for arg, res in zip(range(n), executor.map(hash_one, range(n), chunksize=2)):
pass
return "done"
if __name__ == "__main__":
hash_all(20)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
>>> hash_all => 1673.842430114746 ms
仔细查看代码,你会发现在hash_one
中for循环的代码,仍然是顺序执行的。但是,在hash_all
中的for代码块是多进程执行的。这里我用10个工作进程按每个批次2个任务来并发执行。你可以尝试调整工作进程和并发任务数,已到达最佳的性能。这个并发版本比顺序版本快了近11倍。
# 并发中常遇到的坑
ThreadPoolExecutor
和ProcessPoolExecutor
它适用于简单的并发任务,有循环迭代的情况或仅允许运行子程序的情况。如果你的任务需要排队,或者需要用到多进程和多线程,你任然需要使用threading
和multiprocessing
模块。
在使用ThreadPoolExecutor
时,可能会出现死锁问题。当与Future相关联的可调用对象等待另一个Future的结果时,它们可能永远不会释放对线程的控制从而导致死锁。让我们来看一个示例:
import time
from concurrent.futures import ThreadPoolExecutor
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
with ThreadPoolExecutor(max_workers=2) as executor:
# here, the future from a depends on the future from b
# and vice versa
# so this is never going to be completed
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
print("Result from wait_on_b", a.result())
print("Result from wait_on_a", b.result())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
在这个例子中,wait_on_b
函数依赖wait_on_a
函数的结果。同时wait_on_a
函数又依赖wait_on_b
函数的结果,代码由于死锁永远不会结束。让我们看一下另一种死锁情况:
from concurrent.futures import ThreadPoolExecutor
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(wait_on_future)
print(future.result())
2
3
4
5
6
7
8
9
10
11
12
13
上边的代码,在一个线程中,提交了一个子执行任务。在子任务结束之前,不会释放线程,但是线程又被主任务使用,便造成死锁。使用多个线程可解决这个问题,但是这种写法本身就非常糟糕的,不提倡的。
有时候,并发代码的性能可能比顺序执行代码效率低下。发生这种情况的原因有很多:
- 1、线程用于执行CPU密集的任务。
- 2、使用多进程处理I/O密集的任务。
- 3、任务非常繁碎,无法使用多线程或多进程来处理。
生成和释放线程或进程是需要额外开销的。通常线程的操作比进程快的多,但是错误的使用反而会拖慢你的代码性能。下边是一个繁碎的示例,ThreadPoolExecutor
和ProcessPoolExecutor
性能均比顺序的性能差。
import math
PRIMES = [num for num in range(19000, 20000)]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
@timeit
def main():
for number in PRIMES:
print(f"{number} is prime: {is_prime(number)}")
if __name__ == "__main__":
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
>>> 19088 is prime: False
... 19089 is prime: False
... 19090 is prime: False
... ...
... main => 67.65174865722656 ms
2
3
4
5
上边的示例验证列表中的数是否为质数。顺序版本大约花费了67ms完成。但是并发版本却化了 140ms 才完成。
from concurrent.futures import ThreadPoolExecutor
import math
num_list = [num for num in range(19000, 20000)]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
@timeit
def main():
with ThreadPoolExecutor(max_workers=13) as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, num_list)):
print(f"{number} is prime: {prime}")
if __name__ == "__main__":
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
>>> 19088 is prime: False
... 19089 is prime: False
... 19090 is prime: False
... ...
... main => 140.17250061035156 ms
2
3
4
5
相同的代码,多进程版本:
from concurrent.futures import ProcessPoolExecutor
import math
num_list = [num for num in range(19000, 20000)]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
@timeit
def main():
with ProcessPoolExecutor(max_workers=13) as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, num_list)):
print(f"{number} is prime: {prime}")
if __name__ == "__main__":
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
>>> 19088 is prime: False
... 19089 is prime: False
... 19090 is prime: False
... ...
... main => 311.3126754760742 ms
2
3
4
5
从直观上看,检测质数应该是CPU密集型的操作,最终结果却是顺序执行最快。所以,合理的评估任务计算是否值得使用多线程或多进程也是非常重要的。否则,我们写出的代码性能不一定更高效。