Parallel Execution

LensKit uses concurrent.futures.ProcessPoolExecutor to parallelize batch operations (see lenskit.batch).

The basic idea of this API is to create an invoker that holds a model and a function, and then to pass lists of argument sets to the function through the invoker:

with invoker(model, func) as inv:
    results = list(inv.map(args))

The model is persisted into shared memory to be used by the worker processes.
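To make the calling pattern concrete, here is a minimal in-process stand-in for an invoker. SerialInvoker and score are hypothetical names used only for illustration and are not part of LensKit. The stand-in does no multiprocessing and no shared-memory persistence; it only mirrors the context-manager and map contract described above.

```python
class SerialInvoker:
    """Hypothetical in-process stand-in; not LensKit's implementation."""

    def __init__(self, model, func):
        self.model = model
        self.func = func

    def map(self, *iterables):
        # Call func once per aligned argument set, with the model first.
        return (self.func(self.model, *args) for args in zip(*iterables))

    def shutdown(self):
        """A real invoker would release workers and shared memory here."""

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.shutdown()


def score(model, user):
    # Hypothetical operation: look up a per-user value in a dict "model".
    return model.get(user, 0.0)


model = {"alice": 4.5, "bob": 3.0}
with SerialInvoker(model, score) as inv:
    # Consume the results inside the block, before shutdown() runs.
    results = list(inv.map(["alice", "bob", "carol"]))
```

Consuming the results inside the with block matters for a real invoker as well, since the workers are shut down when the block exits.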

Parallel Model Ops

lenskit.util.parallel.invoker(model, func, n_jobs=None, *, persist_method=None)

Get an appropriate invoker for performing operations on model.

Parameters:
  • model (obj) – The model object on which to perform operations.

  • func (function) – The function to call. The function must be pickleable.

  • n_jobs (int or None) – The number of processes to use for parallel operations. If None, proc_count() is called with a maximum default process count of 4.

  • persist_method (str or None) – The persistence method to use. Passed as method to lenskit.sharing.persist().

Returns:

An invoker to perform operations on the model.

Return type:

ModelOpInvoker

lenskit.util.parallel.proc_count(core_div=2, max_default=None, level=0)

Get the number of desired jobs for multiprocessing operations. This does not affect Numba or MKL multithreading.

This count can come from a number of sources:

  • The LK_NUM_PROCS environment variable

  • The number of CPUs, divided by core_div (default 2)

Parameters:
  • core_div (int or None) – The divisor to scale down the number of cores; None to turn off core-based fallback.

  • max_default – The maximum number of processes to use if the environment variable is not configured.

  • level – The process nesting level. 0 is the outermost level of parallelism; subsequent levels control nesting. Levels deeper than 1 are rare, and callers are not expected to know the exact nesting depth, only that they are configuring a child. If the process count is unconfigured, then level 1 will use core_div, and deeper levels will use 1.

Returns:

The number of jobs desired.

Return type:

int
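The resolution order described above can be sketched as follows. This is an illustration under stated assumptions, not LensKit's code: desired_jobs is a hypothetical helper, it covers only the outermost level (level 0), it assumes LK_NUM_PROCS holds a single integer, and it assumes a result of 1 when core_div is None and the variable is unset. The environ and n_cpus parameters exist only to make the sketch deterministic.

```python
import os


def desired_jobs(core_div=2, max_default=None, environ=None, n_cpus=None):
    """Hypothetical helper; covers only the outermost level (level=0)."""
    environ = os.environ if environ is None else environ
    n_cpus = (os.cpu_count() or 1) if n_cpus is None else n_cpus

    configured = environ.get("LK_NUM_PROCS")
    if configured is not None:
        # An explicit setting wins; this sketch assumes a single integer.
        return int(configured)

    if core_div is None:
        # Core-based fallback is turned off; assume a single process.
        return 1

    # Scale the CPU count down, but never below one process.
    jobs = max(n_cpus // core_div, 1)
    if max_default is not None:
        # The cap applies only when the environment variable is unset.
        jobs = min(jobs, max_default)
    return jobs
```

Under these assumptions, a 16-CPU machine with no environment variable would resolve to 8 jobs by default, or to 4 when max_default=4 is passed.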

class lenskit.util.parallel.ModelOpInvoker

Bases: ABC

Interface for invoking operations on a model, possibly in parallel. The operation invoker is configured with a model and a function to apply, and applies that function to the arguments supplied in map. Child process invokers also route logging messages to the parent process, so logging works even with multiprocessing.

An invoker is a context manager that calls shutdown() when exited.

abstract map(*iterables)

Apply the configured function to the model and iterables. This is like map(), except it supplies the invoker’s model as the first object to func.

Parameters:

iterables – Iterables of arguments to provide to the function.

Returns:

An iterable of the results.

Return type:

iterable
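When several iterables are given, they are consumed in lockstep, as with the built-in map(). The snippet below shows the equivalent in-process call shape using only the standard library; predict and the dictionary model are hypothetical, and no LensKit invoker is involved.

```python
from functools import partial


def predict(model, user, item):
    # Hypothetical operation: the model is a nested dict of known ratings.
    return model.get(user, {}).get(item)


model = {"alice": {"i1": 4.0}, "bob": {"i2": 2.5}}
users = ["alice", "bob", "alice"]
items = ["i1", "i2", "i2"]

# Each call receives the model first, then one element from each iterable.
results = list(map(partial(predict, model), users, items))
```

Here results holds one entry per aligned (user, item) pair, with None where the hypothetical model has no rating.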

Single Process Isolation

We also have a single-process isolation function that runs a function in a subprocess.

lenskit.util.parallel.run_sp(func, *args, **kwargs)

Run a function in a subprocess and return its value. This is for achieving subprocess isolation, not parallelism. The subprocess is configured so things like logging work correctly, and is initialized with a derived random seed.