Python 多进程池进行并发处理

  • Post category:Python

Python 的多进程池可以实现并发处理,提高程序的性能与响应速度。在使用多进程池时,我们需要关注池的创建、进程的分配以及进程的回收等方面。下面是Python多进程池的完整攻略,包含创建进程池、提交任务、回收进程等内容。

确定要执行的任务

在使用Python多进程池时,需要确定需要执行的任务,即需要在多个进程中同时完成的任务。可以使用multiprocessing模块的Process对象定义需要执行的函数。

在下面的示例中,我们定义了一个函数task,它将对给定的数字求平方,模拟复杂的运算任务。这个任务将作为多进程池中的工作任务,通过多个进程来执行。

import time

def task(number):
    """定义任务函数"""
    print(f"Process {number} starting...")
    time.sleep(2)
    result = number ** 2
    print(f"Process {number} finished with result {result}")
    return result

创建进程池

在确定要执行的任务之后,我们需要创建多进程池。可以使用multiprocessing模块的Pool对象来创建多进程池。我们可以指定池中的进程数,即同时执行任务的最大进程数。下面的示例中,我们创建了一个进程池,其中包含4个进程。

import multiprocessing

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)

提交任务

创建进程池之后,我们需要将任务提交给进程池进行处理。可以使用apply_async方法提交需要执行的任务,该方法可以异步地处理任务,并且返回一个AsyncResult对象,该对象用于检查任务是否已经完成。

    for i in range(10):
        result = pool.apply_async(task, args=(i,))

上面的示例中,我们使用apply_async方法将任务task(i)提交给进程池。args=(i,)指定了任务函数的参数,即需要处理的数字i。这个方法会返回一个AsyncResult对象,我们可以使用该对象来检查任务是否已经完成,还可以通过get方法获取任务的返回值。

获取任务结果

任务函数执行完毕之后,我们需要获取任务的结果。可以使用get方法获取任务的返回值。但是需要注意的是,如果之前使用了apply_async方法异步处理任务,那么在获取任务结果之前,需要等待异步任务完成。

    # 关闭池
    pool.close()
    # 等待所有任务完成
    pool.join()

    # 获取任务结果
    print([result.get() for result in results])

在上面的示例中,我们使用close方法关闭进程池。在调用close方法之后,我们需要调用join方法等待所有任务都已经完成。最后,我们使用列表推导式来获取所有任务的返回值,列表中每个元素对应一个任务的执行结果。

完整示例

下面的示例展示了一个完整的多进程池应用场景,其中包含创建进程池、提交任务、获取任务结果等步骤。

import multiprocessing
import time


def task(number):
    """定义任务函数"""
    print(f"Process {number} starting...")
    time.sleep(2)
    result = number ** 2
    print(f"Process {number} finished with result {result}")
    return result


if __name__ == '__main__':
    # 创建进程池
    pool = multiprocessing.Pool(processes=4)
    # 提交任务
    results = []
    for i in range(10):
        result = pool.apply_async(task, args=(i,))
        results.append(result)
    # 关闭池
    pool.close()
    # 等待所有任务完成
    pool.join()
    # 获取任务结果
    print([result.get() for result in results])

另一个示例

下面的示例将使用Python多进程池处理多个网页的请求,通过抓取HTML内容并且比对修改时间来判断是否需要更新页面内容。

import requests
import multiprocessing
import hashlib
import os
import time


def download_page(url):
    """下载指定URL的HTML页面"""
    response = requests.get(url)
    if response.status_code == 200:
        return response.text


def hash_page(html):
    """计算HTML内容的哈希值"""
    return hashlib.md5(html.encode('utf-8')).hexdigest()


def check_page(url):
    """检查HTML页面是否需要更新"""
    html = download_page(url)
    filepath = f"{hash_page(html)}.html"
    if os.path.isfile(filepath):
        with open(filepath, 'r') as file:
            last_html = file.read()
        if html == last_html:
            print(f"[{os.getpid()}] Page {url} not changed.")
        else:
            print(f"[{os.getpid()}] Page {url} updated.")
            with open(filepath, 'w') as file:
                file.write(html)
    else:
        print(f"[{os.getpid()}] Page {url} created.")
        with open(filepath, 'w') as file:
            file.write(html)


if __name__ == '__main__':
    urls = [
        "https://www.baidu.com",
        "https://www.qq.com",
        "https://www.zhihu.com",
        "https://www.github.com",
    ]
    # 创建进程池
    pool = multiprocessing.Pool(processes=4)
    # 提交任务
    for url in urls:
        pool.apply_async(check_page, args=(url,))
    # 关闭池
    pool.close()
    # 等待所有任务完成
    pool.join()

在上面的示例中,我们定义了三个函数:download_page用于下载指定URL的HTML页面,hash_page用于计算HTML内容的哈希值,check_page用于检查HTML页面是否需要更新。我们将使用check_page函数作为多进程池的任务。

在主函数中,我们提前定义了要检查的网页URL列表urls,然后通过循环提交到进程池中。最后等待所有进程完成,并输出抓取结果。