Python 使用多进程池和任务

  • Post category:Python

Python中的多进程池和任务可以用来提高程序的并发性能,实现并发处理任务。下面将详细讲解Python使用多进程池和任务的完整攻略,包括多进程池的创建和使用、任务的提交和回调处理。

多进程池的创建和使用

在Python中,可以通过multiprocessing模块中的Pool类来创建多进程池。Pool类可以接收一个参数,表示进程池中最大进程数。创建进程池后,可以在其中提交任务,由进程池自动分配进程来执行任务。

下面是一个简单的示例,创建一个进程池并提交5个任务,打印任务执行结果:

import multiprocessing

def f(x):
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        for result in pool.map(f, range(5)):
            print(result)

在上面的示例中,创建了一个进程池,最大进程数为4。在提交任务时使用了pool.map()方法,它可以接收一个可迭代对象和一个函数。pool.map()方法会将可迭代对象中的每个元素,作为参数传入函数中执行,并返回一个结果列表。在本例中,将1到5的整数传入计算平方的函数f()中,并通过pool.map()方法获取平方值的结果。最终结果将被打印输出。

任务的提交和回调处理

在多进程池中,任务主要通过apply()apply_async()map()map_async()方法进行提交。

apply()方法会同步提交任务,直到任务完成并返回结果,程序才会继续往下执行。而apply_async()方法是异步提交任务,提交任务后立即返回一个AsyncResult对象,表示任务的执行情况,并继续往下执行程序。

如果需要对执行任务的结果进行处理,则可以通过AsyncResult对象的get()方法获取任务执行结果,或者通过get()方法的回调函数来处理结果。调用get()方法时,程序会阻塞等待任务完成并返回结果。

下面是一个使用回调函数处理任务结果的示例:

import multiprocessing

def f(x):
    return x * x

def callback(result):
    print(result)

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        for i in range(5):
            pool.apply_async(f, (i,), callback=callback)
        pool.close()
        pool.join()

在上面的示例中,提交了5个任务,通过apply_async()方法设置了回调函数callback,用于处理任务执行结果。当任务完成并返回结果时,会自动调用回调函数,并将结果传给它。最终结果将被打印输出。

另一个任务处理的示例

下面是另一个任务处理的示例,这个示例用来统计一个目录下的所有文件的行数。我们将每个文件的行数统计任务提交到进程池中,并在回调函数中将所有文件的行数求和返回。

import multiprocessing
import os

def count_lines(file_path):
    with open(file_path, 'r') as f:
        return len(f.readlines())

def count_lines_callback(results):
    total_lines = 0
    for lines in results:
        total_lines += lines
    print('Total lines:', total_lines)

if __name__ == '__main__':
    file_dir = 'path/to/dir'
    file_list = [os.path.join(file_dir, filename) for filename in os.listdir(file_dir) if os.path.isfile(os.path.join(file_dir, filename))]
    with multiprocessing.Pool(processes=4) as pool:
        results = []
        for file_path in file_list:
            results.append(pool.apply_async(count_lines, (file_path,)))
        pool.close()
        pool.join()
        count_lines_callback([r.get() for r in results])

在上面的示例中,首先获取了一个目录下所有的文件列表,并将每个文件的行数统计任务提交到进程池中。在任务完成后,通过get()方法的回调函数将所有文件的行数求和,并输出统计结果。

以上就是关于Python使用多进程池和任务的完整攻略,希望对你有帮助。