Python 多进程池进行并发处理

  • Post category:Python

Python多进程池可以实现并发处理,通过同时启动多个进程,来提高代码的执行效率。本文将详细讲解Python多进程池的使用方法,包括创建多进程池、添加任务、异步处理任务等操作。

创建多进程池

要使用Python多进程池,首先需要导入multiprocessing模块中的Pool类,然后实例化一个Pool对象,设置进程池的最大容量。

import multiprocessing

# 创建一个进程池,容量为5
pool = multiprocessing.Pool(5)

创建进程池后,就可以向进程池中添加任务了。

添加任务

要向进程池中添加任务,需要使用pool.apply_async()方法。此方法的参数为一个函数以及函数参数,实现了对多进程池的异步任务提交。

例如要进行计算操作,则可以将任务包装成一个函数,然后传递给pool.apply_async()

import time

def worker(num):
    print("子进程%d正在执行..." % num)
    time.sleep(2)
    return num * 2

for i in range(10):
    pool.apply_async(worker, (i,))

上述代码中,我们定义了一个名为worker的函数,函数接受一个参数num,然后在函数内部打印一个信息并休眠2秒,最后返回num的两倍。

接下来,我们通过for循环向进程池中添加10个任务,每个任务都调用worker函数,并将i作为函数参数传递给worker函数。

异步处理任务

任务添加完毕后,我们需要等待所有任务执行完毕,然后获取任务的结果。

可以使用进程池的get()方法来获取任务结果。该方法会阻塞主进程,直到所有任务均执行完毕并获取到结果,然后将结果返回。

pool.close()
pool.join()

results = []
while not pool._taskqueue.empty():
    result = pool.get()
    results.append(result)

print(results)

上述代码中,我们调用pool.close()方法关闭进程池,然后调用pool.join()方法等待任务执行完毕。接着,我们定义了一个空列表results用于接收任务结果,然后循环遍历进程池中的任务队列,调用pool.get()方法获取任务结果并添加到results列表中。最后,输出results列表。

示例说明

我们再来看两个具体的示例,加深对Python多进程池的理解。

示例1:使用多进程池下载文件

import os
import requests
import multiprocessing

# 定义要下载的文件列表
file_list = [
    "https://www.python.org/static/img/python-logo.png",
    "https://upload.wikimedia.org/wikipedia/commons/thumb/6/6d/Windows_Settings_app_icon.png/240px-Windows_Settings_app_icon.png",
    "https://upload.wikimedia.org/wikipedia/commons/thumb/8/8f/Gnome-mime-application-pdf.svg/1200px-Gnome-mime-application-pdf.svg.png",
    "https://upload.wikimedia.org/wikipedia/commons/thumb/4/4e/Antu_file-manager-folder-blue.svg/1200px-Antu_file-manager-folder-blue.svg.png",
    "https://www.google.com/images/branding/googlelogo/1x/googlelogo_color_272x92dp.png",
]

# 定义一个任务函数,下载文件并保存到指定路径
def download_file(url, path):
    print("子进程%s正在下载文件%s" % (os.getpid(), url))
    response = requests.get(url)
    with open(path, "wb") as f:
        f.write(response.content)

# 创建一个进程池,容量为3
pool = multiprocessing.Pool(3)

# 向进程池中添加任务,每个任务下载一个文件
for i, url in enumerate(file_list):
    filename = "file%d.png" % i
    pool.apply_async(download_file, args=(url, filename))

# 等待所有任务执行完毕
pool.close()
pool.join()

print("文件下载完毕!")

上述代码中,我们首先定义了一个包含5个下载链接的文件列表file_list。然后定义了一个名为download_file的函数用于下载文件,接受两个参数——文件下载链接url和文件保存路径path。

我们再通过创建进程池pool并向进程池中添加任务,每个任务调用download_file函数,传递对应的下载链接和文件保存路径。

执行完毕后,控制台上将会显示下载过程中的所有日志,下载完成后,将打印出“文件下载完毕!”的提示。

示例2:使用多进程池处理数据

import random
import multiprocessing

# 数据模拟:生成100个数据
data = [random.randint(0, 100) for _ in range(100)]
print("原始数据:", data)

# 定义一个任务函数,计算数据的平方
def square(num):
    return num ** 2

# 创建一个进程池,容量为4
pool = multiprocessing.Pool(4)

# 向进程池中添加任务,每个任务计算一个数据的平方
for i, d in enumerate(data):
    pool.apply_async(square, args=(d,))

# 等待所有任务执行完毕
pool.close()
pool.join()

# 输出处理结果
print("处理结果:", [res.get() for res in pool._cache])

上述代码中,我们首先生成100个随机整数,存储在data列表中。然后定义一个名为square的函数,用于计算一个数据的平方。

接下来,创建一个容量为4的进程池pool,并向进程池中添加任务,每个任务调用square函数,传递对应的数据作为参数。

然后,关闭进程池并等待所有任务执行完毕。最后,输出处理结果,使用pool._cache获取所有任务结果的缓存列表,遍历该列表并通过res.get()方法获取任务的结果。