码农行者 码农行者
首页
  • Python

    • 语言特性
    • Django相关
    • Tornado
    • Celery
  • Golang

    • golang学习笔记
    • 对比python学习go
    • 模块学习
  • JavaScript

    • Javascript
  • 数据结构预算法笔记
  • ATS
  • Mongodb
  • Git
云原生
运维
垃圾佬的快乐
  • 数据库
  • 机器学习
  • 杂谈
  • 面试
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

DeanWu

软件工程师
首页
  • Python

    • 语言特性
    • Django相关
    • Tornado
    • Celery
  • Golang

    • golang学习笔记
    • 对比python学习go
    • 模块学习
  • JavaScript

    • Javascript
  • 数据结构预算法笔记
  • ATS
  • Mongodb
  • Git
云原生
运维
垃圾佬的快乐
  • 数据库
  • 机器学习
  • 杂谈
  • 面试
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Python中一种轻松实现并发编程的方法

    • concurrent.futures 剖析
      • Executor Objects
        • submit(func, args, *kwargs)
        • map(func, *iterables, timeout=None, chunksize=1)
      • 运行并发任务的通用方法
        • 使用 Executor.submit 运行任务
        • 使用Executor.map来运行任务
      • 真实的例子
        • 使用多线程下载和保存文件
        • 使用多进程运行CPU密集型任务
      • 并发中常遇到的坑
        • 参考
        • 开发语言
        • Python
        • 语言特性
        DeanWu
        2020-05-18
        目录

        Python中一种轻松实现并发编程的方法

        原文地址:https://rednafi.github.io/digressions/python/2020/04/21/python-concurrent-futures.html

        文章较长,建议收藏后再看。

        使用Python编写并发代码时并不流畅,我们需要一些额外的思考,你手中的任务是I/O密集类型还是CPU密集型。此外,由于全局解释锁的存在,进一步增加了编写真正的并发代码的难度。

        在Python中编写并发代码常常这样做:

        如果任务是I/O密集的,可以使用标准库的threading模块,如果任务是CPU密集的,则multiprocessing更合适。threading和multiprocessing的API提供了很多功能,但他们都是很低级别的代码,在我们业务核心逻辑上增加了额外的复杂性。

        Python标准库还包含一个名为concurrent.futrures的模块。在Python3.2中添加了该模块,为开发人员提供了更高级别的异步任务接口。它是在threading和multiprocessing模块之上的通用抽象层,提供了使用线程池和进程池运行任务的一些接口。当你只需要并发的运行一段完整的代码,而不想使用threading和multiprocessing增加额外的逻辑,破坏代码的完整性时,这个模块是一个好的解决方案。

        # concurrent.futures 剖析

        根据官方文档,

        The concurrent.futures module provides a high-level interface for asynchronously executing callables.

        这意味着你可以使用更高阶别的接口来使用线程和进程。该模块提供了一个名为Executor的抽象类,你不能直接实例化它,需要使用它提供的两个子类之一来跑任务。

        Executor (Abstract Base Class)
        │
        ├── ThreadPoolExecutor
        │   │A concrete subclass of the Executor class to
        │   │manage I/O bound tasks with threading underneath
        │
        ├── ProcessPoolExecutor
        │   │A concrete subclass of the Executor class to
        │   │manage CPU bound tasks with multiprocessing underneath
        
        1
        2
        3
        4
        5
        6
        7
        8
        9

        在模块内,这两个类与服务池交互并管理工作单元(workers)。Future类用于管理工作单元的结果。使用工作池时,应用程序会创建适当的Executor类(ThreadPoolExecutor或ProcessPoolExecutor)的实例,然后提交给他们任务运行。提交后,将返回一个Future类的实例。当需要任务结果时,可以使用该Future对象进行阻塞,直到拿到任务结果为止。

        # Executor Objects

        由于ThreadPoolExecutor和ProcessPoolExecutor有相同的API接口,我们主要来看下它们提供的两个方法。

        # submit(func, args, *kwargs)

        func 可调度对象(执行函数),执行参数args, kwargs。返回一个可调用的 Future 对象。

        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(pow, 323, 1235)
            print(future.result())
        
        1
        2
        3

        # map(func, *iterables, timeout=None, chunksize=1)

        除了下边部分其他同submit:

        • 可迭代对象数据是立即生成的,不像其他迭代器是惰性的。
        • func 是异步执行的,并且可以同时进行对func的多次调用。

        返回的迭代器调用__next__()方法时,将会引发concurrent.futures.TimeoutError。从开始调用到timeout时间后,结果将不获取。timeout可以是int或float,如果未指定或者是None,则等待时间没有限制。

        如果func调用引发异常,则从迭代器中检索其结果时将抛出该异常。

        使用ProcessPoolExecutor时,此方法可将迭代项分为多个块,将其作为单独的任务块提交给池。这些块的大小可以通过chunksize来设置。相比非常长的并发列表,使用chunksize进行分块提交执行可以显著提高性能。

        使用ThreadPoolExecutor时,chunksize无效。

        # 运行并发任务的通用方法

        我的许多脚本包含类似这样的代码:

        for task in get_tasks():
            perform(task)
        
        1
        2

        get_tasks返回一个可迭代对象,该迭代对象包含有特定功能任务的参数相关信息。perform函数依次运行,每次只能运行一个。由于逻辑是顺序执行的,因此很容易理解。当任务数量少或单个任务的执行时间少、复杂度较低时,没有什么大问题。但是当任务数量巨大或单个任务很耗时时,整端代码性能就变得非常低下。

        根据一般经验,ThreadPoolExecutor 在主要受I/O限制的任务时使用,例如发起多个http请求、将大量文件保存到磁盘等。ProcessPoolExecutor 在主要受CPU限制的任务中应用。如大运算量的任务、对大量图片处理应用和一次处理多个文本文件等。

        # 使用 Executor.submit 运行任务

        当你有许多任务时,可一次性执行它们,然后等待它们全部完成,再收集结果。

        import concurrent.futures
        
        
        with concurrent.futures.Executor() as executor:
            futures = {executor.submit(perform, task) for task in get_tasks()}
        
            for fut in concurrent.futures.as_completed(futures):
                print(f"The outcome is {fut.result()}")
        
        1
        2
        3
        4
        5
        6
        7
        8

        这里你首先创建一个执行器,该执行器在单独的进程或线程中执行所有任务。使用with语句将创建一个上下文管理器,该管理器可确保在完成后通过隐士调用executor.shutdown()函数来清理掉所有线程或进程。

        在真实的代码中,你需要将Executor替换为ThreadPoolExecutor或ProcessPoolExecutor这些可调用的类。然后使用推导式来开始所有的任务,使用executor.submit()来调度每一个任务开始执行。每个任务执行后,会生成一个future对象。一旦所有的任务都完成了,concurrent.futures.as_completed()方法便可获取所有任务的结果。executor.result()返回给你任务执行函数的结果值或抛出任务失败异常。

        executor.submit()方法是异步来调用执行任务的,并不包含任务的任何上下文信息。所以如果你想同时获取每个任务的信息,你需要自己处理。

        import concurrent.futures
        
        
        with concurrent.futures.Executor() as executor:
            futures = {executor.submit(perform, task): task for task in get_tasks()}
        
            for fut in concurrent.futures.as_completed(futures):
                original_task = futures[fut]
                print(f"The result of {original_task} is {fut.result()}")
        
        1
        2
        3
        4
        5
        6
        7
        8
        9

        注意,futures是一个任务结果对应任务的一个字典结构。

        # 使用Executor.map来运行任务

        按照预定的顺序收集结果的另一种方式是使用executor.map()方法:

        import concurrent.futures
        
        
        with concurrent.futures.Executor() as executor:
            for arg, res in zip(get_tasks(), executor.map(perform, get_tasks())):
                print(f"The result of {arg} is {res}")
        
        1
        2
        3
        4
        5
        6

        注意,map方法会一次性执行所有任务,不会像正常迭代器那样有惰性。如果任务在执行的时候有问题,它会立即抛出,不再继续执行。

        在Python3.5+中,executor.map()可以设置一个参数chunksize。当我们使用ProcessPoolExecutor时,会使用该参数设置的值来分批次的执行任务。每次执行chunksize个任务,该参数默认为1。当使用ThreadPoolExecutor时,参数无效。

        # 真实的例子

        在开始例子之前,我们先写一个简单的装饰器,来帮助我们更好的查看执行时间。

        import time
        from functools import wraps
        
        
        def timeit(method):
            @wraps(method)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                result = method(*args, **kwargs)
                end_time = time.time()
                print(f"{method.__name__} => {(end_time-start_time)*1000} ms")
        
                return result
            return wrapper
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14

        这个装饰器可以这样用:

        @timeit
        def func(n):
        return list(range(n))
        
        1
        2
        3

        它会打印执行函数的名字和执行时间。

        # 使用多线程下载和保存文件

        首先,让我们从这堆URL中下载一些pdf文件,并将它们保存到我们的磁盘。这是一个I/O类型的任务,我们使用ThreadPoolExecutor类来操作。开始之前我们先看下顺序执行的代码:

        from pathlib import Path
        import urllib.request
        
        
        def download_one(url):
            """
            Downloads the specified URL and saves it to disk
            """
        
            req = urllib.request.urlopen(url)
            fullpath = Path(url)
            fname = fullpath.name
            ext = fullpath.suffix
        
            if not ext:
                raise RuntimeError("URL does not contain an extension")
        
            with open(fname, "wb") as handle:
                while True:
                    chunk = req.read(1024)
                    if not chunk:
                        break
                    handle.write(chunk)
        
            msg = f"Finished downloading {fname}"
            return msg
        
        
        @timeit
        def download_all(urls):
            return [download_one(url) for url in urls]
        
        
        if __name__ == "__main__":
            urls = (
                "http://www.irs.gov/pub/irs-pdf/f1040.pdf",
                "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
                "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
                "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
                "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf",
            )
        
            results = download_all(urls)
            for result in results:
                print(result)
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        >>> download_all => 22850.6863117218 ms
        ... Finished downloading f1040.pdf
        ... Finished downloading f1040a.pdf
        ... Finished downloading f1040ez.pdf
        ... Finished downloading f1040es.pdf
        ... Finished downloading f1040sb.pdf
        
        1
        2
        3
        4
        5
        6

        在上边的代码块中,我定义了两个方法。download_one方法负责从url下载pdf文件并保存到磁盘。它会检查url中的文件是否有扩展名,如果没有,它会抛出RunTimeError异常。如果有扩展名,它会下载文件,并保存到磁盘。第二个方法download_all仅仅遍历这些URL并依次对URL应用download_one方法。这个顺序执行的代码大概花费了22.8s的时间。现在让我们来看下多线程版本的代码:

        from pathlib import Path
        import urllib.request
        from concurrent.futures import ThreadPoolExecutor, as_completed
        
        
        def download_one(url):
            """
            Downloads the specified URL and saves it to disk
            """
        
            req = urllib.request.urlopen(url)
            fullpath = Path(url)
            fname = fullpath.name
            ext = fullpath.suffix
        
            if not ext:
                raise RuntimeError("URL does not contain an extension")
        
            with open(fname, "wb") as handle:
                while True:
                    chunk = req.read(1024)
                    if not chunk:
                        break
                    handle.write(chunk)
        
            msg = f"Finished downloading {fname}"
            return msg
        
        
        @timeit
        def download_all(urls):
            """
            Create a thread pool and download specified urls
            """
        
            with ThreadPoolExecutor(max_workers=13) as executor:
                return executor.map(download_one, urls, timeout=60)
        
        
        if __name__ == "__main__":
            urls = (
                "http://www.irs.gov/pub/irs-pdf/f1040.pdf",
                "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
                "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
                "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
                "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf",
            )
        
            results = download_all(urls)
            for result in results:
                print(result)
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        >>> download_all => 5042.651653289795 ms
        ... Finished downloading f1040.pdf
        ... Finished downloading f1040a.pdf
        ... Finished downloading f1040ez.pdf
        ... Finished downloading f1040es.pdf
        ... Finished downloading f1040sb.pdf
        
        1
        2
        3
        4
        5
        6

        这个并发版本只花费了顺序版本大约1/4的时间。timeout参数是说在这个时间之后,就不能再获取任务结果。max_works参数是启动多少个工作线程,一般为2*multiprocessing.cpu_count()+1。我的机器是6和12线程的,所以我选择启动13个线程。

        你也可以尝试使用ProcessPoolExecutor类相同的接口方法来跑上边的代码。但是因为任务本身的特性,还是使用线程的性能稍好。

        # 使用多进程运行CPU密集型任务

        下边的示例,一个CPU密集型哈希函数。核心方法是顺序多次执行CPU密集型的哈希算法,另一个方法是多次执行这个核心方法。让我们来看下代码:

        import hashlib
        
        
        def hash_one(n):
            """A somewhat CPU-intensive task."""
        
            for i in range(1, n):
                hashlib.pbkdf2_hmac("sha256", b"password", b"salt", i * 10000)
        
            return "done"
        
        
        @timeit
        def hash_all(n):
            """Function that does hashing in serial."""
        
            for i in range(n):
                hsh = hash_one(n)
        
            return "done"
        
        
        if __name__ == "__main__":
            hash_all(20)
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        >>> hash_all => 18317.330598831177 ms
        
        1

        分析hash_one和hash_all方法,会发现一个共同点,他们都有一个CPU密集的for循环代码。它大概执行花费了18s,让我们来看下使用ProcessPoolExecutor版本:

        import hashlib
        from concurrent.futures import ProcessPoolExecutor
        
        
        def hash_one(n):
            """A somewhat CPU-intensive task."""
        
            for i in range(1, n):
                hashlib.pbkdf2_hmac("sha256", b"password", b"salt", i * 10000)
        
            return "done"
        
        
        @timeit
        def hash_all(n):
            """Function that does hashing in serial."""
        
            with ProcessPoolExecutor(max_workers=10) as executor:
                for arg, res in zip(range(n), executor.map(hash_one, range(n), chunksize=2)):
                    pass
        
            return "done"
        
        
        if __name__ == "__main__":
            hash_all(20)
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        >>> hash_all => 1673.842430114746 ms
        
        1

        仔细查看代码,你会发现在hash_one中for循环的代码,仍然是顺序执行的。但是,在hash_all中的for代码块是多进程执行的。这里我用10个工作进程按每个批次2个任务来并发执行。你可以尝试调整工作进程和并发任务数,已到达最佳的性能。这个并发版本比顺序版本快了近11倍。

        # 并发中常遇到的坑

        ThreadPoolExecutor和ProcessPoolExecutor它适用于简单的并发任务,有循环迭代的情况或仅允许运行子程序的情况。如果你的任务需要排队,或者需要用到多进程和多线程,你任然需要使用threading和multiprocessing模块。

        在使用ThreadPoolExecutor时,可能会出现死锁问题。当与Future相关联的可调用对象等待另一个Future的结果时,它们可能永远不会释放对线程的控制从而导致死锁。让我们来看一个示例:

        import time
        from concurrent.futures import ThreadPoolExecutor
        
        
        def wait_on_b():
            time.sleep(5)
            print(b.result())  # b will never complete because it is waiting on a.
            return 5
        
        
        def wait_on_a():
            time.sleep(5)
            print(a.result())  # a will never complete because it is waiting on b.
            return 6
        
        
        with ThreadPoolExecutor(max_workers=2) as executor:
            # here, the future from a depends on the future from b
            # and vice versa
            # so this is never going to be completed
            a = executor.submit(wait_on_b)
            b = executor.submit(wait_on_a)
        
            print("Result from wait_on_b", a.result())
            print("Result from wait_on_a", b.result())
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25

        在这个例子中,wait_on_b函数依赖wait_on_a函数的结果。同时wait_on_a函数又依赖wait_on_b函数的结果,代码由于死锁永远不会结束。让我们看一下另一种死锁情况:

        from concurrent.futures import ThreadPoolExecutor
        
        
        def wait_on_future():
            f = executor.submit(pow, 5, 2)
            # This will never complete because there is only one worker thread and
            # it is executing this function.
            print(f.result())
        
        
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(wait_on_future)
            print(future.result())
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13

        上边的代码,在一个线程中,提交了一个子执行任务。在子任务结束之前,不会释放线程,但是线程又被主任务使用,便造成死锁。使用多个线程可解决这个问题,但是这种写法本身就非常糟糕的,不提倡的。

        有时候,并发代码的性能可能比顺序执行代码效率低下。发生这种情况的原因有很多:

        • 1、线程用于执行CPU密集的任务。
        • 2、使用多进程处理I/O密集的任务。
        • 3、任务非常繁碎,无法使用多线程或多进程来处理。

        生成和释放线程或进程是需要额外开销的。通常线程的操作比进程快的多,但是错误的使用反而会拖慢你的代码性能。下边是一个繁碎的示例,ThreadPoolExecutor和ProcessPoolExecutor性能均比顺序的性能差。

        import math
        
        PRIMES = [num for num in range(19000, 20000)]
        
        
        def is_prime(n):
            if n < 2:
                return False
            if n == 2:
                return True
            if n % 2 == 0:
                return False
        
            sqrt_n = int(math.floor(math.sqrt(n)))
            for i in range(3, sqrt_n + 1, 2):
                if n % i == 0:
                    return False
            return True
        
        
        @timeit
        def main():
            for number in PRIMES:
                print(f"{number} is prime: {is_prime(number)}")
        
        
        if __name__ == "__main__":
            main()
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        >>> 19088 is prime: False
        ... 19089 is prime: False
        ... 19090 is prime: False
        ... ...
        ... main => 67.65174865722656 ms
        
        1
        2
        3
        4
        5

        上边的示例验证列表中的数是否为质数。顺序版本大约花费了67ms完成。但是并发版本却化了 140ms 才完成。

        from concurrent.futures import ThreadPoolExecutor
        import math
        
        num_list = [num for num in range(19000, 20000)]
        
        
        def is_prime(n):
            if n < 2:
                return False
            if n == 2:
                return True
            if n % 2 == 0:
                return False
        
            sqrt_n = int(math.floor(math.sqrt(n)))
            for i in range(3, sqrt_n + 1, 2):
                if n % i == 0:
                    return False
            return True
        
        
        @timeit
        def main():
            with ThreadPoolExecutor(max_workers=13) as executor:
                for number, prime in zip(PRIMES, executor.map(is_prime, num_list)):
                    print(f"{number} is prime: {prime}")
        
        
        if __name__ == "__main__":
            main()
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        >>> 19088 is prime: False
        ... 19089 is prime: False
        ... 19090 is prime: False
        ... ...
        ... main => 140.17250061035156 ms
        
        1
        2
        3
        4
        5

        相同的代码,多进程版本:

        from concurrent.futures import ProcessPoolExecutor
        import math
        
        num_list = [num for num in range(19000, 20000)]
        
        
        def is_prime(n):
            if n < 2:
                return False
            if n == 2:
                return True
            if n % 2 == 0:
                return False
        
            sqrt_n = int(math.floor(math.sqrt(n)))
            for i in range(3, sqrt_n + 1, 2):
                if n % i == 0:
                    return False
            return True
        
        
        @timeit
        def main():
            with ProcessPoolExecutor(max_workers=13) as executor:
                for number, prime in zip(PRIMES, executor.map(is_prime, num_list)):
                    print(f"{number} is prime: {prime}")
        
        
        if __name__ == "__main__":
            main()
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        >>> 19088 is prime: False
        ... 19089 is prime: False
        ... 19090 is prime: False
        ... ...
        ... main => 311.3126754760742 ms
        
        1
        2
        3
        4
        5

        从直观上看,检测质数应该是CPU密集型的操作,最终结果却是顺序执行最快。所以,合理的评估任务计算是否值得使用多线程或多进程也是非常重要的。否则,我们写出的代码性能不一定更高效。

        # 参考

        • concurrent.fututures- the official documentation (opens new window)
        • Easy Concurrency in Python (opens new window)
        • Adventures in Python with concurrent.futures (opens new window)
        #Python#concurrent
        上次更新: 2023/03/28, 16:27:19
        最近更新
        01
        chromebox/chromebook 刷bios步骤
        03-01
        02
        redis 集群介绍
        11-28
        03
        go语法题二
        10-09
        更多文章>
        Theme by Vdoing | Copyright © 2015-2024 DeanWu | 遵循CC 4.0 BY-SA版权协议
        • 跟随系统
        • 浅色模式
        • 深色模式
        • 阅读模式