Python程序线程队列queue使用方法解析
概述
queue(队列)是多线程编程中常用的数据结构,用于在多个线程之间安全地传递数据。Python的queue模块提供了同步的、线程安全的队列类,包括FIFO(先进先出)队列、LIFO(后进先出)队列等。
本文将介绍Python queue的使用方法,包括队列的创建、添加、获取、判断队列是否为空以及使用join和task_done方法协同多个线程等。
创建队列
Python的queue模块中包含了三个类:Queue、LifoQueue、PriorityQueue,分别代表了三种队列。Queue是最普遍的一种队列,LifoQueue是后进先出队列,PriorityQueue则是优先级队列。
# 创建Queue队列
import queue
q = queue.Queue()
# 创建LifoQueue队列
import queue
q = queue.LifoQueue()
# 创建PriorityQueue队列
import queue
q = queue.PriorityQueue()
添加元素
put方法
put方法用于向队列中添加元素,如果队列满了就会阻塞。
import queue
q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)
put_nowait方法
put_nowait方法和put方法类似,但是不会阻塞。
import queue
q = queue.Queue()
q.put_nowait(1)
q.put_nowait(2)
q.put_nowait(3)
获取元素
get方法
get方法用于从队列中获取元素,如果队列为空就会阻塞。
import queue
q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)
print(q.get()) # 输出1
print(q.get()) # 输出2
print(q.get()) # 输出3
get_nowait方法
get_nowait方法和get方法类似,但是不会阻塞。
import queue
q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)
print(q.get_nowait()) # 输出1
print(q.get_nowait()) # 输出2
print(q.get_nowait()) # 输出3
判断队列是否为空
如果队列为空,get方法和get_nowait方法都会阻塞。使用empty方法可以判断队列是否为空。
import queue
q = queue.Queue()
print(q.empty()) # 输出True
q.put(1)
print(q.empty()) # 输出False
使用join和task_done方法协同多个线程
当向队列中添加元素时,可以使用join方法等待队列中所有元素被取完。
import queue
import threading
def worker(q):
while True:
item = q.get()
if item is None:
break
# 执行任务
print(item)
q.task_done()
q = queue.Queue()
for i in range(5):
t = threading.Thread(target=worker, args=(q,))
t.daemon = True
t.start()
for item in range(10):
q.put(item)
q.join()
在上面的代码中,我们创建了5个线程,每个线程从队列中获取元素,执行任务。当队列中没有元素时,线程退出。
使用task_done方法可以通知队列,指定元素已经被处理完毕。
示例
示例1:使用Queue队列实现多线程传递数据
import queue
import threading
import time
# 生产者线程函数
def producer(q):
for i in range(10):
# 模拟生产产品
time.sleep(1)
product = "Product {}".format(i)
# 向队列中添加产品
q.put(product)
# 消费者线程函数
def consumer(q):
while True:
# 从队列中取出产品
product = q.get()
if product is None:
break
# 模拟处理产品
print("Processing {}".format(product))
time.sleep(2)
# 通知队列,该元素已经被处理完毕
q.task_done()
q = queue.Queue()
# 创建生产者线程
producer_thread = threading.Thread(target=producer, args=(q,))
producer_thread.start()
# 创建消费者线程
consumer_threads = []
for i in range(3):
consumer_thread = threading.Thread(target=consumer, args=(q,))
consumer_thread.start()
consumer_threads.append(consumer_thread)
# 等待所有的产品被处理完毕
producer_thread.join()
for i in range(3):
q.put(None)
for consumer_thread in consumer_threads:
consumer_thread.join()
q.join()
print("All products have been processed.")
示例2:使用PriorityQueue队列实现任务调度
import queue
import threading
import time
# 任务类
class Task:
def __init__(self, priority, name):
self.priority = priority
self.name = name
# 用于比较任务的优先级
def __lt__(self, other):
return self.priority < other.priority
# 用于打印任务
def __str__(self):
return "{}({})".format(self.name, self.priority)
# 执行任务的线程函数
def worker(q):
while True:
task = q.get()
if task is None:
break
# 模拟执行任务
print("Executing task: {}".format(task))
time.sleep(2)
# 通知队列,该元素已经被处理完毕
q.task_done()
q = queue.PriorityQueue()
# 创建多个执行任务的线程
workers = []
for i in range(3):
worker_thread = threading.Thread(target=worker, args=(q,))
worker_thread.start()
workers.append(worker_thread)
# 添加任务到队列
tasks = [
Task(2, "Task 1"),
Task(1, "Task 2"),
Task(3, "Task 3"),
Task(2, "Task 4"),
Task(1, "Task 5"),
]
for task in tasks:
q.put(task)
# 等待所有任务被处理完毕
q.join()
# 通知线程退出
for i in range(3):
q.put(None)
for worker_thread in workers:
worker_thread.join()
print("All tasks have been executed.")
总结
Python的queue模块提供了同步的、线程安全的队列类,可以用于多线程之间安全地传递数据。常用的队列有FIFO队列、LIFO队列和优先级队列。通过put和get方法向队列中添加和获取元素,并通过empty方法判断队列是否为空。使用join和task_done方法协同多个线程。