In general, stacks are LIFO and queues are FIFO.
In Python, you can use the collections module to experiment with stacks and queues:
>>> from collections import deque
>>> stack = deque()
>>> stack.append(10)
>>> stack.append(20)
>>> stack.append(30)
>>> stack
deque([10, 20, 30])
>>> stack.pop() # LIFO
30
>>> stack.pop()
20
>>>
>>> queue = deque()
>>> queue.append(10)
>>> queue.append(20)
>>> queue.append(30)
>>> queue
deque([10, 20, 30])
>>> queue.popleft() # FIFO
10
>>> queue.popleft()
20
Answer from Raymond Hettinger on Stack Overflow
See the following links for more information:
Stack
Queue
Visually, these two data structures can be pictured as follows:
Stack:

Description:
There are variations of this data structure. However, in simple terms, as one can observe in the image provided, when you add to this data structure you place the new item on top of what is already there, and when you remove you also take from the top. You can picture it as a stack of books that you go through one by one, starting from the top and working all the way down.
Queue

Description:
There are also variations on this particular data structure. However, in simple terms, as you can see in the image provided, when you add to this data structure the new element goes in at the beginning, and when you remove, it is the element at the other end (the one that has been waiting longest) that is removed. You can imagine it as a queue in a shop, where you stand behind a lot of people, waiting for your turn to come to the counter to pay for your items.
queue.SimpleQueue handles more than thread-safe concurrency. It also handles reentrancy: it is safe to call queue.SimpleQueue.put in precarious situations where it might be interrupting other work in the same thread. For example, you can safely call it from __del__ methods, weakref callbacks, or signal module signal handlers.
If you need that, use queue.SimpleQueue.
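To make the reentrancy point concrete, here is a minimal sketch that calls SimpleQueue.put() from a weakref callback, one of the situations mentioned above. The Resource class and on_collected() function are hypothetical names, and it assumes CPython, where deleting the last reference frees the object (and fires the callback) immediately.

```python
import queue
import weakref

# Events recorded from a context that may interrupt other code in this thread.
events = queue.SimpleQueue()

class Resource:
    """Hypothetical object whose collection we want to log."""

def on_collected(ref):
    # weakref callbacks can fire at almost any point in the running code;
    # SimpleQueue.put() is documented as reentrant, so calling it here is safe.
    events.put("resource collected")

resource = Resource()
watcher = weakref.ref(resource, on_collected)  # keep the ref alive so the callback runs
del resource  # CPython frees the object now (refcount hits zero), firing the callback

message = events.get_nowait()
```

A plain queue.Queue guards its state with a non-reentrant lock, so SimpleQueue is the safer choice for this kind of callback.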
The Python documentation specifies that SimpleQueue lacks the task-tracking functionality of queue.Queue (task_done() and join()). These methods let you track that every item put on the queue has been processed by another thread. Example code:
import threading, queue
q = queue.Queue()
def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()
# turn-on the worker thread
threading.Thread(target=worker, daemon=True).start()
# send thirty task requests to the worker
for item in range(30):
    q.put(item)
print('All task requests sent\n', end='')
# block until all tasks are done
q.join()
print('All work completed')
In the above code, the main thread uses join() to wait for the worker thread to finish processing every item it sent. Meanwhile, the worker thread signals "task done" every time it handles an item from the queue. In this context, a "task" is an item in the queue.
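The bookkeeping can be checked directly. This small sketch (variable names are illustrative) shows that SimpleQueue has no task-tracking methods at all, that join() on a queue.Queue returns once every put() has been matched by a task_done(), and that an extra task_done() raises ValueError.

```python
import queue

simple = queue.SimpleQueue()
tracked = queue.Queue()

# SimpleQueue simply has no task-tracking methods.
simple_has_join = hasattr(simple, "join")
simple_has_task_done = hasattr(simple, "task_done")

tracked.put("job")
tracked.get()
tracked.task_done()  # balances the single put()
tracked.join()       # returns at once: the unfinished-task count is zero

try:
    tracked.task_done()  # one call more than there were put() calls
except ValueError as exc:
    overcall_error = str(exc)
```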
Hope this helps. For more documentation, visit: https://docs.python.org/3/library/queue.html
queue.Queue and collections.deque serve different purposes. queue.Queue is intended for allowing different threads to communicate using queued messages/data, whereas collections.deque is simply intended as a data structure. That's why queue.Queue has methods like put_nowait(), get_nowait(), and join(), whereas collections.deque doesn't. queue.Queue isn't intended to be used as a collection, which is why it lacks the likes of the in operator.
It boils down to this: if you have multiple threads and you want them to be able to communicate without the need for locks, you're looking for queue.Queue; if you just want a queue or a double-ended queue as a data structure, use collections.deque.
Finally, accessing and manipulating the internal deque of a queue.Queue is playing with fire - you really don't want to be doing that.
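A quick way to see the "collection versus messaging channel" distinction described above is to try collection operations on both types. This is only an illustrative sketch; the variable names are arbitrary.

```python
import queue
from collections import deque

d = deque([10, 20, 30])
q = queue.Queue()
for value in (10, 20, 30):
    q.put(value)

# deque is a full collection: membership and indexing work.
deque_has_20 = 20 in d
deque_last = d[-1]

# queue.Queue only offers the messaging API (put/get/qsize/...).
try:
    20 in q
    queue_supports_in = True
except TypeError:
    queue_supports_in = False

queue_size = q.qsize()  # documented as approximate in multithreaded code
```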
If all you're looking for is a thread-safe way to transfer objects between threads, then both would work (both for FIFO and LIFO). For FIFO:
- Queue.put() and Queue.get() are thread-safe.
- Deques support thread-safe, memory efficient appends and pops from either side of the deque with approximately the same O(1) performance in either direction.
Note:
- Other operations on deque might not be thread-safe; I'm not sure.
- deque does not block on pop() or popleft(), so you can't base your consumer thread flow on blocking until a new item arrives.
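The blocking difference in the last note can be sketched as follows: an empty deque fails immediately, while queue.Queue.get() waits (here with a short timeout) and a blocked consumer thread wakes up as soon as a producer puts an item. The 0.1-second timeout is an arbitrary choice.

```python
import queue
import threading
from collections import deque

d = deque()
try:
    d.popleft()              # never waits: fails immediately on an empty deque
except IndexError:
    deque_raised = True

q = queue.Queue()
try:
    q.get(timeout=0.1)       # waits up to 0.1 s for an item, then gives up
except queue.Empty:
    queue_timed_out = True

# A consumer blocked in get() is woken as soon as a producer puts an item.
received = []
consumer = threading.Thread(target=lambda: received.append(q.get()))
consumer.start()
q.put("hello")
consumer.join()
```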
However, it seems that deque has a significant efficiency advantage. Here are some benchmark results, in seconds, using CPython 2.7.3 for inserting and removing 100k items:
deque 0.0747888759791
Queue 1.60079066852
Here's the benchmark code (Python 2):
import time
import Queue
import collections
q = collections.deque()
t0 = time.clock()
for i in xrange(100000):
    q.append(1)
for i in xrange(100000):
    q.popleft()
print 'deque', time.clock() - t0
q = Queue.Queue(200000)
t0 = time.clock()
for i in xrange(100000):
    q.put(1)
for i in xrange(100000):
    q.get()
print 'Queue', time.clock() - t0
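The code above only runs on Python 2 (xrange, the print statement and the Queue module name are gone in Python 3, and time.clock() was removed in 3.8). A rough Python 3 port, using time.perf_counter() instead, might look like this; it reproduces the same workload but no results are claimed here, since timings depend on your machine and interpreter.

```python
import collections
import queue
import time

N = 100_000

def bench_deque(n: int = N) -> float:
    """Append n items, then popleft() them all; return elapsed seconds."""
    q = collections.deque()
    t0 = time.perf_counter()
    for _ in range(n):
        q.append(1)
    for _ in range(n):
        q.popleft()
    return time.perf_counter() - t0

def bench_queue(n: int = N) -> float:
    """Same workload through queue.Queue, which takes a lock on every call."""
    q = queue.Queue(2 * n)
    t0 = time.perf_counter()
    for _ in range(n):
        q.put(1)
    for _ in range(n):
        q.get()
    return time.perf_counter() - t0

if __name__ == "__main__":
    print("deque", bench_deque())
    print("Queue", bench_queue())
```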
JoinableQueue has the methods join() and task_done(), which Queue doesn't.
class multiprocessing.Queue( [maxsize] )
Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.
The usual Queue.Empty and Queue.Full exceptions from the standard library’s Queue module are raised to signal timeouts.
Queue implements all the methods of Queue.Queue except for task_done() and join().
class multiprocessing.JoinableQueue( [maxsize] )
JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.
task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.
join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.
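As a small, self-contained sketch of the task_done()/join() protocol quoted above, here is one consumer process pulling from a multiprocessing.JoinableQueue. The names consumer() and run() and the None sentinel are my own choices, not part of the API.

```python
import multiprocessing as mp

def consumer(q, results):
    while True:
        item = q.get()
        try:
            if item is None:      # sentinel: stop consuming
                return
            results.put(item * 2)
        finally:
            q.task_done()         # exactly one task_done() per get(), sentinel included

def run(items):
    q = mp.JoinableQueue()
    results = mp.Queue()
    worker = mp.Process(target=consumer, args=(q, results))
    worker.start()
    for item in items:
        q.put(item)
    q.join()                      # blocks until every item has been task_done()
    collected = [results.get() for _ in items]
    q.put(None)                   # tell the consumer to exit
    q.join()
    worker.join()
    return collected

if __name__ == "__main__":
    print(run([1, 2, 3]))
```

The try/finally makes sure task_done() is called even if processing raises, which keeps the unfinished-task count balanced.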
Based on the documentation, it's hard to be sure that Queue is actually empty. With JoinableQueue you can wait for the queue to empty by calling q.join(). In cases where you want to complete work in distinct batches where you do something discrete at the end of each batch, this could be helpful.
For example, perhaps you process 1000 items at a time through the queue, then send a push notification to a user that you've completed another batch. This would be challenging to implement with a normal Queue.
It might look something like:
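import logging
import multiprocessing as mp
logger = logging.getLogger(__name__)
BATCH_SIZE = 1000
STOP_VALUE = 'STOP'
# process(), expensive_func(), URLS and notify_users() are placeholders for your own code.
def consume(q):
    for item in iter(q.get, STOP_VALUE):
        try:
            process(item)
        # Be very defensive about errors since they can corrupt pipes.
        except Exception as e:
            logger.error(e)
        finally:
            q.task_done()
    q.task_done()  # also account for the STOP_VALUE item
if __name__ == '__main__':
    q = mp.JoinableQueue()
    def put_all(results):
        # map_async() calls this once with the whole result list, so put items one by one.
        for result in results:
            q.put(result)
    # Queues can't be passed to Pool tasks, and consumers would occupy every
    # pool worker, so run them as separate processes that inherit the queue.
    consumers = [mp.Process(target=consume, args=(q,)) for _ in range(mp.cpu_count())]
    for c in consumers:
        c.start()
    with mp.Pool() as pool:
        for i in range(0, len(URLS), BATCH_SIZE):
            # Put `BATCH_SIZE` items in queue asynchronously.
            pool.map_async(expensive_func, URLS[i:i+BATCH_SIZE], callback=put_all)
            # Wait for the queue to empty.
            q.join()
            notify_users()
        # Let outstanding batches finish; leaving the with-block terminates the pool.
        pool.close()
        pool.join()
    # Stop the consumers so we can exit cleanly.
    for _ in consumers:
        q.put(STOP_VALUE)
    for c in consumers:
        c.join()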
NB: I haven't actually run this code. If you pull items off the queue faster than you put them on, you might finish early. In that case this code sends an update AT LEAST every 1000 items, and maybe more often. For progress updates, that's probably ok. If it's important to be exactly 1000, you could use an mp.Value('i', 0) and check that it's 1000 whenever your join releases.
As Uri Goren astutely noted above, the Python stdlib has already implemented an efficient queue on your fortunate behalf: collections.deque.
What Not to Do
Avoid reinventing the wheel by hand-rolling your own:
- Linked list implementation. While doing so reduces the worst-case time complexity of your dequeue() and enqueue() methods to O(1), the collections.deque type already does so. It's also thread-safe and presumably more space and time efficient, given its C-based heritage.
- Python list implementation. As I note below, implementing the enqueue() method in terms of a Python list (by inserting at index 0) increases its worst-case time complexity to O(n). Since removing the last item from a C-based array, and hence from a Python list, is a constant-time operation, implementing the dequeue() method in terms of a Python list retains the same worst-case time complexity of O(1). But who cares? enqueue() remains pitifully slow.
To quote the official deque documentation:
Though list objects support similar operations, they are optimized for fast fixed-length operations and incur O(n) memory movement costs for pop(0) and insert(0, v) operations which change both the size and position of the underlying data representation.
More critically, deque also provides out-of-the-box support for a maximum length via the maxlen parameter passed at initialization time, obviating the need for manual attempts to limit the queue size (which inevitably breaks thread safety due to race conditions implicit in if conditionals).
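Here is a minimal illustration of the maxlen behaviour: once the deque is full, each append silently drops an item from the opposite end, with no manual size check required.

```python
from collections import deque

recent = deque(maxlen=3)
for n in range(5):
    recent.append(n)   # once full, each append drops the leftmost item

recent.appendleft(-1)  # appending on the left drops the rightmost item instead
```

After the loop the deque holds 2, 3, 4; the final appendleft() leaves -1, 2, 3.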
What to Do
Instead, implement your Queue class in terms of the standard collections.deque type as follows:
from collections import deque
class Queue:
    '''
    Thread-safe, memory-efficient, maximally-sized queue supporting queueing and
    dequeueing in worst-case O(1) time.
    '''
    def __init__(self, max_size=10):
        '''
        Initialize this queue to the empty queue.
        Parameters
        ----------
        max_size : int
            Maximum number of items contained in this queue. Defaults to 10.
        '''
        self._queue = deque(maxlen=max_size)
    def enqueue(self, item):
        '''
        Queues the passed item (i.e., pushes this item onto the tail of this
        queue).
        If this queue is already full, the item at the head of this queue
        is silently removed from this queue *before* the passed item is
        queued.
        '''
        self._queue.append(item)
    def dequeue(self):
        '''
        Dequeues (i.e., removes) the item at the head of this queue *and*
        returns this item.
        Raises
        ------
        IndexError
            If this queue is empty.
        '''
        # popleft() takes from the head (FIFO); pop() would take from the tail (LIFO).
        return self._queue.popleft()
The proof is in the hellish pudding:
>>> queue = Queue()
>>> queue.enqueue('Maiden in Black')
>>> queue.enqueue('Maneater')
>>> queue.enqueue('Maiden Astraea')
>>> queue.enqueue('Flamelurker')
>>> print(queue.dequeue())
Maiden in Black
>>> print(queue.dequeue())
Maneater
>>> print(queue.dequeue())
Maiden Astraea
>>> print(queue.dequeue())
Flamelurker
It Is Dangerous to Go Alone
Actually, don't do that either.
You're better off just using a raw deque object rather than attempting to manually encapsulate that object in a Queue wrapper. The Queue class defined above is given only as a trivial demonstration of the general-purpose utility of the deque API.
The deque class provides significantly more features, including:
...iteration, pickling, len(d), reversed(d), copy.copy(d), copy.deepcopy(d), membership testing with the in operator, and subscript references such as d[-1].
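Each of the features in that quoted list works on a plain deque out of the box; here is a short tour (the variable names are arbitrary):

```python
import copy
import pickle
from collections import deque

d = deque(["a", "b", "c"])

length = len(d)                            # 3
backwards = list(reversed(d))              # ['c', 'b', 'a']
shallow = copy.copy(d)                     # a new deque with the same elements
deep = copy.deepcopy(d)
roundtrip = pickle.loads(pickle.dumps(d))  # pickling round-trips the contents
has_b = "b" in d                           # membership testing
last = d[-1]                               # subscript reference: 'c'
```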
Just use deque anywhere a single- or double-ended queue is required. That is all.
You can keep head and tail nodes instead of a list in your Queue class:
class Node:
    def __init__(self, item=None):
        self.item = item
        self.next = None
        self.previous = None
class Queue:
    def __init__(self):
        self.length = 0
        self.head = None
        self.tail = None
    def enqueue(self, value):
        newNode = Node(value)
        if self.head is None:
            self.head = self.tail = newNode
        else:
            self.tail.next = newNode
            newNode.previous = self.tail
            self.tail = newNode
        self.length += 1
    def dequeue(self):
        if self.head is None:
            raise IndexError('dequeue from empty queue')
        item = self.head.item
        self.head = self.head.next
        self.length -= 1
        if self.length == 0:
            self.tail = None
        else:
            self.head.previous = None  # drop the back-link to the removed node
        return item
Though my understanding of this subject is limited, from what I've tried I can tell there is one main difference between multiprocessing.Queue() and multiprocessing.Manager().Queue():
- multiprocessing.Queue() is an object, whereas multiprocessing.Manager().Queue() is an address (proxy) pointing to a shared queue managed by the multiprocessing.Manager() object.
- Therefore, you can't pass normal multiprocessing.Queue() objects to Pool methods, because they can't be pickled.
- Moreover, the Python docs tell us to pay particular attention when using multiprocessing.Queue() because it can have undesired effects:
Note When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties – if they really bother you then you can instead use a queue created with a manager. After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising Queue.Empty. If multiple processes are enqueuing objects, it is possible for the objects to be received at the other end out-of-order. However, objects enqueued by the same process will always be in the expected order with respect to each other.
Warning As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children. Note that a queue created using a manager does not have this issue.
There is a workaround to use multiprocessing.Queue() with Pool: set the queue as a global variable in every worker process at initialization:
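import multiprocessing
def initialize_shared(q):
    global queue
    queue = q
if __name__ == '__main__':
    nb_process = 4  # example worker count
    queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(nb_process, initializer=initialize_shared, initargs=(queue,))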
This will create pool processes with correctly shared queues, but one can argue that multiprocessing.Queue() objects were not created for this use.
On the other hand, a manager.Queue() can be shared between pool subprocesses by passing it as a normal argument of a function.
In my opinion, using multiprocessing.Manager().Queue() is fine in every case and less troublesome. There might be some drawbacks to using a manager, but I'm not aware of any.
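For comparison with the initializer workaround, here is a sketch of passing a Manager().Queue() proxy as an ordinary argument to Pool.starmap(). The functions square_into() and run() are hypothetical; the point is only that the proxy pickles cleanly where a plain multiprocessing.Queue() would not.

```python
import multiprocessing

def square_into(q, n):
    # The manager proxy is picklable, so it arrives here as a normal argument.
    q.put(n * n)

def run(values):
    with multiprocessing.Manager() as manager:
        q = manager.Queue()
        with multiprocessing.Pool(2) as pool:
            pool.starmap(square_into, [(q, v) for v in values])
        # starmap() has returned, so every put() has completed.
        results = []
        while not q.empty():
            results.append(q.get())
    return sorted(results)

if __name__ == "__main__":
    print(run([1, 2, 3]))
```

Every put() and get() here is a round trip to the manager's server process, which is the price paid for the simpler sharing.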
I recently came across a problem with Manager().Queue(): the SyncManager object returned by multiprocessing.Manager() seemingly dies, and the queues it manages block forever (even with *_nowait()).
I am not sure of the reason, or whether the SyncManager really dies. The only clue I have is that I call multiprocessing.Manager() from a class instance which has a __del__() method that logs the process it is called from, and I can see this __del__() being called from the SyncManager process.
This means that my object has a copy in the SyncManager process, and that copy is garbage collected. This could mean that only my object was deleted and the SyncManager is fine, but I do see the corresponding queues becoming unresponsive in correlation with the __del__() call in the SyncManager process.
I have no idea how my object ends up in the SyncManager process. I usually pump out 50-200 managers (some with overlapping lifetimes, others not) before I see this problem. For objects that still exist when the interpreter exits, __del__() is not guaranteed to be called, and I usually don't see the SyncManager objects dying in this log from __del__(), only on occasion. Probably, when there is a problem, the SyncManager object first disposes of its objects, and only then does the interpreter exit, which is why I see the __del__() call on occasion.
I did see my queue become unresponsive even in cases, where I did not see the __del__() being called from the SyncManager.
I have also seen the SyncManager "die" without causing further problems.
By "unresponsive" I mean that
queue.get(timeout=1)
queue.put(item, timeout=1)
never return, and neither do
queue.get_nowait()
queue.put_nowait(item)
This became a bit more involved than I originally wanted, but I left the details in, just in case it helps someone.
I used Manager().Queue() for a long time before without any problems. I suspect that either instantiating a lot of manager objects caused the problem, or instantiating a lot of managers caused a problem that has always existed to surface.
I use Python 3.6.5.
There are a few things to mention.
First, it might be a good idea to initialize both Queue and Stack with empty lists by default:
def __init__(self, queue=None):
    self.queue = queue or []  # if queue is given, it overrides the default
Next, it doesn't make sense to write something like if len(self.queue) != 0, since it is equivalent to if self.queue (an empty list is False when it's cast to bool). You can go even further and introduce a property called size using the built-in property decorator:
@property
def size(self):
    return len(self.queue)
Now it can be accessed by self.size without a function call.
Adopting an existing list is generally a bad idea. Code outside of the Queue and Stack classes could still hold a reference to the list and manipulate it behind your back. The simplest remedy is to always construct the object as an empty data structure.
In your Stack, adding and removing items at the front of a list (at index 0) gives the worst performance, since every existing element needs to be shifted over. This work is clearly visible in your .push(), but less obviously, self.stack.pop(0) also does that kind of work. For efficiency, you should append and truncate items at the end of the list instead.
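Putting the advice from these two answers together, a list-backed stack might look like the minimal sketch below: it always starts empty (rather than adopting a caller's list), exposes size as a property, and does all its work at the end of the list. The attribute name _items and the error message are my own choices.

```python
class Stack:
    """LIFO stack backed by a list; all work happens at the end of the list."""

    def __init__(self):
        # Always start empty rather than adopting a list someone else holds.
        self._items = []

    @property
    def size(self):
        return len(self._items)

    def push(self, item):
        self._items.append(item)  # amortized O(1), no shifting of elements

    def pop(self):
        if not self._items:
            raise IndexError("pop from empty stack")
        return self._items.pop()  # O(1): removes the last element
```

Because nothing ever touches index 0, neither push() nor pop() has to shift the remaining elements.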
What are the fundamental differences between queues and pipes in Python's multiprocessing package?
Major Edit of this answer (CY2024): concurrency
As of modern Python versions, the only real use case for Python multiprocessing is when your producers and consumers need to communicate with each other.
If you only need python concurrency, use concurrent.futures.
This example uses concurrent.futures to make four calls to do_slow_thing(), which has a one-second delay. If your machine has at least four cores, this series of calls, which takes four seconds in aggregate, completes in about one second.
By default, concurrent.futures spawns workers corresponding to the number of CPU cores you have.
import concurrent.futures
import time
def do_slow_thing(input_str: str) -> str:
    """Return modified input string after a 1-second delay"""
    if isinstance(input_str, str):
        time.sleep(1)
        return "1-SECOND-DELAY " + input_str
    else:
        return "INPUT ERROR"
if __name__ == "__main__":
    # Define some inputs for process pool
    all_inputs = [
        "do",
        "foo",
        "moo",
        "chew",
    ]
    # Spawn a process pool with the default number of workers...
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        # For each string in all_inputs, call do_slow_thing()
        # in parallel across the process worker pool
        these_futures = [executor.submit(do_slow_thing, ii) for ii in all_inputs]
        # Wait for all processes to finish
        concurrent.futures.wait(these_futures)
    # Get the results from the process pool execution... each
    # future.result() call is the return value from do_slow_thing()
    string_outputs = [future.result() for future in these_futures]
    for tmp in string_outputs:
        print(tmp)
With at least four CPU cores, you'll see this printed after roughly one second:
$ time python stackoverflow.py
1-SECOND-DELAY do
1-SECOND-DELAY foo
1-SECOND-DELAY moo
1-SECOND-DELAY chew
real 0m1.058s
user 0m0.060s
sys 0m0.017s
$
Original Answer
At this point, the only major use-case for multiprocessing is to facilitate your producers and consumers talking to each other during execution. Most people don't need that. However, if you want communication via queue / pipes, you can find my original answer to the OP's question below (which profiles how fast they are).
The existing comments on this answer refer to the original answer below.
One additional feature of Queue() that is worth noting is the feeder thread. This section notes: "When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe." An unlimited number of items (or up to maxsize, if one is set) can be inserted into Queue() without any calls to queue.put() blocking. This allows you to store multiple items in a Queue() until your program is ready to process them.
Pipe(), on the other hand, has a finite amount of storage for items that have been sent to one connection but have not yet been received from the other connection. After this storage is used up, calls to connection.send() will block until there is space to write the entire item. This stalls the thread doing the writing until some other thread reads from the pipe. Connection objects give you access to the underlying file descriptor. On *nix systems, you can prevent connection.send() calls from blocking using the os.set_blocking() function. However, this will cause problems if you try to send a single item that does not fit in the pipe's buffer. Recent versions of Linux allow you to increase the size of a pipe's buffer, but the maximum size allowed varies based on system configuration. You should therefore never rely on Pipe() to buffer data. Calls to connection.send() could block until data gets read from the pipe somewhere else.
In conclusion, Queue is a better choice than Pipe when you need to buffer data, even when you only need to communicate between two points.
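Both behaviours can be observed within a single process. In this sketch, 10,000 put() calls on a multiprocessing.Queue return immediately even with nobody reading, while a single 16 MiB send_bytes() on a Pipe stays stuck in a helper thread until the other end reads. The 16 MiB size and 1-second wait are assumptions chosen to exceed typical default pipe buffers on Linux and macOS; other platforms may behave differently.

```python
import multiprocessing as mp
import threading

def queue_buffers(n=10_000):
    """Put n items on a multiprocessing.Queue with no reader: put() never blocks."""
    q = mp.Queue()
    for i in range(n):
        q.put(i)          # buffered in-process; the feeder thread writes to the pipe
    drained = [q.get() for _ in range(n)]
    q.close()
    q.join_thread()
    return drained == list(range(n))

def pipe_blocks(payload_size=16 * 1024 * 1024):
    """Send one large message through a Pipe while nobody is reading yet."""
    reader, writer = mp.Pipe(duplex=False)
    sender = threading.Thread(target=writer.send_bytes, args=(b"x" * payload_size,))
    sender.start()
    sender.join(timeout=1.0)
    blocked = sender.is_alive()   # still inside send_bytes(): the pipe is full
    data = reader.recv_bytes()    # reading frees space, so the sender can finish
    sender.join()
    reader.close()
    writer.close()
    return blocked, len(data)
```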
For your second example, you already gave the explanation yourself: Queue is a module, which cannot be called.
For the third example: I assume that you use Queue.Queue together with multiprocessing. A Queue.Queue will not be shared between processes. If the Queue.Queue is declared before the processes then each process will receive a copy of it which is then independent of every other process. Items placed in the Queue.Queue by the parent before starting the children will be available to each child. Items placed in the Queue.Queue by the parent after starting the child will only be available to the parent. Queue.Queue is made for data interchange between different threads inside the same process (using the threading module). The multiprocessing queues are for data interchange between different Python processes. While the API looks similar (it's designed to be that way), the underlying mechanisms are fundamentally different.
multiprocessing queues exchange data by pickling (serializing) objects and sending them through pipes. Queue.Queue uses a data structure that is shared between threads, and locks/mutexes for correct behaviour.
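The "each process gets its own copy" point can be demonstrated with a forked child that puts one item on each kind of queue. This sketch assumes a Unix system, because it explicitly uses the "fork" start method so that the queue.Queue is inherited rather than pickled (it cannot be pickled); child() and compare() are hypothetical names.

```python
import multiprocessing as mp
import queue

def child(thread_q, process_q):
    # Runs in a forked child: thread_q is the child's private copy.
    thread_q.put("from child")
    process_q.put("from child")   # pickled and sent through a pipe to the parent

def compare():
    ctx = mp.get_context("fork")  # assumption: Unix-only start method
    thread_q = queue.Queue()
    process_q = ctx.Queue()
    p = ctx.Process(target=child, args=(thread_q, process_q))
    p.start()
    p.join()
    got_process_item = process_q.get(timeout=5)
    parent_sees_thread_item = not thread_q.empty()
    return got_process_item, parent_sees_thread_item
```

The multiprocessing queue delivers the child's item to the parent, while the parent's queue.Queue stays empty because the child only modified its own copy.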
Queue.Queue
Was created to work in concurrent environments spawned with the threading module. All the threads share a reference to the same Queue.Queue object. No copying or serialization of data happens here, and all the threads have access to the same data inside the queue.
multiprocessing.Queue
Was created to work in parallel environments spawned with the multiprocessing module. Each process gets access to a copy of the multiprocessing.Queue object. The contents of the queue are copied across the processes via pickle serialization.
I'm not sure I'd consider semaphores and locks "more powerful methods", as you suggest.
Queues are generally a higher-order abstraction. In other words, you could use semaphores and locks to build thread-safe queues.
Which you'd use where depends on your application. Queues are good for passing "work" between threads and processes, and semaphores/locks are good for protecting critical sections or shared resources, so only one thread can access at a time.
Take a look at the source code for Python's thread-safe queue. The queue class builds a useful abstraction from 3 Conditions and a Lock, correctly.
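To illustrate building a queue out of lower-level primitives, here is a minimal sketch of a bounded, blocking FIFO made from one Lock and two Conditions that share it. It is loosely modeled on queue.Queue, which additionally uses a third Condition (all_tasks_done) for task_done() and join(); the class name BoundedQueue is hypothetical.

```python
import threading
from collections import deque

class BoundedQueue:
    """Blocking FIFO with a fixed capacity, built from a Lock and two Conditions."""

    def __init__(self, maxsize):
        self._items = deque()
        self._maxsize = maxsize
        self._mutex = threading.Lock()
        # Both conditions share one lock, so every state check is atomic.
        self._not_empty = threading.Condition(self._mutex)
        self._not_full = threading.Condition(self._mutex)

    def put(self, item):
        with self._not_full:
            while len(self._items) >= self._maxsize:
                self._not_full.wait()      # releases the lock while sleeping
            self._items.append(item)
            self._not_empty.notify()       # wake one waiting consumer

    def get(self):
        with self._not_empty:
            while not self._items:
                self._not_empty.wait()
            item = self._items.popleft()
            self._not_full.notify()        # wake one waiting producer
            return item
```

The while loops (rather than if) re-check the condition after every wakeup, which guards against spurious wakeups and against another thread grabbing the item first.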
I wouldn't say coordination is the hardest problem. In shared-state multithreading the hardest thing is preventing threads from "sharing". You always have to look out for non-deterministic behaviour due to threads accidentally sharing and stomping on each other's data.
So, I recommend you don't use threads at all. You should use the lower-level tools when you feel you haven't spent enough time tracking down heisenbugs, but if there's any way you can get away with using a simple queue, go for it.