Videos
The for loop is launching a number of worker threads to perform the function defined by "worker". Here is working code that should run on your system in python 2.7.
import Queue
import threading
# input queue to be processed by many threads
q_in = Queue.Queue(maxsize=0)
# output queue to be processed by one thread
q_out = Queue.Queue(maxsize=0)
# number of worker threads to complete the processing
num_worker_threads = 10
# process that each worker thread will execute until the Queue is empty
def worker():
while True:
# get item from queue, do work on it, let queue know processing is done for one item
item = q_in.get()
q_out.put(do_work(item))
q_in.task_done()
# squares a number and returns the number and its square
def do_work(item):
return (item,item*item)
# another queued thread we will use to print output
def printer():
while True:
# get an item processed by worker threads and print the result. Let queue know item has been processed
item = q_out.get()
print "%d squared is : %d" % item
q_out.task_done()
# launch all of our queued processes
def main():
# Launches a number of worker threads to perform operations using the queue of inputs
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
# launches a single "printer" thread to output the result (makes things neater)
t = threading.Thread(target=printer)
t.daemon = True
t.start()
# put items on the input queue (numbers to be squared)
for item in range(10):
q_in.put(item)
# wait for two queues to be emptied (and workers to close)
q_in.join() # block until all tasks are done
q_out.join()
print "Processing Complete"
main()
Python 3 version per @handle
import queue
import threading
# input queue to be processed by many threads
q_in = queue.Queue(maxsize=0)
# output queue to be processed by one thread
q_out = queue.Queue(maxsize=0)
# number of worker threads to complete the processing
num_worker_threads = 10
# process that each worker thread will execute until the Queue is empty
def worker():
while True:
# get item from queue, do work on it, let queue know processing is done for one item
item = q_in.get()
q_out.put(do_work(item))
q_in.task_done()
# squares a number and returns the number and its square
def do_work(item):
return (item,item*item)
# another queued thread we will use to print output
def printer():
while True:
# get an item processed by worker threads and print the result. Let queue know item has been processed
item = q_out.get()
print("{0[0]} squared is : {0[1]}".format(item) )
q_out.task_done()
# launch all of our queued processes
def main():
# Launches a number of worker threads to perform operations using the queue of inputs
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
# launches a single "printer" thread to output the result (makes things neater)
t = threading.Thread(target=printer)
t.daemon = True
t.start()
# put items on the input queue (numbers to be squared)
for item in range(10):
q_in.put(item)
# wait for two queues to be emptied (and workers to close)
q_in.join() # block until all tasks are done
q_out.join()
print( "Processing Complete" )
main()
You can think of the number of worker threads as the number of bank tellers at a bank. So people (your items) stand in line (your queue) to be processed by a bank teller (your worker thread). Queues are actually an easy and well understood mechanism to manage complexities in threads.
I have adjusted your code a bit to show how it works.
import queue
import time
from threading import Thread
def do_work(item):
print("processing", item)
def source():
item = 1
while True:
print("starting", item)
yield item
time.sleep(0.2)
item += 1
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = queue.Queue(maxsize=0)
def main():
for i in range(2):
t = Thread(target=worker)
t.daemon = True
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are done
main()
When you call queue.join() in the main thread, all it does is block the main threads until the workers have processed everything that's in the queue. It does not stop the worker threads, which continue executing their infinite loops.
If the worker threads are non-deamon, their continuing execution prevents the program from stopping irrespective of whether the main thread has finished.
I encountered the situation too, everything in the queue had been processed, but the main thread blocked at the point of Queue.task_done(), here is code block.
import queue
def test04():
q = queue.Queue(10)
for x in range(10):
q.put(x)
while q.not_empty:
print('content--->',q.get())
sleep(1)
re = q.task_done()
print('state--->',re,'\n')
q.join()
print('over \n')
test04()
You do
from queue import *
This imports all the classes from the queue module already. Change that line to
q = Queue(maxsize=0)
CAREFUL: "Wildcard imports (from import *) should be avoided, as they make it unclear which names are present in the namespace, confusing both readers and many automated tools". (Python PEP-8)
As an alternative, one could use:
from queue import Queue
q = Queue(maxsize=0)
That's because you're using : from queue import *
and then you're trying to use :
queue.Queue(maxsize=0)
remove the queue part, because from queue import * imports all the attributes to the current namespace. :
Queue(maxsize=0)
or use import queue instead of from queue import *.