queue.SimpleQueue handles more than threadsafe concurrency. It handles reentrancy - it is safe to call queue.SimpleQueue.put in precarious situations where it might be interrupting other work in the same thread. For example, you can safely call it from __del__ methods, weakref callbacks, or signal module signal handlers.
If you need that, use queue.SimpleQueue.
queue.SimpleQueue handles more than threadsafe concurrency. It handles reentrancy - it is safe to call queue.SimpleQueue.put in precarious situations where it might be interrupting other work in the same thread. For example, you can safely call it from __del__ methods, weakref callbacks, or signal module signal handlers.
If you need that, use queue.SimpleQueue.
The python documentations specifies that the simple queue cannot use the functionality of tracking (task_done, join). These can be used to track that every item in the queue has been processed by another process/ thread. example code:
import threading, queue
q = queue.Queue()
def worker():
while True:
item = q.get()
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
# turn-on the worker thread
threading.Thread(target=worker, daemon=True).start()
# send thirty task requests to the worker
for item in range(30):
q.put(item)
print('All task requests sent\n', end='')
# block until all tasks are done
q.join()
print('All work completed')
In the above code the main thread uses join to wait for the other thread to finish processing every item it send. Meanwhile, the worker thread signals "task done" every time he handles an item in the queue. "task" is an item in the queue in this context.
Hope this helps,
for more documentation visit: https://docs.python.org/3/library/queue.html
multithreading - Learning about Queue module in python (how to run it) - Stack Overflow
Implementing an efficient queue in Python - Stack Overflow
When do you use queues and tuples instead of lists?
Stacks and Queues, when to use them?
Videos
The for loop is launching a number of worker threads to perform the function defined by "worker". Here is working code that should run on your system in python 2.7.
import Queue
import threading
# input queue to be processed by many threads
q_in = Queue.Queue(maxsize=0)
# output queue to be processed by one thread
q_out = Queue.Queue(maxsize=0)
# number of worker threads to complete the processing
num_worker_threads = 10
# process that each worker thread will execute until the Queue is empty
def worker():
while True:
# get item from queue, do work on it, let queue know processing is done for one item
item = q_in.get()
q_out.put(do_work(item))
q_in.task_done()
# squares a number and returns the number and its square
def do_work(item):
return (item,item*item)
# another queued thread we will use to print output
def printer():
while True:
# get an item processed by worker threads and print the result. Let queue know item has been processed
item = q_out.get()
print "%d squared is : %d" % item
q_out.task_done()
# launch all of our queued processes
def main():
# Launches a number of worker threads to perform operations using the queue of inputs
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
# launches a single "printer" thread to output the result (makes things neater)
t = threading.Thread(target=printer)
t.daemon = True
t.start()
# put items on the input queue (numbers to be squared)
for item in range(10):
q_in.put(item)
# wait for two queues to be emptied (and workers to close)
q_in.join() # block until all tasks are done
q_out.join()
print "Processing Complete"
main()
Python 3 version per @handle
import queue
import threading
# input queue to be processed by many threads
q_in = queue.Queue(maxsize=0)
# output queue to be processed by one thread
q_out = queue.Queue(maxsize=0)
# number of worker threads to complete the processing
num_worker_threads = 10
# process that each worker thread will execute until the Queue is empty
def worker():
while True:
# get item from queue, do work on it, let queue know processing is done for one item
item = q_in.get()
q_out.put(do_work(item))
q_in.task_done()
# squares a number and returns the number and its square
def do_work(item):
return (item,item*item)
# another queued thread we will use to print output
def printer():
while True:
# get an item processed by worker threads and print the result. Let queue know item has been processed
item = q_out.get()
print("{0[0]} squared is : {0[1]}".format(item) )
q_out.task_done()
# launch all of our queued processes
def main():
# Launches a number of worker threads to perform operations using the queue of inputs
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
# launches a single "printer" thread to output the result (makes things neater)
t = threading.Thread(target=printer)
t.daemon = True
t.start()
# put items on the input queue (numbers to be squared)
for item in range(10):
q_in.put(item)
# wait for two queues to be emptied (and workers to close)
q_in.join() # block until all tasks are done
q_out.join()
print( "Processing Complete" )
main()
You can think of the number of worker threads as the number of bank tellers at a bank. So people (your items) stand in line (your queue) to be processed by a bank teller (your worker thread). Queues are actually an easy and well understood mechanism to manage complexities in threads.
I have adjusted your code a bit to show how it works.
import queue
import time
from threading import Thread
def do_work(item):
print("processing", item)
def source():
item = 1
while True:
print("starting", item)
yield item
time.sleep(0.2)
item += 1
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = queue.Queue(maxsize=0)
def main():
for i in range(2):
t = Thread(target=worker)
t.daemon = True
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are done
main()
As Uri Goren astutely noted above, the Python stdlib already implemented an efficient queue on your fortunate behalf: collections.deque.
What Not to Do
Avoid reinventing the wheel by hand-rolling your own:
- Linked list implementation. While doing so reduces the worst-case time complexity of your
dequeue()andenqueue()methods to O(1), thecollections.dequetype already does so. It's also thread-safe and presumably more space and time efficient, given its C-based heritage. - Python list implementation. As I note below, implementing the
enqueue()methods in terms of a Python list increases its worst-case time complexity to O(n). Since removing the last item from a C-based array and hence Python list is a constant-time operation, implementing thedequeue()method in terms of a Python list retains the same worst-case time complexity of O(1). But who cares?enqueue()remains pitifully slow.
To quote the official deque documentation:
Though
listobjects support similar operations, they are optimized for fast fixed-length operations and incur O(n) memory movement costs forpop(0)andinsert(0, v)operations which change both the size and position of the underlying data representation.
More critically, deque also provides out-of-the-box support for a maximum length via the maxlen parameter passed at initialization time, obviating the need for manual attempts to limit the queue size (which inevitably breaks thread safety due to race conditions implicit in if conditionals).
What to Do
Instead, implement your Queue class in terms of the standard collections.deque type as follows:
from collections import deque
class Queue:
'''
Thread-safe, memory-efficient, maximally-sized queue supporting queueing and
dequeueing in worst-case O(1) time.
'''
def __init__(self, max_size = 10):
'''
Initialize this queue to the empty queue.
Parameters
----------
max_size : int
Maximum number of items contained in this queue. Defaults to 10.
'''
self._queue = deque(maxlen=max_size)
def enqueue(self, item):
'''
Queues the passed item (i.e., pushes this item onto the tail of this
queue).
If this queue is already full, the item at the head of this queue
is silently removed from this queue *before* the passed item is
queued.
'''
self._queue.append(item)
def dequeue(self):
'''
Dequeues (i.e., removes) the item at the head of this queue *and*
returns this item.
Raises
----------
IndexError
If this queue is empty.
'''
return self._queue.pop()
The proof is in the hellish pudding:
>>> queue = Queue()
>>> queue.enqueue('Maiden in Black')
>>> queue.enqueue('Maneater')
>>> queue.enqueue('Maiden Astraea')
>>> queue.enqueue('Flamelurker')
>>> print(queue.dequeue())
Flamelurker
>>> print(queue.dequeue())
Maiden Astraea
>>> print(queue.dequeue())
Maneater
>>> print(queue.dequeue())
Maiden in Black
It Is Dangerous to Go Alone
Actually, don't do that either.
You're better off just using a raw deque object rather than attempting to manually encapsulate that object in a Queue wrapper. The Queue class defined above is given only as a trivial demonstration of the general-purpose utility of the deque API.
The deque class provides significantly more features, including:
...iteration, pickling,
len(d),reversed(d),copy.copy(d),copy.deepcopy(d), membership testing with the in operator, and subscript references such asd[-1].
Just use deque anywhere a single- or double-ended queue is required. That is all.
You can keep head and tail node instead of a queue list in queue class
class Node:
def __init__(self, item = None):
self.item = item
self.next = None
self.previous = None
class Queue:
def __init__(self):
self.length = 0
self.head = None
self.tail = None
def enqueue(self, value):
newNode = Node(value)
if self.head is None:
self.head = self.tail = newNode
else:
self.tail.next = newNode
newNode.previous = self.tail
self.tail = newNode
self.length += 1
def dequeue(self):
item = self.head.item
self.head = self.head.next
self.length -= 1
if self.length == 0:
self.tail = None
return item