Python通过future处理并发问题

  • Post category:Python

Python通过future处理并发问题

在Python的标准库中,有一个名为concurrent.futures的库,它提供了一种方便的方式来处理并发任务。concurrent.futures实现了一些高级接口,允许我们使用线程或进程来异步执行任务,并通过future对象来处理结果。在本篇文章中,我们将介绍使用concurrent.futures处理并发问题的完整攻略。

Step 1: 异步执行任务

concurrent.futures提供了ThreadPoolExecutorProcessPoolExecutor用于线程池和进程池的创建。这两个类分别是ThreadPoolExecutorProcessPoolExecutor的子类。其中,ThreadPoolExecutor可用于并发执行函数和方法,ProcessPoolExecutor用于并发执行可串行化的任务。以ThreadPoolExecutor为例,代码如下所示:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def process(n):
    # 模拟执行任务,这里使用time.sleep
    time.sleep(n)
    print(f"Process{n} 执行完成,总共花费时间:{n}s")
    return n

if __name__ == '__main__':
    numbers = [5, 3, 2, 1, 4]
    with ThreadPoolExecutor(max_workers=2) as executor:
        # 提交任务,返回future对象
        future_to_number = {executor.submit(process, number): number for number in numbers}
        for future in as_completed(future_to_number):
            number = future_to_number[future]
            try:
                # 获得结果
                result = future.result()
            except Exception as exc:
                print(f'Process{number} generated an exception: {exc}')
            else:
                print(f'Process{number} execute result: {result}')

这个例子中,我们首先定义了process函数, 它模拟了执行时间较长的任务。在__main__函数中,我们首先定义了任务列表numbers,并通过ThreadPoolExecutor来创建一个最大2个线程的线程池。然后我们循环提交任务,将future对象和元素值保存到一个字典中。接着,我们通过as_completed函数依次遍历future对象,检查其是否执行完毕并获得结果。最后,我们输出执行结果。

Step 2: 异步处理多个任务

在一些场景下,我们需要并行执行多个任务,比如下载多个文件,这时我们可以将多个任务分配到不同的线程或进程中,并通过Future对象来判断是否执行完毕以及结果。下面是一个示例:

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import requests

urls = [
    'https://www.baidu.com',
    'https://www.python.org',
    'https://docs.python.org/3/',
    'https://www.jd.com/',
    'https://www.taobao.com',
]

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=2) as executor:
        # 提交任务并保存future对象
        future_to_url = {executor.submit(requests.get, url): url for url in urls}
        # 等待所有任务执行完毕
        wait(future_to_url.keys(), return_when=ALL_COMPLETED)
        # 输出执行结果
        for future in future_to_url:
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print(f'{url} generated an exception: {exc}')
            else:
                print(f'{url} execute finished,result:{data}')

在这个示例中,我们首先定义了需要下载的几个url,并通过ThreadPoolExecutor来创建一个最大2个线程的线程池。然后,我们循环提交任务,将future对象和url保存到一个字典中。接着,我们通过wait函数等待所有任务执行完毕。最后,我们输出执行结果。

Step 3: Future对象中其他属性的说明

除了result方法, Future对象还有一些其他的方法和属性,这里简单介绍几个常用的:

done

检查Future对象是否执行完毕,如果执行完毕则返回True,否则返回False

future.done()

running

检查Future对象是否在运行中,如果正在运行则返回True,否则返回False

future.running()

add_done_callback

添加回调函数,当Future对象执行完成后,回调函数将被调用。回调函数接收一个Future对象作为参数。

def callback(future):
    try:
        data = future.result()
    except Exception as exc:
        print('{0} generated an exception: {1}'.format(getattr(data, 'url', None), exc))
    else:
        print('{0} execute finished,result:{1}'.format(data.url, data))

future.add_done_callback(callback)

完整的攻略就是这样,我们介绍了concurrent.futures在使用Python处理并发问题中的一些基本知识。通过这篇文章,您可以了解如何使用Future等内置类和函数。