The Queue module is Python 2 specific; in Python 3 it was renamed to the lowercase queue module.
Importing the Queue module in Python 3 will result in a ModuleNotFoundError (a subclass of ImportError). A version-compatible import is sketched below the links.
Answer from Abhishek on Stack Overflow: import - python queue vs. Queue? - Stack Overflow
SimpleQueue vs Queue in Python - what is the advantage of using SimpleQueue? - Stack Overflow
python - queue.Queue vs. collections.deque - Stack Overflow
python queue & multiprocessing queue: how they behave? - Stack Overflow
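For code that must run under both versions, a minimal sketch of the usual compatibility import (standard library only, no assumed helpers):

try:
    import queue           # Python 3 name
except ImportError:
    import Queue as queue  # Python 2 name, aliased to the Python 3 spelling

q = queue.Queue()
q.put('hello')
print(q.get())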
queue.SimpleQueue handles more than thread-safe concurrency. It handles reentrancy: it is safe to call queue.SimpleQueue.put in precarious situations where it might be interrupting other work in the same thread. For example, you can safely call it from __del__ methods, weakref callbacks, or signal module signal handlers.
If you need that, use queue.SimpleQueue.
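As a minimal sketch of that reentrancy guarantee (the queue and handler names here are illustrative):

import queue
import signal

events = queue.SimpleQueue()

def on_sigint(signum, frame):
    # Safe even though this may interrupt other work in the same thread:
    # SimpleQueue.put is reentrant, unlike queue.Queue.put.
    events.put(('signal', signum))

signal.signal(signal.SIGINT, on_sigint)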
The Python documentation specifies that SimpleQueue does not support the task-tracking functionality (task_done(), join()). Those methods let you confirm that every item placed in the queue has been processed by another thread. Example code:
import threading, queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# Turn on the worker thread.
threading.Thread(target=worker, daemon=True).start()

# Send thirty task requests to the worker.
for item in range(30):
    q.put(item)
print('All task requests sent\n', end='')

# Block until all tasks are done.
q.join()
print('All work completed')
In the above code, the main thread uses join() to wait for the worker thread to finish processing every item it sent. Meanwhile, the worker thread signals "task done" every time it handles an item in the queue. A "task" is an item in the queue in this context.
Hope this helps. For more documentation, visit https://docs.python.org/3/library/queue.html
queue.Queue and collections.deque serve different purposes. queue.Queue is intended for allowing different threads to communicate using queued messages/data, whereas collections.deque is simply intended as a data structure. That's why queue.Queue has methods like put_nowait(), get_nowait(), and join(), whereas collections.deque doesn't. queue.Queue isn't intended to be used as a collection, which is why it lacks the likes of the in operator.
It boils down to this: if you have multiple threads and you want them to be able to communicate without the need for locks, you're looking for queue.Queue; if you just want a queue or a double-ended queue as a data structure, use collections.deque.
Finally, accessing and manipulating the internal deque of a queue.Queue is playing with fire - you really don't want to be doing that.
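To make the data-structure side of that distinction concrete, here is a small sketch of operations collections.deque offers that queue.Queue deliberately does not:

from collections import deque

d = deque([1, 2, 3])
d.appendleft(0)       # O(1) appends and pops at either end
d.append(4)
print(2 in d)         # membership test: True (queue.Queue has no `in`)
print(list(d))        # iterable like a plain collection: [0, 1, 2, 3, 4]
print(d.popleft())    # 0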
If all you're looking for is a thread-safe way to transfer objects between threads, then both would work (both for FIFO and LIFO). For FIFO:
- Queue.put() and Queue.get() are thread-safe
- Deques support thread-safe, memory efficient appends and pops from either side of the deque with approximately the same O(1) performance in either direction.
Note:
- Other operations on deque might not be thread-safe, I'm not sure.
- deque does not block on pop() or popleft(), so you can't base your consumer thread flow on blocking till a new item arrives.
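A short sketch of that last point: an empty deque raises immediately, while queue.Queue can block until an item arrives or a timeout expires:

import collections
import queue

d = collections.deque()
try:
    d.popleft()              # raises immediately when empty; no blocking
except IndexError:
    print('deque was empty')

q = queue.Queue()
try:
    q.get(timeout=0.1)       # blocks up to 0.1 s, then raises queue.Empty
except queue.Empty:
    print('queue.Queue blocked, then timed out')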
However, it seems that deque has a significant efficiency advantage. Here are some benchmark results in seconds using CPython 2.7.3 for inserting and removing 100k items
deque 0.0747888759791
Queue 1.60079066852
Here's the benchmark code:
import time
import Queue
import collections

q = collections.deque()
t0 = time.clock()
for i in xrange(100000):
    q.append(1)
for i in xrange(100000):
    q.popleft()
print 'deque', time.clock() - t0

q = Queue.Queue(200000)
t0 = time.clock()
for i in xrange(100000):
    q.put(1)
for i in xrange(100000):
    q.get()
print 'Queue', time.clock() - t0
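For reference, here is the same benchmark ported to Python 3 (a sketch; absolute timings will vary by machine and interpreter version, though deque should keep a large lead since queue.Queue wraps a deque in locks):

import time
import queue
import collections

N = 100000

d = collections.deque()
t0 = time.perf_counter()
for i in range(N):
    d.append(1)
for i in range(N):
    d.popleft()
print('deque', time.perf_counter() - t0)

q = queue.Queue(2 * N)
t0 = time.perf_counter()
for i in range(N):
    q.put(1)
for i in range(N):
    q.get()
print('Queue', time.perf_counter() - t0)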
For your second example, you already gave the explanation yourself: Queue is a module, and a module cannot be called.
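A two-line sketch of the distinction:

import queue

q = queue.Queue()   # correct: instantiate the class inside the module
# q = queue()       # TypeError: 'module' object is not callable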
For the third example: I assume that you use Queue.Queue together with multiprocessing. A Queue.Queue will not be shared between processes. If the Queue.Queue is declared before the processes are started, then each process will receive a copy of it which is then independent of every other process. Items placed in the Queue.Queue by the parent before starting the children will be available to each child. Items placed in the Queue.Queue by the parent after starting the child will only be available to the parent.

Queue.Queue is made for data interchange between different threads inside the same process (using the threading module). The multiprocessing queues are for data interchange between different Python processes. While the API looks similar (it's designed to be that way), the underlying mechanisms are fundamentally different.
multiprocessing queues exchange data by pickling (serializing) objects and sending them through pipes. Queue.Queue uses a data structure that is shared between threads and locks/mutexes for correct behaviour.
Queue.Queue
Was created to work in concurrent environments spawned with the threading module. Each thread shares a reference to the same Queue.Queue object. No copying or serialization of data happens here, and all the threads have access to the same data inside the queue.
multiprocessing.Queue
Was created to work in parallel environments spawned with the multiprocessing module. Each process gets access to a copy of the multiprocessing.Queue object. The contents of the queue are copied across the processes via pickle serialization.
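A minimal sketch of that behaviour (function and variable names are illustrative): the parent hands the queue to a child process, and put() pickles the object and sends it through the underlying pipe:

import multiprocessing as mp

def child(q):
    # The item arrives as a pickled copy sent through the queue's pipe,
    # not as a reference to the parent's object.
    print('child got:', q.get())

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=child, args=(q,))
    p.start()
    q.put({'answer': 42})   # pickled here, unpickled in the child
    p.join()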
JoinableQueue has the methods join() and task_done(), which Queue lacks.
class multiprocessing.Queue([maxsize])
Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.
The usual Queue.Empty and Queue.Full exceptions from the standard library's Queue module are raised to signal timeouts.
Queue implements all the methods of Queue.Queue except for task_done() and join().
class multiprocessing.JoinableQueue([maxsize])
JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.
task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.
join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.
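Putting those pieces together, a minimal sketch of the task_done()/join() pairing with multiprocessing.JoinableQueue (the worker name is illustrative):

import multiprocessing as mp

def worker(q):
    while True:
        item = q.get()
        print('processed', item)
        q.task_done()        # exactly one call per get(), or join() never unblocks

if __name__ == '__main__':
    q = mp.JoinableQueue()
    mp.Process(target=worker, args=(q,), daemon=True).start()
    for i in range(5):
        q.put(i)
    q.join()                 # unblocks once task_done() was called 5 times
    print('all items processed')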
Based on the documentation, it's hard to be sure that Queue is actually empty. With JoinableQueue you can wait for the queue to empty by calling q.join(). In cases where you want to complete work in distinct batches where you do something discrete at the end of each batch, this could be helpful.
For example, perhaps you process 1000 items at a time through the queue, then send a push notification to a user that you've completed another batch. This would be challenging to implement with a normal Queue.
It might look something like:
import multiprocessing as mp

BATCH_SIZE = 1000
STOP_VALUE = 'STOP'

def consume(q):
    for item in iter(q.get, STOP_VALUE):
        try:
            process(item)
        # Be very defensive about errors since they can corrupt pipes.
        except Exception as e:
            logger.error(e)
        finally:
            q.task_done()

q = mp.JoinableQueue()
with mp.Pool() as pool:
    # Pull items off the queue as fast as we can whenever they're ready.
    # (Caveat: passing a multiprocessing queue as a Pool task argument can
    # raise RuntimeError, since these queues are meant to be shared by
    # inheritance, e.g. via a Pool initializer or mp.Manager().Queue().)
    for _ in range(mp.cpu_count()):
        pool.apply_async(consume, (q,))   # args must be a tuple
    for i in range(0, len(URLS), BATCH_SIZE):
        # Put `BATCH_SIZE` items in the queue asynchronously. Note that
        # map_async's callback receives the batch's result list as one object.
        pool.map_async(expensive_func, URLS[i:i+BATCH_SIZE], callback=q.put)
        # Wait for the queue to empty.
        q.join()
        notify_users()
    # Stop the consumers so we can exit cleanly.
    for _ in range(mp.cpu_count()):
        q.put(STOP_VALUE)
NB: I haven't actually run this code. If you pull items off the queue faster than you put them on, you might finish early. In that case this code sends an update AT LEAST every 1000 items, and maybe more often. For progress updates, that's probably ok. If it's important to be exactly 1000, you could use an mp.Value('i', 0) and check that it's 1000 whenever your join releases.
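A minimal sketch of that mp.Value idea (the names are hypothetical, and on spawn-based platforms the counter would need to be passed to workers explicitly rather than shared as a global):

import multiprocessing as mp

BATCH_SIZE = 1000
completed = mp.Value('i', 0)   # shared signed int, starts at 0

def mark_item_done():
    # Call this in the consumer after fully processing each item.
    with completed.get_lock():
        completed.value += 1

def batch_boundary_reached():
    # Check after q.join() releases whether exactly a full batch finished.
    with completed.get_lock():
        return completed.value % BATCH_SIZE == 0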
I'm starting to dabble with Queue and multithreading. Conceptually, I get when and where you would want to introduce these into your code. However, I just want to make sure I understand the use-case relationship between the two. Most online resources seem to treat them as conjoined, but this seems to rest on the assumption that they will be used at larger scales.
My question is: is Queue paired with multithreading just so you can control the number of threads at any given moment, without the program ending due to a lack of open threads? So if I won't need more than 10 threads at a time, incorporating Queue might not be necessary; however, if I plan to have over 100 threads running concurrently (assuming this hits some kind of processing limitation), then Queue should be incorporated?
Thanks!