Python 使用多进程池和任务

  • Post category:Python

使用多进程可以加速Python程序的运行速度,需要用到Python中的multiprocessing库。其中,多进程池和任务的使用方法是常见且重要的。下面我们将详细讲解Python使用多进程池和任务的完整攻略,并提供两个实际示例帮助理解。

多进程池使用方法

创建进程池的代码如下:

import multiprocessing

pool = multiprocessing.Pool(processes=4)

其中,processes参数是指进程池中进程的数量。一般应该根据自己的CPU核心数来配置。如果不传递该参数,则默认使用系统 CPU 的核心数。接下来可以向进程池中添加任务:

result = pool.apply_async(func, args)

其中func是任务函数,args是func任务函数的参数,返回结果可以通过result.get()方法获取。

需要注意的是,在使用进程池的时候,要注意函数的可序列化和全局变量的引用问题,否则会报错。一般需要将需要进行操作的数据或对象序列化或放置在共享内存中。

任务使用方法

每个进程在运行的时候并发执行的任务可能不止一个,而多任务的最大级别是进程池,因此我们需要在进程池的基础上,对任务进行调度。对于某个需要在进程池中运行的函数,我们可以将其放到任务队列中,如下所示:

import multiprocessing

def worker():
    print('Worker')

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

通过以上代码,我们创建了5个进程,并且每个进程中都执行了同一段代码。如果修改了worker函数,那么每个进程在运行时就会执行新的代码。

示例1:并行运行任务

import time
import multiprocessing
from multiprocessing import Pool

def run_task(task):
    time.sleep(1)
    return task + " is done"

if __name__ == '__main__':
    tasks = ['task1', 'task2', 'task3', 'task4', 'task5']

    start_time = time.time()

    with Pool(processes=5) as pool:
        results = pool.map(run_task, tasks)

    end_time = time.time()

    cost_time = end_time - start_time
    print("共耗时:{}秒".format(round(cost_time, 2)))
    print("运行结果:")
    for result in results:
        print(result)

以上代码中,我们创建了一个任务函数run_task,该函数会完成一个任务,并返回任务完成的状态。

在主函数中,我们创建任务列表,为每个任务启动一个进程来完成任务,并利用map函数将任务列表映射到进程池中,获取任务执行的返回结果。

示例2:进程池中的队列

import time
import multiprocessing
from multiprocessing import Pool, Manager

def run_task(task, result_queue):
    time.sleep(1)
    result_queue.put(task + " is done")

if __name__ == '__main__':
    task_list = ['task1', 'task2', 'task3', 'task4', 'task5']
    result_queue = Manager().Queue()

    start_time = time.time()

    with Pool(processes=5) as pool:
        for task in task_list:
            pool.apply_async(run_task, (task, result_queue))
        pool.close()
        pool.join()

    end_time = time.time()

    cost_time = end_time - start_time

    print("共耗时:{}秒".format(round(cost_time,2)))
    print("运行结果:")
    while not result_queue.empty():
        print(result_queue.get())

以上代码中,我们添加了队列参数,实质是向每个进程传入了可共享的队列对象result_queue。每个进程完成后都将自己的结果存入队列。

在主函数中,我们使用Pool进程池引擎,将任务送入进程池中,这时每个进程可以完成各自的任务,结果将通过队列返回,主函数可以将队列中的结果获取,从而实现了进程池中的任务协调和结果汇总。