Just call Executor.shutdown:
shutdown(wait=True): Signal the executor that it should free any resources that it is using when the currently pending futures are done executing. Calls to Executor.submit() and Executor.map() made after shutdown will raise RuntimeError. If wait is True then this method will not return until all the pending futures are done executing and the resources associated with the executor have been freed.
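A minimal sketch of that behaviour (the task function here is just an illustration, not from the question):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def task(n):
    time.sleep(0.1)
    return n * 2

executor = ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(task, i) for i in range(4)]

# Blocks until every pending future is done, then frees the workers.
executor.shutdown(wait=True)

# Safe to read results now: all futures have completed.
print([f.result() for f in futures])  # [0, 2, 4, 6]
```

Note that after shutdown() the executor can no longer accept work; if you still need it, see the wait() approach below.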
However, if you keep track of your futures in a list, you can avoid shutting the executor down (keeping it available for future use) by using the futures.wait() function:
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED): Wait for the Future instances (possibly created by different Executor instances) given by fs to complete. Returns a named 2-tuple of sets. The first set, named done, contains the futures that completed (finished or were cancelled) before the wait completed. The second set, named not_done, contains uncompleted futures.
Note that if you don't provide a timeout, it waits until all futures have completed.
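A minimal sketch of that pattern, assuming a toy task function:

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

def task(n):
    time.sleep(0.1)
    return n

with ThreadPoolExecutor(max_workers=2) as executor:
    fs = [executor.submit(task, i) for i in range(3)]
    # Block until every future finishes; the executor stays usable afterwards.
    done, not_done = wait(fs, return_when=ALL_COMPLETED)
    assert not not_done
    # The executor is still alive, so we can keep submitting work.
    extra = executor.submit(task, 99)
    print(extra.result())  # 99
```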
You can also use futures.as_completed() instead; however, you'd have to iterate over it.
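For instance (toy task; the sleep times are chosen so completion order differs from submission order):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(n):
    time.sleep(n / 10)
    return n

with ThreadPoolExecutor(max_workers=3) as executor:
    fs = [executor.submit(task, n) for n in (3, 1, 2)]
    # as_completed yields futures in completion order, not submission order.
    results = [f.result() for f in as_completed(fs)]

print(results)  # fastest tasks come out first
```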
As stated before, one can use Executor.shutdown(wait=True), but also pay attention to the following note in the documentation:
You can avoid having to call this method explicitly if you use the with statement, which will shut down the Executor (waiting as if Executor.shutdown() were called with wait set to True):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
The call to ThreadPoolExecutor.map does not block until all of its tasks are complete. Use wait to do this.
from concurrent.futures import wait, ALL_COMPLETED
...
futures = [pool.submit(fn, args) for args in arg_list]
wait(futures, timeout=whatever, return_when=ALL_COMPLETED) # ALL_COMPLETED is actually the default
do_other_stuff()
You could also call list(results) on the generator returned by pool.map to force the evaluation (which is what you're doing in your original example). If you're not actually using the values returned from the tasks, though, wait is the way to go.
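A runnable sketch of the list(...) variant (square is a made-up task, not from the question):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def square(x):
    time.sleep(0.1)
    return x * x

with ThreadPoolExecutor(max_workers=2) as pool:
    gen = pool.map(square, [1, 2, 3])  # returns immediately (lazy iterator)
    results = list(gen)                # blocks here until every task is done

print(results)  # [1, 4, 9]
```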
It's true that Executor.map() will not wait for all futures to finish, because it returns a lazy iterator, as @MisterMiyagi said.
But we can accomplish this by using with:
import time
from concurrent.futures import ThreadPoolExecutor

def hello(i):
    time.sleep(i)
    print(i)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(hello, [1, 2, 3])

print("finish")

# output
# 1
# 2
# 3
# finish
As you can see, finish is printed after 1, 2, 3. It works because Executor has an __exit__() method; its code is:
def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=True)
    return False
The shutdown() method of ThreadPoolExecutor is:
def shutdown(self, wait=True, *, cancel_futures=False):
    with self._shutdown_lock:
        self._shutdown = True
        if cancel_futures:
            # Drain all work items from the queue, and then cancel their
            # associated futures.
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.cancel()

        # Send a wake-up to prevent threads calling
        # _work_queue.get(block=True) from permanently blocking.
        self._work_queue.put(None)
    if wait:
        for t in self._threads:
            t.join()
shutdown.__doc__ = _base.Executor.shutdown.__doc__
So by using with, we can get the ability to wait until all futures finish.
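As a side note, the cancel_futures parameter shown above (added in Python 3.8) lets shutdown() drop queued work while still waiting for whatever is already running; a small sketch, assuming a single-worker pool so most submissions sit in the queue:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow(n):
    time.sleep(0.2)
    return n

executor = ThreadPoolExecutor(max_workers=1)
futures = [executor.submit(slow, i) for i in range(5)]

time.sleep(0.05)  # give the single worker time to pick up task 0

# Wait for the running task, but cancel everything still sitting in the queue.
executor.shutdown(wait=True, cancel_futures=True)

print([f.cancelled() for f in futures])  # [False, True, True, True, True]
```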
One approach might be to keep track of your currently running threads via a set of Futures:
active_threads = set()

def pop_future(future):
    # set.pop() takes no argument; use discard() to drop this specific future
    active_threads.discard(future)

with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    while True:
        while len(active_threads) >= CONCURRENCY:
            time.sleep(0.1)  # or whatever
        message = pull_from_queue()
        future = executor.submit(do_work_for_message, message)
        active_threads.add(future)
        future.add_done_callback(pop_future)
A more sophisticated approach might be to have the done_callback be the thing that triggers a queue pull, rather than polling and blocking, but then you need to fall back to polling the queue if the workers manage to get ahead of it.
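One way to sketch that callback-driven throttling (work is a placeholder task; the real pull_from_queue/do_work_for_message are not shown) is to release a semaphore from the done-callback instead of polling:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

CONCURRENCY = 2
slots = threading.BoundedSemaphore(CONCURRENCY)

def submit_throttled(executor, fn, *args):
    slots.acquire()  # blocks until a worker slot frees up
    future = executor.submit(fn, *args)
    future.add_done_callback(lambda f: slots.release())
    return future

def work(n):
    time.sleep(0.05)
    return n

with ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    futures = [submit_throttled(executor, work, i) for i in range(6)]

print([f.result() for f in futures])  # [0, 1, 2, 3, 4, 5]
```

The semaphore caps in-flight work at CONCURRENCY without the 0.1-second polling loop.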
Based on @Samwise's answer (https://stackoverflow.com/a/73396000/8388869), I have extended ThreadPoolExecutor:
import time
from concurrent.futures import Future, ThreadPoolExecutor


class AvailableThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor that keeps track of the number of available workers.

    Refs:
        inspired by https://stackoverflow.com/a/73396000/8388869
    """

    def __init__(
        self, max_workers=None, thread_name_prefix="", initializer=None, initargs=()
    ):
        super().__init__(max_workers, thread_name_prefix, initializer, initargs)
        self._running_worker_futures: set[Future] = set()

    @property
    def available_workers(self) -> int:
        """the number of available workers"""
        return self._max_workers - len(self._running_worker_futures)

    def wait_for_available_worker(self, timeout: float | None = None) -> None:
        """wait until there is an available worker

        Args:
            timeout: the maximum time to wait in seconds. If None, wait indefinitely.

        Raises:
            TimeoutError: if the timeout is reached.
        """
        start_time = time.monotonic()
        while True:
            if self.available_workers > 0:
                return
            if timeout is not None and time.monotonic() - start_time > timeout:
                raise TimeoutError
            time.sleep(0.1)

    def submit(self, fn, /, *args, **kwargs):
        f = super().submit(fn, *args, **kwargs)
        self._running_worker_futures.add(f)
        f.add_done_callback(self._running_worker_futures.remove)
        return f
It should work like this:
with AvailableThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    while True:
        executor.wait_for_available_worker()
        message = pull_from_queue()
        executor.submit(do_work_for_message, message)
If you are done with threads and want to look into processes, this piece of code looks very promising and simple, with almost the same syntax as threads, but using the multiprocessing module.
When the timeout expires, the process is terminated, which is very convenient.
import multiprocessing

def get_page(*args, **kwargs):
    # your web page downloading code goes here
    ...

def start_get_page(timeout, *args, **kwargs):
    p = multiprocessing.Process(target=get_page, args=args, kwargs=kwargs)
    p.start()
    p.join(timeout)
    if p.is_alive():
        # stop the downloading 'thread'
        p.terminate()
        # and then do any post-error processing here

if __name__ == "__main__":
    start_get_page(timeout, *args, **kwargs)
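A runnable variant of the same idea (busy and run_with_timeout are illustrative names, not from the answer):

```python
import multiprocessing
import time

def busy(seconds):
    # stand-in for the real download work
    time.sleep(seconds)

def run_with_timeout(timeout, target, *args):
    """Run target in a child process; kill it if it exceeds timeout.

    Returns True if the process had to be terminated, False if it finished.
    """
    p = multiprocessing.Process(target=target, args=args)
    p.start()
    p.join(timeout)
    timed_out = p.is_alive()
    if timed_out:
        p.terminate()
        p.join()  # reap the terminated child
    return timed_out

if __name__ == "__main__":
    print(run_with_timeout(0.5, busy, 5))    # True  (killed after 0.5 s)
    print(run_with_timeout(1.0, busy, 0.05)) # False (finished in time)
```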
In my code I used multiprocessing:

import multiprocessing as mp

pool = mp.Pool()
for i in range(threadNumber):
    pool.apply_async(publishMessage, args=(map_metrics, connection_parameters...,))
pool.close()
pool.join()  # wait for all workers to finish; terminate() would kill them immediately