
How To Add A Pool Of Processes Available For A Multiprocessing Queue

I am following up on a preceding question here: how to add more items to a multiprocessing queue while script in motion. The code I am working with now: import multiprocessing class My…

Solution 1:

As a rule, you use either Pool, or Process(es) plus Queues. Mixing both is a misuse; the Pool already uses Queues (or a similar mechanism) behind the scenes.
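For contrast, here is roughly the manual Process-plus-Queue pattern that Pool replaces. This is a minimal sketch, assuming the same MyFancyClass used throughout this answer; the hand-written worker function and sentinel handling are exactly the grunt work Pool would otherwise do for you:

import multiprocessing

def worker(task_queue):
    # Pull tasks until the None sentinel arrives, then exit
    for fancy in iter(task_queue.get, None):
        fancy.do_something()

def manual_main():
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...]  # define your MyFancyClass instances here
    task_queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(task_queue,))
             for _ in range(4)]
    for p in procs:
        p.start()
    for fancy in myfancyclasses:
        task_queue.put(fancy)
    for _ in procs:
        task_queue.put(None)  # one sentinel per worker so each can exit
    for p in procs:
        p.join()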

If you want to do this with a Pool, change your code to the following (moving the code into a main function gives better performance and cleaner resource cleanup than running it at global scope):

def main():
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...]  # define your MyFancyClass instances here
    with multiprocessing.Pool(processes=4) as p:
        # Submit all the work
        futures = [p.apply_async(fancy.do_something) for fancy in myfancyclasses]

        # Done submitting, let workers exit as they run out of work
        p.close()

        # Wait until all the work is finished
        for f in futures:
            f.wait()

if __name__ == '__main__':
    main()
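One caveat with the waiting loop above: AsyncResult.wait() only blocks until the task finishes; it won't surface an exception raised inside do_something. If you'd rather have worker failures propagate to the parent, f.get() is a drop-in replacement that blocks the same way and then re-raises:

        # get() blocks like wait(), but re-raises any exception from the worker
        for f in futures:
            f.get()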

This could be simplified further, at the expense of purity, with the map family of Pool methods (map, imap, imap_unordered). E.g., to minimize memory usage, redefine main as:

def main():
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...]  # define your MyFancyClass instances here
    with multiprocessing.Pool(processes=4) as p:
        # No return value, so we ignore it, but we need to run out the iterator
        # or the work won't be done
        for _ in p.imap_unordered(MyFancyClass.do_something, myfancyclasses):
            pass

Yes, technically either approach has slightly higher overhead, since it needs to serialize the return value you're not using to give it back to the parent process. But in practice this cost is quite low: since your function has no return, it returns None, which serializes to almost nothing. This approach also has an advantage: you generally don't want to print to the screen from the child processes (their output will end up interleaved), so you can replace the printing with returns and let the parent do that work, e.g.:

import multiprocessing

class MyFancyClass:
    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        # Changed from print to return
        return 'Doing something fancy in {} for {}!'.format(proc_name, self.name)

def main():
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...]  # define your MyFancyClass instances here
    with multiprocessing.Pool(processes=4) as p:
        # Using the return value now to avoid interleaved output
        for res in p.imap_unordered(MyFancyClass.do_something, myfancyclasses):
            print(res)

if __name__ == '__main__':
    main()
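A usage note on the imap_unordered calls in these examples: when you have many small tasks, the chunksize parameter (a real parameter of imap_unordered and its siblings) batches tasks to cut per-task IPC overhead. The right value is workload-dependent, so treat the 16 below as a placeholder:

        # Hypothetical tuning: ship tasks to workers 16 at a time
        for res in p.imap_unordered(MyFancyClass.do_something, myfancyclasses, chunksize=16):
            print(res)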

Note how all of these solutions remove the need to write your own worker function or manually manage Queues; the Pool does that grunt work for you.


An alternate approach uses concurrent.futures to efficiently process results as they become available, while allowing you to submit new work as you go (either based on those results, or based on external information):

import concurrent.futures
from concurrent.futures import FIRST_COMPLETED

def main():
    allow_new_work = True  # Set to False to indicate we'll no longer allow new work
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...]  # define your initial MyFancyClass instances here
    with concurrent.futures.ProcessPoolExecutor() as executor:
        remaining_futures = {executor.submit(fancy.do_something)
                             for fancy in myfancyclasses}
        while remaining_futures:
            done, remaining_futures = concurrent.futures.wait(remaining_futures,
                                                              return_when=FIRST_COMPLETED)
            for fut in done:
                result = fut.result()
                # Do stuff with result, maybe submit new work in response
                if allow_new_work:
                    if should_stop_checking_for_new_work():
                        # Let the workers exit when all remaining tasks are done,
                        # and reject submitting more work from now on
                        allow_new_work = False
                        executor.shutdown(wait=False)
                    elif has_more_work():
                        # Assumed to return a collection of new MyFancyClass instances
                        new_fanciness = get_more_fanciness()
                        remaining_futures |= {executor.submit(fancy.do_something)
                                              for fancy in new_fanciness}
                        myfancyclasses.extend(new_fanciness)
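The three helpers above, should_stop_checking_for_new_work, has_more_work, and get_more_fanciness, are placeholders this answer assumes you'll supply yourself. Purely illustrative stubs, just to make the skeleton self-contained:

def should_stop_checking_for_new_work():
    # Hypothetical stub: real code might check a deadline, a flag file, or user input
    return False

def has_more_work():
    # Hypothetical stub: real code might poll a queue, a socket, or a directory
    return False

def get_more_fanciness():
    # Hypothetical stub: assumed to return an iterable of new MyFancyClass instances
    return [MyFancyClass('Fancier Dan')]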
