How To Add A Pool Of Processes Available For A Multiprocessing Queue
Solution 1:
As a rule, you either use a Pool, or Process(es) plus Queue(s). Mixing both is a misuse; the Pool already uses Queues (or a similar mechanism) behind the scenes.
If you want to do this with a Pool, change your code to the following (moving the code into a main function for performance, and for better resource cleanup than running in the global scope):
import multiprocessing

def main():
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...]  # define your MyFancyClass instances here
    with multiprocessing.Pool(processes=4) as p:
        # Submit all the work
        futures = [p.apply_async(fancy.do_something) for fancy in myfancyclasses]
        # Done submitting, let workers exit as they run out of work
        p.close()
        # Wait until all the work is finished
        for f in futures:
            f.wait()

if __name__ == '__main__':
    main()
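One caveat on that wait loop, worth knowing: AsyncResult.wait() only blocks until the call completes, so if do_something raises in a worker, the failure passes silently. Swapping in .get(), which re-raises the worker's exception in the parent, surfaces errors instead:

# Same loop, but worker exceptions propagate to the parent
for f in futures:
    f.get()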
This could be simplified further, at the expense of purity, with the .*map* methods of Pool. For example, to minimize memory usage, redefine main as:
def main():
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...]  # define your MyFancyClass instances here
    with multiprocessing.Pool(processes=4) as p:
        # No return value, so we ignore it, but we need to run out the result
        # or the work won't be done
        for _ in p.imap_unordered(MyFancyClass.do_something, myfancyclasses):
            pass
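One tuning note on this version: when each do_something call is cheap, the per-item IPC round-trips can dominate the runtime. imap_unordered takes an optional chunksize argument that batches items per trip to a worker; the 16 below is an illustrative value to tune, not something from the original code:

for _ in p.imap_unordered(MyFancyClass.do_something, myfancyclasses, chunksize=16):
    pass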
Yes, technically either approach has slightly higher overhead, in that the return value you're not using must be serialized to send it back to the parent process. But in practice, this cost is pretty low (since your function has no return, it's returning None, which serializes to almost nothing). An advantage to this approach is that you generally don't want to print to the screen from the child processes (their output will end up interleaved), so you can replace the prints with returns and let the parent do the printing, e.g.:
import multiprocessing

class MyFancyClass:
    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        # Changed from print to return
        return 'Doing something fancy in {} for {}!'.format(proc_name, self.name)

def main():
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...]  # define your MyFancyClass instances here
    with multiprocessing.Pool(processes=4) as p:
        # Using the return value now to avoid interleaved output
        for res in p.imap_unordered(MyFancyClass.do_something, myfancyclasses):
            print(res)

if __name__ == '__main__':
    main()
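Incidentally, the if __name__ == '__main__' guard isn't decorative here: under the spawn start method (the default on Windows, and on macOS since Python 3.8), each worker re-imports the main module, and without the guard the pool would try to spawn itself recursively.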
Note how all of these solutions remove the need to write your own worker function or manually manage Queues, because Pools do that grunt work for you.
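For contrast, here is a minimal sketch of the grunt work being avoided: the classic hand-rolled pattern with Process workers draining a JoinableQueue, shut down via sentinel values (the structure is the standard stdlib pattern; the specifics are illustrative, not from the original code):

import multiprocessing

def worker(tasks):
    # Loop until we pull a None sentinel off the queue
    while True:
        fancy = tasks.get()
        if fancy is None:
            tasks.task_done()
            break
        fancy.do_something()
        tasks.task_done()

def main():
    tasks = multiprocessing.JoinableQueue()
    workers = [multiprocessing.Process(target=worker, args=(tasks,))
               for _ in range(4)]
    for w in workers:
        w.start()
    for fancy in [MyFancyClass('Fancy Dan')]:
        tasks.put(fancy)
    for _ in workers:
        tasks.put(None)  # One sentinel per worker so each loop exits
    tasks.join()  # Block until every put() has been task_done()-ed

if __name__ == '__main__':
    main()

All of that bookkeeping (the worker loop, the sentinels, the join) is exactly what Pool wraps up for you.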
Alternate approach using concurrent.futures, to efficiently process results as they become available, while allowing you to choose to submit new work (either based on the results, or based on external information) as you go:
import concurrent.futures
from concurrent.futures import FIRST_COMPLETED

def main():
    allow_new_work = True  # Set to False to indicate we'll no longer allow new work
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...]  # define your initial MyFancyClass instances here
    with concurrent.futures.ProcessPoolExecutor() as executor:
        remaining_futures = {executor.submit(fancy.do_something)
                             for fancy in myfancyclasses}
        while remaining_futures:
            done, remaining_futures = concurrent.futures.wait(remaining_futures,
                                                              return_when=FIRST_COMPLETED)
            for fut in done:
                result = fut.result()
                # Do stuff with result, maybe submit new work in response
                if allow_new_work:
                    if should_stop_checking_for_new_work():
                        # Let the workers exit when all remaining tasks are done,
                        # and reject submitting more work from now on
                        allow_new_work = False
                        executor.shutdown(wait=False)
                    elif has_more_work():
                        # Assumed to return a collection of new MyFancyClass instances
                        new_fanciness = get_more_fanciness()
                        remaining_futures |= {executor.submit(fancy.do_something)
                                              for fancy in new_fanciness}
                        myfancyclasses.extend(new_fanciness)

if __name__ == '__main__':
    main()
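A few notes on that sketch: should_stop_checking_for_new_work, has_more_work and get_more_fanciness are placeholders for logic you'd supply yourself. Once executor.shutdown(wait=False) has run, any further submit() raises RuntimeError, which is exactly why allow_new_work gates the submission branch. And ProcessPoolExecutor() with no arguments sizes the pool to the machine's CPU count.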