Python队列是多线程编程中不可或缺的数据结构,其基本操作包括创建、添加、读取和删除元素等。多线程队列可以帮助我们在多个线程之间共享数据,避免数据访问的冲突,提高程序的并发性能。
1. 创建队列
Python标准库中提供了两个队列模块:queue
和multiprocessing
。其中,queue
是线程安全的队列,适用于多线程编程;而multiprocessing
是进程队列,适用于多进程编程。
1.1 创建队列
创建队列的方法很简单,只需要调用queue.Queue()
函数即可:
import queue
q = queue.Queue() # 创建队列
1.2 创建优先级队列
有时我们需要在队列中设置元素的优先级,这时可以使用queue.PriorityQueue()
函数创建优先级队列:
import queue
q = queue.PriorityQueue() # 创建优先级队列
创建了队列之后,就可以使用队列提供的基本操作来操纵队列了。
2. 队列基本操作
2.1 添加元素到队列中
使用队列的put()
方法可以向队列中添加元素。如果队列已满,则put()
方法会一直等待队列中有空闲位置,或者等待超时,直到队列中有空闲位置。
q.put(item) # 往队列中添加元素
其中,item
是要添加的元素。
2.2 从队列中读取元素
使用队列的get()
方法可以从队列中读取元素。如果队列为空,则get()
方法会一直等待,直到队列中有元素为止。
item = q.get() # 从队列中获取元素
2.3 检查队列是否为空
使用队列的empty()
方法可以检查队列是否为空。如果队列为空,则返回True
;否则返回False
。
if q.empty():
print("队列为空")
else:
print("队列不为空")
2.4 检查队列是否已满
如果是固定大小的队列,使用队列的full()
方法可以检查队列是否已满。如果队列已满,则返回True
;否则返回False
。
if q.full():
print("队列已满")
else:
print("队列未满")
2.5 删除队列中的元素
使用队列的task_done()
方法可以删除队列中的元素。这个方法通常用于在get()
方法获取元素后,从队列中删除该元素。
q.task_done() # 删除队列中的元素
3. 多线程队列
多线程队列可以帮助实现多个线程之间的数据共享,避免数据访问冲突,提高程序并发性能。
3.1 向队列中添加元素
使用多线程队列的put()
方法可以向队列中添加元素。多线程队列中添加元素和普通队列相同。
import queue
import threading
def producer(q):
for i in range(10):
item = "item{}".format(i)
q.put(item)
print("生产者:生产一个元素:{}".format(item))
q = queue.Queue() # 创建多线程队列
t = threading.Thread(target=producer, args=(q,)) # 创建线程
t.start() # 启动线程
3.2 从队列中读取元素
使用多线程队列的get()
方法可以从队列中读取元素。多线程队列读取元素和普通队列相同。
import queue
import threading
def producer(q):
for i in range(10):
item = "item{}".format(i)
q.put(item)
print("生产者:生产一个元素:{}".format(item))
def consumer(q):
while True:
item = q.get()
if item is None:
break
print("消费者:消费一个元素:{}".format(item))
q = queue.Queue() # 创建多线程队列
t1 = threading.Thread(target=producer, args=(q,)) # 创建生产者线程
t2 = threading.Thread(target=consumer, args=(q,)) # 创建消费者线程
t1.start() # 启动生产者线程
t2.start() # 启动消费者线程
t1.join() # 等待生产者线程执行完毕
q.put(None) # 发送终止信号给消费者线程
t2.join() # 等待消费者线程执行完毕
在上面的示例中,我们创建了一个生产者线程和一个消费者线程。生产者线程不断向队列中添加元素,消费者线程则不断从队列中读取元素。同时,在生产者线程和消费者线程之间使用了None
来作为终止信号,当生产者线程生产完元素后,向队列中添加了一个None
,表示不再向队列中添加元素,消费者线程收到None
时,就会退出循环,终止线程的执行。
3.3 线程安全的队列操作
在多线程编程中,为了避免队列操作的竞争,我们应该使用线程安全的队列操作。例如,在读取元素时,如果队列为空,不应该一直等待队列中有元素,而是应该在等待一定时间之后,返回None
或抛出异常等。
item = q.get(block=True, timeout=None) # 阻塞读取元素,等待无限长的时间
item = q.get(block=True, timeout=1) # 阻塞读取元素,等待1秒钟
item = q.get(block=False) # 非阻塞读取元素,如果队列为空,直接返回None
上述代码中,get()
方法的block
参数可以设置读取元素时是否阻塞。如果block
为True
,则一直等待队列中有元素为止;如果为False
,则直接返回None
。
get()
方法的timeout
参数可以设置等待元素的最长时间。如果队列中有元素可读,则立即返回;如果等待时间超过了timeout
的值,则抛出queue.Empty
异常。
4. 示例
下面是一个简单的多线程队列示例。该程序创建了10个生产者线程和5个消费者线程,生产者线程不断向队列中添加元素,消费者线程则不断从队列中读取元素。
import queue
import threading
import time
import random
MAX_SIZE = 5 # 队列最大容量
PRODUCER_NUM = 10 # 生产者线程数量
CONSUMER_NUM = 5 # 消费者线程数量
def producer(q):
while True:
time.sleep(random.random()) # 随机等待一段时间
if q.qsize() < MAX_SIZE: # 判断队列是否已满
item = random.randint(1, 100) # 随机生成一个元素
q.put(item) # 添加元素到队列
print("生产者{}:向队列中添加一个元素:{}".format(threading.current_thread().name, item))
def consumer(q):
while True:
time.sleep(random.random()) # 随机等待一段时间
if not q.empty(): # 判断队列是否为空
item = q.get() # 从队列中读取一个元素
print("消费者{}:从队列中读取一个元素:{}".format(threading.current_thread().name, item))
q = queue.Queue(maxsize=MAX_SIZE) # 创建多线程队列
# 创建生产者线程
for i in range(PRODUCER_NUM):
t = threading.Thread(target=producer, args=(q,))
t.start()
# 创建消费者线程
for i in range(CONSUMER_NUM):
t = threading.Thread(target=consumer, args=(q,))
t.start()
在这个示例中,我们使用了random
模块来随机生成元素和等待时间。当队列已满时,生产者线程会等待一段时间之后再次尝试添加元素;当队列为空时,消费者线程会等待一段时间之后再次尝试读取元素。这样可以更好地模拟实际情况下的操作。