When several producer threads put items in a queue.Queue, and a consumer thread gets items from the queue, there is no way for the consumer thread to tell the producers that they should stop feeding the queue. This snippet defines a subclass of Queue with a close method. After the queue has been closed, producer threads which attempt to put items in the queue will receive a Closed exception.
Priority queues and lifo queues are provided as well.
This code has been tested with python 2.6 and 3.1.
Closable thread safe Queue.
"""closablequeue.py
Thread safe queues that can be closed (Tested with python 2.6 and 3.1).
This module subclasses a new class Queue from (Q/q)ueue.Queue
with an additional thread safe method Queue.close(). After this call,
no thread can put items in the queue and the method Queue.put raises
a new Closed exception. Items may remain in the queue when close is
called; these items can be extracted by calling items = queue.close(empty=True)
or by calling Queue.get() after the close.
Subclasses PriorityQueue and LifoQueue are provided as well.
"""
import sys
import threading
from functools import update_wrapper

try:
    import queue as _Queue
    from queue import Empty, Full
except ImportError:
    import Queue as _Queue
    from Queue import Empty, Full

try:
    from time import monotonic as _time
except ImportError:
    from time import time as _time
class Closed(Exception):
    """Raised by Queue.put() when the queue has been closed."""


class Queue(_Queue.Queue):
    """Thread-safe queue that can be closed to producers.

    Behaves like the standard library queue, with an extra close()
    method.  Once close() has been called, every put() raises Closed,
    including calls that were already blocked waiting for a free slot.
    Items still in the queue can be retrieved with get() or by calling
    close(empty=True).
    """

    def __init__(self, maxsize=0):
        """Create a queue holding at most 'maxsize' items (0: unbounded)."""
        _Queue.Queue.__init__(self, maxsize)
        # Guards the final test of '_can_put' in put(); always acquired
        # after 'not_full' so that put() and close() agree on lock order.
        self.use_put = threading.Lock()
        self._can_put = True

    def close(self, empty=False):
        """Close the queue, forbidding subsequent 'put'.

        If 'empty' is true, drain the queue and return the removed items
        as a list; otherwise return an empty list and leave the remaining
        items available to get().
        """
        with self.not_full:
            with self.use_put:
                self._can_put = False
            # Wake producers blocked on a full queue so that they notice
            # the close instead of waiting for a slot that may never come.
            self.not_full.notify_all()
        items = []
        if empty:
            # Draining happens outside the lock: get_nowait() takes the
            # queue mutex itself.
            try:
                while True:
                    items.append(self.get_nowait())
            except Empty:
                pass  # queue exhausted, nothing more to collect
        return items

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).

        Raises Closed if the queue is closed when put() is called, or if
        it gets closed while put() is waiting for a free slot.
        """
        with self.not_full:
            # A closed queue always reports Closed, even when it is full.
            if not self._can_put:
                raise Closed
            if self.maxsize > 0:
                if not block:
                    if self._qsize() == self.maxsize:
                        raise Full
                elif timeout is None:
                    # Leave the wait loop as soon as close() flips the flag.
                    while self._can_put and self._qsize() == self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                else:
                    endtime = _time() + timeout
                    while self._can_put and self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            with self.use_put:
                if not self._can_put:
                    raise Closed
                self._put(item)
                self.unfinished_tasks += 1
                self.not_empty.notify()
class PriorityQueue(Queue):
    """Closable variant of the standard library PriorityQueue."""

    # The storage hooks are read from the class __dict__ so that plain
    # functions are obtained on both Python 2 and Python 3; on Python 2,
    # normal attribute access would give unbound methods that refuse to
    # run on instances of an unrelated class.
    _init = _Queue.PriorityQueue.__dict__["_init"]
    _qsize = _Queue.PriorityQueue.__dict__["_qsize"]
    _put = _Queue.PriorityQueue.__dict__["_put"]
    _get = _Queue.PriorityQueue.__dict__["_get"]


class LifoQueue(Queue):
    """Closable variant of the standard library LifoQueue."""

    # Same borrowing technique as PriorityQueue: take the raw functions
    # from the class __dict__ so they bind to this class on Python 2 and 3.
    _init = _Queue.LifoQueue.__dict__["_init"]
    _qsize = _Queue.LifoQueue.__dict__["_qsize"]
    _put = _Queue.LifoQueue.__dict__["_put"]
    _get = _Queue.LifoQueue.__dict__["_get"]
#************************************************
# Put this test code in a different file:
#************************************************
#!/usr/bin/env python
"""testqueue.py
Test program for module closablequeue.
Description:
In this example, 2 workers fill a truck with bags. Each worker
has its own thread and puts the bags in a closable queue created
by the main thread. When a worker catches the Closed exception,
it stops filling the queue and exits.
The main thread creates the truck and workers and a queue, and starts
the workers' threads. When the truck is full, it closes the queue,
waits for the end of the other threads and prints the content of
the truck. Each bag in the truck shows which worker put that bag
in the truck.
Note that we use a queue of maximum size 3. It means that when the
main thread closes the queue, there may be 0, 1, 2 or 3 bags in the queue.
The main thread therefore closes the queue before the truck is completely full,
so that it can add 3 more bags to the truck if necessary.
"""
from closablequeue import Queue, Closed
import threading
def main():
    """Run the truck-loading demo: two workers feed a closable queue."""

    class Truck(object):
        """Container with a nominal capacity and a list of loaded bags."""

        def __init__(self, capacity):
            self.content = []
            self.capacity = capacity

        def add(self, package):
            """Load one bag onto the truck."""
            self.content.append(package)

    class Worker(object):
        """Producer that keeps putting bags tagged with its index."""

        def __init__(self, index):
            self.index = index
            self.thread = threading.Thread(target=self._fill)

        def fill(self, queue):
            """Remember the target queue and start the producer thread."""
            self.queue = queue
            self.thread.start()

        def _fill(self):
            # Thread body: produce until the queue refuses further items.
            print("worker {0} starting".format(self.index))
            try:
                while True:
                    bag = [self.index]
                    self.queue.put(bag)
            except Closed:
                print("worker {0} found a closed queue".format(self.index))

    truck = Truck(50)
    bag_queue = Queue(3)

    workers = []
    for index in range(2):
        worker = Worker(index)
        workers.append(worker)
        worker.fill(bag_queue)

    # Stop early enough that the bags still waiting in the queue at close
    # time can be added to the truck as well.
    for _ in range(truck.capacity - bag_queue.maxsize):
        truck.add(bag_queue.get())

    print("closing queue")
    leftovers = bag_queue.close(True)
    truck.content.extend(leftovers)

    for worker in workers:
        worker.thread.join()
        print("joined worker {0}".format(worker.index))
    print("truck's content: {0}".format(truck.content))
main()
Gribouillis 1,391 Programming Explorer Team Colleague
Be a part of the DaniWeb community
We're a friendly, industry-focused community of developers, IT pros, digital marketers, and technology enthusiasts meeting, networking, learning, and sharing knowledge.