python程序 线程队列queue使用方法解析

  • Post category:Python

Python程序线程队列queue使用方法解析

概述

queue(队列)是多线程编程中常用的数据结构,用于在多个线程之间安全地传递数据。Python的queue模块提供了同步的、线程安全的队列类,包括FIFO(先进先出)队列、LIFO(后进先出)队列等。

本文将介绍Python queue的使用方法,包括队列的创建、添加、获取、判断队列是否为空以及使用join和task_done方法协同多个线程等。

创建队列

Python的queue模块中包含了三个类:Queue、LifoQueue、PriorityQueue,分别代表了三种队列。Queue是最普遍的一种队列,LifoQueue是后进先出队列,PriorityQueue则是优先级队列。

# 创建Queue队列
import queue
q = queue.Queue()

# 创建LifoQueue队列
import queue
q = queue.LifoQueue()

# 创建PriorityQueue队列
import queue
q = queue.PriorityQueue()

添加元素

put方法

put方法用于向队列中添加元素,如果队列满了就会阻塞。

import queue
q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)

put_nowait方法

put_nowait方法和put方法类似,但是不会阻塞。

import queue
q = queue.Queue()
q.put_nowait(1)
q.put_nowait(2)
q.put_nowait(3)

获取元素

get方法

get方法用于从队列中获取元素,如果队列为空就会阻塞。

import queue
q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())  # 输出1
print(q.get())  # 输出2
print(q.get())  # 输出3

get_nowait方法

get_nowait方法和get方法类似,但是不会阻塞。

import queue
q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)
print(q.get_nowait())  # 输出1
print(q.get_nowait())  # 输出2
print(q.get_nowait())  # 输出3

判断队列是否为空

如果队列为空,get方法和get_nowait方法都会阻塞。使用empty方法可以判断队列是否为空。

import queue
q = queue.Queue()
print(q.empty())  # 输出True
q.put(1)
print(q.empty())  # 输出False

使用join和task_done方法协同多个线程

当向队列中添加元素时,可以使用join方法等待队列中所有元素被取完。

import queue
import threading

def worker(q):
    while True:
        item = q.get()
        if item is None:
            break
        # 执行任务
        print(item)
        q.task_done()

q = queue.Queue()

for i in range(5):
    t = threading.Thread(target=worker, args=(q,))
    t.daemon = True
    t.start()

for item in range(10):
    q.put(item)

q.join()

在上面的代码中,我们创建了5个线程,每个线程从队列中获取元素,执行任务。当队列中没有元素时,线程退出。

使用task_done方法可以通知队列,指定元素已经被处理完毕。

示例

示例1:使用Queue队列实现多线程传递数据

import queue
import threading
import time

# 生产者线程函数
def producer(q):
    for i in range(10):
        # 模拟生产产品
        time.sleep(1)
        product = "Product {}".format(i)
        # 向队列中添加产品
        q.put(product)

# 消费者线程函数
def consumer(q):
    while True:
        # 从队列中取出产品
        product = q.get()
        if product is None:
            break
        # 模拟处理产品
        print("Processing {}".format(product))
        time.sleep(2)
        # 通知队列,该元素已经被处理完毕
        q.task_done()

q = queue.Queue()

# 创建生产者线程
producer_thread = threading.Thread(target=producer, args=(q,))
producer_thread.start()

# 创建消费者线程
consumer_threads = []
for i in range(3):
    consumer_thread = threading.Thread(target=consumer, args=(q,))
    consumer_thread.start()
    consumer_threads.append(consumer_thread)

# 等待所有的产品被处理完毕
producer_thread.join()
for i in range(3):
    q.put(None)
for consumer_thread in consumer_threads:
    consumer_thread.join()
q.join()
print("All products have been processed.")

示例2:使用PriorityQueue队列实现任务调度

import queue
import threading
import time

# 任务类
class Task:
    def __init__(self, priority, name):
        self.priority = priority
        self.name = name

    # 用于比较任务的优先级
    def __lt__(self, other):
        return self.priority < other.priority

    # 用于打印任务
    def __str__(self):
        return "{}({})".format(self.name, self.priority)

# 执行任务的线程函数
def worker(q):
    while True:
        task = q.get()
        if task is None:
            break
        # 模拟执行任务
        print("Executing task: {}".format(task))
        time.sleep(2)
        # 通知队列,该元素已经被处理完毕
        q.task_done()

q = queue.PriorityQueue()

# 创建多个执行任务的线程
workers = []
for i in range(3):
    worker_thread = threading.Thread(target=worker, args=(q,))
    worker_thread.start()
    workers.append(worker_thread)

# 添加任务到队列
tasks = [
    Task(2, "Task 1"),
    Task(1, "Task 2"),
    Task(3, "Task 3"),
    Task(2, "Task 4"),
    Task(1, "Task 5"),
]
for task in tasks:
    q.put(task)

# 等待所有任务被处理完毕
q.join()

# 通知线程退出
for i in range(3):
    q.put(None)
for worker_thread in workers:
    worker_thread.join()

print("All tasks have been executed.")

总结

Python的queue模块提供了同步的、线程安全的队列类,可以用于多线程之间安全地传递数据。常用的队列有FIFO队列、LIFO队列和优先级队列。通过put和get方法向队列中添加和获取元素,并通过empty方法判断队列是否为空。使用join和task_done方法协同多个线程。