Skip to content

ny2ko/vimap

 
 

Repository files navigation

Build Status

Variations on imap, not in C

The vimap package is designed to provide a more flexible alternative for multiprocessing.imap_unordered. (You should read multiprocessing documentation if you haven't already, or else this README won't make sense!) It aspires to support HTTP-like clients processing data, though contains nothing client-specific.

What in particular makes it more flexible?

  • It facilitates custom initialization of processes (one could connect to a HTTP client, etc.), and cleanup.
  • It facilitates configuration passing to processes -- often, with multiprocessing, one ends up [wastefully] tupling configuration values with every input.
  • It facilitates passing non-serializable objects (before processes are forked)
  • Allows timeouts and retrying of failed tasks or processes

What do we aspire to do better than the regular multiprocessing library?

  • You don't have to hack to achieve custom process initialization [ c.f. http://stackoverflow.com/a/10118250/81636 ]
  • The API helps prevent dumb mistakes, like initializing a pool before you've defined relevant functions. [ http://stackoverflow.com/q/2782961/81636 ]
  • Aims to have better worker exception handling -- multiprocessing will leave around dead worker processes; we aim not to.
  • Collection of common use cases (reading from files, etc.)

Other features / design decisions:

  • Attempts to keep all workers busy by pre-emptively enqueuing tasks
  • Also, it probably doesn't affect you, but this library can also work around a bug in Python 2.6.7 [ http://stackoverflow.com/q/16684900/81636 ].

Defining worker functions

vimap provides its custom initialization and such via decorated functions. If your inputs are HTTP requests, and you want to get responses from any of a set of servers, you could express your program as such (using the requests HTTP library -- it's intuitive so you probably don't need to read its documentation),

import vimap.worker_process
@vimap.worker_process.worker
def send_reqests_worker(requests, server):
    s = requests.Session()
    for request in requests:
        yield s.post('http://{0}{1}'.format(server, request.uri), data=request.data)
    s.close()

What is happening? When the worker processes start up, a new session is opened. Each request (some pickleable object containing a .uri and a .data), sent by the parent process, is posted to the server. Then, the worker yields a single response, and this response is sent back to the parent process.

imapping data from the parent process

Let's continue the example,

import vimap.pool
pool = vimap.pool.fork(send_requests_worker.init_args(server=server) for server in my_servers)

This initializes a pool of workers. Each one gets a bound argument server. When the worker processes start up, they start running until they try to pull an element off of the requests iterator; then they must pause for the parent process to send data. The parent process can send data like so,

Request = namedtuple("Request", ["uri", "data"])
pool.imap(Request(**ujson.loads(line)) for line in fileinput.input()).block_ignore_output()

This reads lines from a file containing JSON input, and sends the loaded entries to the workers. In the real world, you'd probably want to make the workers do the JSON loading. The .block_ignore_output() will cause the entire iterable (input file) to be read, and [by default] close the pool after it's done.

variations on imap

The input binder

The first Variation on Imap tuples inputs with outputs. So, you have some [lazy] iterable of inputs,

x1, x2, x3, x4, x5, ...

and when you vimap this with some function f, you get back tuples,

(x1, f(x1)), (x2, f(x2)), ...

possibly not in order. Since it's streaming, one shouldn't need to keep around inputs for long -- the input binder will keep around O(# processes) inputs, hence it's safe to iterate through large inputs. In code, this could look like,

for input, output in pool.imap(iterable).zip_in_out():
    results[input] = output
# do some more processing

Handling exceptions

If you want to gracefully handle exceptions that bubble up to the main function of your worker processes, you can request that vimap yield back to you any exceptions it receives from the workers.

for input, output, typ in pool.imap(iterable).zip_in_out_typ():
    if typ == 'exception':
        print('Worker had an exception:')
        print(output.formatted_traceback)
    elif typ == 'output':
        print('I got some actual output from a worker!')
        print(output)

output will be an ExceptionContext namedtuple if the return type is exception; those contain a value of the exception raised and formatted_traceback string of the traceback that would have been printed to stderr.

About

Variations on imap

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Python 99.9%
  • Emacs Lisp 0.1%