解决python ThreadPoolExecutor 线程池中的异常捕获问题

  • Post category:Python

解决Python ThreadPoolExecutor线程池中的异常捕获问题,需要注意以下几点步骤:

1. 设置自定义异常处理函数

在线程池中执行的任务,如果触发了异常,通常会直接导致线程被杀死。为了保证线程池的健壮性和可靠性,我们需要提前设置自定义异常处理函数。该函数应该在发现异常时,对异常进行捕获、处理,并记录相应的日志信息。

以下是一个示例的自定义异常处理函数handle_exception:

import traceback

def handle_exception(executor, context):
    """自定义异常处理函数"""
    # 记录日志信息
    print(f"An exception occurred: {context['exception']}")
    print(traceback.format_exc())

handle_exception函数接受两个参数:executorcontext。其中executor表示线程池对象,context表示任务上下文信息。当线程池中的任务出错时,handle_exception函数将会被调用,将错误信息输出到控制台,并使用traceback库打印堆栈信息。

2. 使用submit函数提交任务

在线程池中提交任务,需要使用ThreadPoolExecutorsubmit方法。submit方法接收一个可调用对象作为参数,该对象会被线程池异步执行。如果需要在提交任务的过程中传递参数,可以使用functools.partial函数对可调用对象进行封装。

以下是一个简单的例子:

from concurrent.futures import ThreadPoolExecutor
import functools

def task(param):
    """线程池中执行的任务"""
    # 模拟一个异常
    if param == 'error':
        raise Exception('Error')

    print(f"Task: {param}")
    return param

with ThreadPoolExecutor(max_workers=2, initializer=None, initargs=()) as executor:
    # 提交任务
    future1 = executor.submit(task, 'param1')
    future2 = executor.submit(task, 'param2')
    future3 = executor.submit(task, 'error')

    # 主线程等待任务执行完成
    for future in [future1, future2, future3]:
        try:
            result = future.result()
        except Exception as exc:
            # 如果任务抛出异常,异常将通过自定义异常处理函数进行处理
            executor.handle_exception(future, {"exception": exc})

在上述代码中,我们创建了一个最多可以同时运行2个线程的线程池。执行三个任务:task('param1')task('param2')task('error'),其中第三个任务会抛出Exception异常。通过submit方法提交任务后,我们使用了for循环来等待这些任务完成,并在捕获到异常时调用handle_exception函数进行异常处理。通过这样的方式,我们可以保证线程池的异常健壮性。

示例说明

现在我们以一个网站爬虫为例,演示如何使用ThreadPoolExecutor处理异常。

假设我们的爬虫程序需要爬取某个网站中的所有文章,每篇文章爬取一个线程。

import requests
from bs4 import BeautifulSoup

def retrieve_article(article_url):
    """获取一篇文章的内容"""
    page = requests.get(article_url)
    soup = BeautifulSoup(page.content, 'html.parser')
    article = soup.find('div', class_='article').get_text()
    return article

def crawl_website(website_url):
    """爬取指定网站中的所有文章"""
    page = requests.get(website_url)
    soup = BeautifulSoup(page.content, 'html.parser')
    article_links = soup.find_all('a', class_='article-link')

    # 将每篇文章存储到一个List中
    articles = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        # 提交任务
        future_to_url = {executor.submit(retrieve_article, link['href']): link['href'] for link in article_links}

        # 等待所有任务完成
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                article = future.result()
            except Exception as exc:
                # 如果任务抛出异常,异常将通过自定义异常处理函数进行处理
                executor.handle_exception(future, {"exception": exc})
            else:
                # 没有异常的情况下,将文章添加到articles集合中
                articles.append((url, article))

    # 返回所有文章
    return articles

在上述代码中,我们首先使用requestsBeautifulSoup库获取网站中所有的文章链接,然后创建一个最多可以同时运行10个线程的线程池,并将每个文章的抓取任务添加到线程池中。当爬取任务完成时,我们将抓取的文章内容存储到articles集合中,并在发生异常的情况下调用handle_exception函数进行异常处理。最后返回爬取的所有文章。