Queue.task_done is not there for the workers' benefit. It is there to support Queue.join.
If I give you a box of work assignments, do I care about when you've taken everything out of the box?
No. I care about when the work is done. Looking at an empty box doesn't tell me that. You and 5 other guys might still be working on stuff you took out of the box.
Queue.task_done lets workers say when a task is done. Someone waiting for all the work to be done with Queue.join will wait until enough task_done calls have been made, not until the queue is empty.
eigenfield points out in the comments that it seems really weird for a queue to have task_done/join methods. That's true, but it's really a naming problem. The queue module has bad name choices that make it sound like a general-purpose queue library, when it's really a thread communication library.
It'd be weird for a general-purpose queue to have task_done/join methods, but it's entirely reasonable for an inter-thread message channel to have a way to indicate that messages have been processed. If the class was called thread_communication.MessageChannel instead of queue.Queue and task_done was called message_processed, the intent would be a lot clearer.
(If you need a general-purpose queue rather than an inter-thread message channel, use collections.deque.)
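A minimal sketch of that distinction (the worker and queue names here are illustrative, not from any particular codebase): the queue below is empty almost immediately after the puts, but join() still blocks until the worker has called task_done() for every item.

```python
import queue
import threading
import time

q = queue.Queue()
done = []  # records items whose processing has actually finished

def worker():
    while True:
        item = q.get()        # the queue may be empty right after this...
        time.sleep(0.05)      # ...but the "work" is still in progress
        done.append(item)
        q.task_done()         # only now does join() count this item as finished

threading.Thread(target=worker, daemon=True).start()

for i in range(5):
    q.put(i)

q.join()                      # blocks until task_done() was called 5 times,
                              # not merely until the queue is empty
print(len(done))              # 5
```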
.task_done() is used to signal to .join() that the processing of an item is finished.

If you use .join() and don't call .task_done() for every processed item, your script will hang forever.

Ain't nothin' like a short example:
import logging
import queue
import threading
import time

items_queue = queue.Queue()
running = False

def items_queue_worker():
    while running:
        try:
            item = items_queue.get(timeout=0.01)
            if item is None:
                continue
            try:
                process_item(item)
            finally:
                # Always mark the item done, even if process_item raised
                items_queue.task_done()
        except queue.Empty:
            pass
        except Exception:
            logging.exception('error while processing item')

def process_item(item):
    print('processing {} started...'.format(item))
    time.sleep(0.5)
    print('processing {} done'.format(item))

if __name__ == '__main__':
    running = True

    # Create 10 items_queue_worker threads
    worker_threads = 10
    for _ in range(worker_threads):
        threading.Thread(target=items_queue_worker).start()

    # Populate your queue with data
    for i in range(100):
        items_queue.put(i)

    # Wait for all items to finish processing
    items_queue.join()
    running = False
Behavior of Queue.task_done() and Queue.join()
task_done() called too many times?
Looking at Queue.task_done() and Queue.join(), it seems like the goal is to block the current thread until all items on the queue have been processed.
But this only works if we first put all items on the queue, yes? Otherwise the following could occur:
- Produce 1
- Consume 1, task_done()
- join() stops blocking
- Produce 2
- ...
Am I missing something here?
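That reading appears correct: if a producer is still producing, join() can return early. A minimal sketch of the race (producer/worker are hypothetical names; the sleeps only make the interleaving deterministic enough to demonstrate):

```python
import queue
import threading
import time

q = queue.Queue()
produced_second = threading.Event()

def producer():
    q.put(1)
    time.sleep(0.5)          # join() in the main thread can return in this gap
    q.put(2)
    produced_second.set()

def worker():
    while True:
        q.get()
        q.task_done()        # item 1 is "done" almost immediately

threading.Thread(target=worker, daemon=True).start()
threading.Thread(target=producer).start()

time.sleep(0.1)              # give the worker time to finish item 1
q.join()                     # returns now: the unfinished-task count is 0
print(produced_second.is_set())   # False -- item 2 hasn't even been produced
```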
I have a Queue where the code looks like this:

def doWork():
    toprocess = []
    while True:
        while len(toprocess) <= 20 and not q.empty():
            toprocess.append(q.get())
        if len(toprocess) > 0:
            for item in toprocess:
                <do stuff>
            dosomethingelse(toprocess)
            toprocess = []
            q.task_done()

The idea is that it pulls 20 items off the queue and then processes them. But I randomly get an error saying I've called task_done() too many times. Why would this happen?
*fixed* Okay, thank you guys! Turns out you do need to call task_done() once for every call to q.get(). The fix was to call .task_done() x times, where x is the length of toprocess.
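A sketch of that fix (q and the batch size come from the question above; the get_nowait() batching and the driver loop are assumptions made to keep the sketch self-contained): task_done() is called exactly once per item actually taken from the queue, never per batch.

```python
import queue

q = queue.Queue()
for i in range(50):
    q.put(i)

def do_work_batch(q, batch_size=20):
    """Drain up to batch_size items, process them, then call
    task_done() once per item that was actually taken."""
    toprocess = []
    while len(toprocess) < batch_size:
        try:
            toprocess.append(q.get_nowait())
        except queue.Empty:
            break
    for item in toprocess:
        pass  # <do stuff> with item
    # one task_done() per successful get() -- never more, never fewer
    for _ in toprocess:
        q.task_done()
    return len(toprocess)

processed = 0
while True:
    n = do_work_batch(q)
    if n == 0:
        break
    processed += n

q.join()          # returns immediately: every get() was matched by task_done()
print(processed)  # 50
```

Calling task_done() more times than there were get() calls is what raises the "task_done() called too many times" ValueError; matching them one-to-one, as above, avoids both that error and a hung join().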