Python 的多进程池可以实现并发处理,提高程序的性能与响应速度。在使用多进程池时,我们需要关注池的创建、进程的分配以及进程的回收等方面。下面是Python多进程池的完整攻略,包含创建进程池、提交任务、回收进程等内容。
确定要执行的任务
在使用Python多进程池时,需要确定需要执行的任务,即需要在多个进程中同时完成的任务。可以使用multiprocessing
模块的Process
对象定义需要执行的函数。
在下面的示例中,我们定义了一个函数task
,它将对给定的数字求平方,模拟复杂的运算任务。这个任务将作为多进程池中的工作任务,通过多个进程来执行。
import time
def task(number):
"""定义任务函数"""
print(f"Process {number} starting...")
time.sleep(2)
result = number ** 2
print(f"Process {number} finished with result {result}")
return result
创建进程池
在确定要执行的任务之后,我们需要创建多进程池。可以使用multiprocessing
模块的Pool
对象来创建多进程池。我们可以指定池中的进程数,即同时执行任务的最大进程数。下面的示例中,我们创建了一个进程池,其中包含4个进程。
import multiprocessing
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
提交任务
创建进程池之后,我们需要将任务提交给进程池进行处理。可以使用apply_async
方法提交需要执行的任务,该方法可以异步地处理任务,并且返回一个AsyncResult
对象,该对象用于检查任务是否已经完成。
for i in range(10):
result = pool.apply_async(task, args=(i,))
上面的示例中,我们使用apply_async
方法将任务task(i)
提交给进程池。args=(i,)
指定了任务函数的参数,即需要处理的数字i
。这个方法会返回一个AsyncResult
对象,我们可以使用该对象来检查任务是否已经完成,还可以通过get
方法获取任务的返回值。
获取任务结果
任务函数执行完毕之后,我们需要获取任务的结果。可以使用get
方法获取任务的返回值。但是需要注意的是,如果之前使用了apply_async
方法异步处理任务,那么在获取任务结果之前,需要等待异步任务完成。
# 关闭池
pool.close()
# 等待所有任务完成
pool.join()
# 获取任务结果
print([result.get() for result in results])
在上面的示例中,我们使用close
方法关闭进程池。在调用close
方法之后,我们需要调用join
方法等待所有任务都已经完成。最后,我们使用列表推导式来获取所有任务的返回值,列表中每个元素对应一个任务的执行结果。
完整示例
下面的示例展示了一个完整的多进程池应用场景,其中包含创建进程池、提交任务、获取任务结果等步骤。
import multiprocessing
import time
def task(number):
"""定义任务函数"""
print(f"Process {number} starting...")
time.sleep(2)
result = number ** 2
print(f"Process {number} finished with result {result}")
return result
if __name__ == '__main__':
# 创建进程池
pool = multiprocessing.Pool(processes=4)
# 提交任务
results = []
for i in range(10):
result = pool.apply_async(task, args=(i,))
results.append(result)
# 关闭池
pool.close()
# 等待所有任务完成
pool.join()
# 获取任务结果
print([result.get() for result in results])
另一个示例
下面的示例将使用Python多进程池处理多个网页的请求,通过抓取HTML内容并且比对修改时间来判断是否需要更新页面内容。
import requests
import multiprocessing
import hashlib
import os
import time
def download_page(url):
"""下载指定URL的HTML页面"""
response = requests.get(url)
if response.status_code == 200:
return response.text
def hash_page(html):
"""计算HTML内容的哈希值"""
return hashlib.md5(html.encode('utf-8')).hexdigest()
def check_page(url):
"""检查HTML页面是否需要更新"""
html = download_page(url)
filepath = f"{hash_page(html)}.html"
if os.path.isfile(filepath):
with open(filepath, 'r') as file:
last_html = file.read()
if html == last_html:
print(f"[{os.getpid()}] Page {url} not changed.")
else:
print(f"[{os.getpid()}] Page {url} updated.")
with open(filepath, 'w') as file:
file.write(html)
else:
print(f"[{os.getpid()}] Page {url} created.")
with open(filepath, 'w') as file:
file.write(html)
if __name__ == '__main__':
urls = [
"https://www.baidu.com",
"https://www.qq.com",
"https://www.zhihu.com",
"https://www.github.com",
]
# 创建进程池
pool = multiprocessing.Pool(processes=4)
# 提交任务
for url in urls:
pool.apply_async(check_page, args=(url,))
# 关闭池
pool.close()
# 等待所有任务完成
pool.join()
在上面的示例中,我们定义了三个函数:download_page
用于下载指定URL的HTML页面,hash_page
用于计算HTML内容的哈希值,check_page
用于检查HTML页面是否需要更新。我们将使用check_page
函数作为多进程池的任务。
在主函数中,我们提前定义了要检查的网页URL列表urls
,然后通过循环提交到进程池中。最后等待所有进程完成,并输出抓取结果。