Wednesday, August 19, 2020

Ned Batchelder: Do a pile of work

I had a large pile of data to feed through an expensive function. The concurrent.futures module in the Python standard library has always worked well for me as a simple way to farm out work across threads or processes.

For example, if my work function is “workfn”, and its argument tuples are produced by “argsfn()”, this is how you could run the whole pile serially:

for args in argsfn():
    workfn(*args)

This is how you would run them on a number of threads:

import concurrent.futures as cf

with cf.ThreadPoolExecutor(max_workers=nthreads) as executor:
    for args in argsfn():
        executor.submit(workfn, *args)

But this submits all of the tasks up-front: the loop exhausts argsfn() immediately, and the executor queues a pending future for every task. With millions of work invocations, that queue could be a problem. I wanted a way to feed the tasks in as they are processed, to keep the queue small. And I wanted a progress bar.

I started from this Stack Overflow answer, added in tqdm for a progress bar, and made this:

import concurrent.futures as cf
from tqdm import tqdm

def wait_first(futures):
    """
    Wait for the first future to complete.

    Returns:
        (done, not_done): two sets of futures.

    """
    return cf.wait(futures, return_when=cf.FIRST_COMPLETED)
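
# A quick illustration of the (done, not_done) split that wait_first
# returns (a hypothetical example, not part of the original post):
#
#   import time
#   with cf.ThreadPoolExecutor(max_workers=2) as executor:
#       futures = {executor.submit(time.sleep, t) for t in (0.1, 2, 2)}
#       done, not_done = wait_first(futures)
#       # done holds at least the quick 0.1-second future;
#       # not_done holds the futures still sleeping.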

def do_work(nthreads, argsfn, workfn):
    """
    Do a pile of work, maybe in threads, with a progress bar.

    Two callables are provided: `workfn` is the unit of work to be done,
    many times.  Its arguments are provided by calling `argsfn`, which
    must produce a sequence of tuples.  `argsfn` will be called a few
    times, and must produce the same sequence each time.

    Args:
        nthreads: the number of threads to use.
        argsfn: a callable that produces tuples, the arguments to `workfn`.
        workfn: a callable that does work.

    """
    total = sum(1 for _ in argsfn())    # first pass over the args, just to count the tasks.
    with tqdm(total=total, smoothing=0.1) as progressbar:
        if nthreads:
            limit = 2 * nthreads        # cap on pending futures, to keep the queue small.
            not_done = set()
            with cf.ThreadPoolExecutor(max_workers=nthreads) as executor:
                for args in argsfn():
                    if len(not_done) >= limit:
                        # The queue is full: wait for a task to finish before adding more.
                        done, not_done = wait_first(not_done)
                        progressbar.update(len(done))
                    not_done.add(executor.submit(workfn, *args))
                # Everything is submitted; drain the remaining futures.
                while not_done:
                    done, not_done = wait_first(not_done)
                    progressbar.update(len(done))
        else:
            for args in argsfn():
                workfn(*args)
                progressbar.update(1)
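
For illustration, a toy invocation might look like this. “my_work” and “my_args” are made-up stand-ins; note that argsfn must be a callable you can invoke repeatedly, so a function that returns a fresh generator each time works:

import time

def my_work(x, y):
    time.sleep(0.01)    # stand-in for the expensive function.

def my_args():
    return ((i, i * 2) for i in range(1000))    # a fresh generator per call.

do_work(nthreads=8, argsfn=my_args, workfn=my_work)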

There might be a better way. I don’t like the duplication of the wait_first call, but this works, and produces the right results.
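
One way to remove that duplication might be a small helper that drains the pending set down to a given size (a sketch, not from the original post):

def drain(not_done, progressbar, keep=0):
    """Wait until at most `keep` futures are still pending, updating the bar."""
    while len(not_done) > keep:
        done, not_done = wait_first(not_done)
        progressbar.update(len(done))
    return not_done

The threaded branch would then call “not_done = drain(not_done, progressbar, keep=limit - 1)” before each submit, and “drain(not_done, progressbar)” once after the loop.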

BTW: my actual work function spawned subprocesses, which is why a thread pool gave me parallelism: the threads spent their time waiting on those subprocesses, not holding the GIL. A pure-Python work function wouldn’t get a speed-up this way, but a ProcessPoolExecutor could help.
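
The swap would be mechanical; a minimal sketch (“nprocs” is a made-up name here, and for a process pool “workfn” and its argument tuples must be picklable):

with cf.ProcessPoolExecutor(max_workers=nprocs) as executor:
    for args in argsfn():
        executor.submit(workfn, *args)

The bounded-queue trick from do_work carries over unchanged, since cf.wait treats futures from both executor types the same way.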


