lenskit.parallel.ray

Support for parallelism with Ray.

Functions

ensure_cluster()

inference_worker_cpus()

init_cluster(*[, num_cpus, proc_slots, ...])

Initialize or connect to a Ray cluster, with the LensKit options.

init_worker(*[, autostart])

Initialize a Ray worker process.

is_ray_worker()

Determine whether the current process is running on a Ray worker.

ray_active()

Query whether Ray is active.

ray_available()

Check if Ray is available.

ray_supported()

Check if this Ray setup is supported by LensKit.

training_worker_cpus()

Classes

RayOpInvoker(model, func, *[, limit])

TaskLimiter([limit])

Limit task concurrency using ray.wait().

lenskit.parallel.ray.init_cluster(*, num_cpus=None, proc_slots=None, resources=None, worker_parallel=None, global_logging=False, **kwargs)

Initialize or connect to a Ray cluster, with the LensKit options.

The resulting cluster can be used by an invoker, or it can be used directly. Note that the Ray invoker batches tasks, so it is best suited to workloads made up of many small tasks.
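The documentation says only that the Ray invoker uses batching. The reason batching favors many small tasks is that grouping several tasks into one remote call presumably spreads the per-call scheduling overhead across the whole group. The sketch below shows only a generic grouping step in plain Python. It is not LensKit's batching code, and the helper name batches and the batch size of 4 are hypothetical.

```python
from __future__ import annotations

from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batches(tasks: Iterable[T], size: int) -> Iterator[list[T]]:
    """Group tasks into lists of at most `size` items (hypothetical helper)."""
    if size < 1:
        raise ValueError("batch size must be positive")
    it = iter(tasks)
    # islice pulls the next `size` items; an empty list means the input is exhausted
    while batch := list(islice(it, size)):
        yield batch


# 10 small tasks with a (hypothetical) batch size of 4 become 3 groups,
# so 3 remote calls would be needed instead of 10.
chunks = list(batches(range(10), 4))
print(chunks)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

With only a few large tasks, grouping gains little and can leave workers idle. This is consistent with the advice above to use the invoker for many small tasks.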

Parameters:
  • num_cpus (int | None) – The total number of CPUs to allow. Defaults to effective_cpu_count().

  • proc_slots (int | None) – The number of “process slots” for LensKit parallel operations. Defaults to the LensKit process count. These slots are recorded as the lk_process resource on the Ray cluster.

  • resources (dict[str, float] | None) – Additional custom resources to register in the Ray cluster.

  • worker_parallel (ParallelConfig | None) – Parallel processing configuration for worker processes. If None, uses the default.

  • global_logging (bool) – True to wire up logging in the workers at startup, instead of only connecting logs when a task is run.

  • kwargs – Other options to pass to ray.init().

Stability:

Experimental
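The following sketch shows how the options above plausibly map onto ray.init() arguments when starting a local cluster: process slots become the lk_process custom resource, which is merged with any extra resources, and remaining keyword arguments are passed through. sketch_init_args is a hypothetical helper, not LensKit's implementation. os.cpu_count() stands in for effective_cpu_count(), and defaulting proc_slots to num_cpus is an assumption (the documented default is the LensKit process count).

```python
from __future__ import annotations

import os
from typing import Any


def sketch_init_args(
    num_cpus: int | None = None,
    proc_slots: int | None = None,
    resources: dict[str, float] | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Assemble hypothetical ray.init() arguments from init_cluster-style options."""
    if num_cpus is None:
        # stand-in for LensKit's effective_cpu_count()
        num_cpus = os.cpu_count() or 1
    if proc_slots is None:
        # assumption: the real default is the LensKit process count
        proc_slots = num_cpus
    all_resources = dict(resources or {})
    # process slots are registered as the lk_process custom resource
    all_resources["lk_process"] = float(proc_slots)
    # other keyword arguments are forwarded unchanged, as with init_cluster's **kwargs
    return {"num_cpus": num_cpus, "resources": all_resources, **kwargs}


args = sketch_init_args(num_cpus=8, proc_slots=4, resources={"gpu_mem": 2.0}, include_dashboard=False)
print(args)
# With Ray installed, one could then call ray.init(**args), and a task could
# claim one process slot with @ray.remote(resources={"lk_process": 1}).
```

Because Ray schedules a task only when its requested resources are free, registering 4 lk_process slots would cap LensKit operations that each request one slot at 4 concurrent tasks, independently of num_cpus.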

lenskit.parallel.ray.ray_supported()

Check if this Ray setup is supported by LensKit.

Return type:

bool

lenskit.parallel.ray.ray_available()

Check if Ray is available.

Return type:

bool

lenskit.parallel.ray.ray_active()

Query whether Ray is active.

Return type:

bool
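"Available" and "active" answer different questions: whether Ray can be imported at all, and whether a Ray connection has been initialized in this process. The sketch below illustrates that distinction using importlib.util.find_spec() from the standard library and Ray's ray.is_initialized(). The _sketch functions are hypothetical and are not LensKit's implementations of ray_available() and ray_active().

```python
import importlib.util


def ray_available_sketch() -> bool:
    """Hypothetical: True if the ray package can be imported in this environment."""
    return importlib.util.find_spec("ray") is not None


def ray_active_sketch() -> bool:
    """Hypothetical: True if Ray is importable and ray.init() has already run."""
    if not ray_available_sketch():
        return False
    import ray  # deferred so the check stays cheap when Ray is missing

    return ray.is_initialized()


print(ray_available_sketch(), ray_active_sketch())
```

Importing Ray does not start or connect to a cluster, so ray_active_sketch() stays False until something such as init_cluster() calls ray.init().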

lenskit.parallel.ray.is_ray_worker()

Determine whether the current process is running on a Ray worker.

Return type:

bool

class lenskit.parallel.ray.RayOpInvoker(model, func, *, limit=None)

Bases: ModelOpInvoker[A, R], Generic[M, A, R]

Parameters:
  • model (M) – The model supplied as the first argument to func.

  • func (InvokeOp[M, A, R]) – The operation to apply to each task.

  • limit (int | None)

map(tasks)

Apply the configured function to the model and each task. This is like map(), except it supplies the invoker's model as the first argument to func.

Parameters:
  • tasks (Iterable[A]) – The tasks to process; func is called once for each task.

Returns:

An iterable of the results.

Return type:

iterable
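What map() computes can be pinned down with a sequential reference: for each task, call func(model, task) and collect the result. RayOpInvoker distributes these calls across Ray workers. This sketch makes no claim about result ordering or batching in the real invoker, and sequential_map and score are hypothetical names.

```python
from __future__ import annotations

from typing import Callable, Iterable, Iterator, TypeVar

M = TypeVar("M")
A = TypeVar("A")
R = TypeVar("R")


def sequential_map(model: M, func: Callable[[M, A], R], tasks: Iterable[A]) -> Iterator[R]:
    """Hypothetical reference semantics: call func(model, task) once per task."""
    for task in tasks:
        yield func(model, task)


def score(model: dict[str, float], item: str) -> float:
    # toy operation: the "model" is a dict of per-item scores
    return model.get(item, 0.0)


model = {"a": 1.5, "b": 0.5}
results = list(sequential_map(model, score, ["a", "b", "c"]))
print(results)  # [1.5, 0.5, 0.0]
```

Passing the model separately from the tasks is what allows a parallel invoker to ship the model to each worker once, rather than once per task.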

shutdown()

Shut down this invoker.

class lenskit.parallel.ray.TaskLimiter(limit=None)

Bases: object

Limit task concurrency using ray.wait().

This class provides three key operations:

  • Add a task to the limiter with add_task().

  • Wait for tasks until the number of pending tasks is less than the limit, with wait_for_limit() or results_until_limit().

  • Wait for all tasks with drain() or drain_results().

Parameters:

limit (int | None) – The maximum number of pending tasks. Defaults to the LensKit process count (see lenskit.parallel.initialize()).

limit: int

The maximum number of pending tasks.

finished: int

The number of tasks completed.

property pending: int

The number of pending tasks.

results_until_limit()

Iterate over available results until the number of pending tasks is less than the limit, blocking as needed.

This is a generator, yielding the task result references. It stops when the number of pending tasks is under the limit. No guarantee is made on the order of returned results.

Return type:

Generator[ObjectRef, None, None]

wait_for_limit()

Wait until the number of pending tasks is back under the limit.

This method calls ray.get() on the result of each task it waits for, so that any task errors are raised, but discards the return values.

drain_results()

Iterate over all remaining tasks until the pending task list is empty, blocking as needed.

This is a generator, yielding the task result references. No guarantee is made on the order of returned results.

Return type:

Generator[ObjectRef, None, None]

drain()

Wait until all pending tasks are finished.

This method calls ray.get() on the result of each pending task, so that any task errors are raised, but discards the return values.
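The limiter pattern can be illustrated without a Ray cluster. This sketch uses the standard library's concurrent.futures.wait(..., return_when=FIRST_COMPLETED) in place of ray.wait(), and Future objects in place of ObjectRef, with Future.result() playing the role of ray.get(). ThreadTaskLimiter is a hypothetical class, and the add_task() signature shown here is an assumption, not the documented LensKit one.

```python
from __future__ import annotations

from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Iterator


class ThreadTaskLimiter:
    """Hypothetical stdlib analogue of TaskLimiter, using futures instead of ObjectRefs."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.finished = 0
        self._pending: set[Future] = set()

    @property
    def pending(self) -> int:
        return len(self._pending)

    def add_task(self, task: Future) -> None:
        self._pending.add(task)

    def _wait_some(self) -> set[Future]:
        # block until at least one pending task completes (like ray.wait with num_returns=1)
        done, not_done = wait(self._pending, return_when=FIRST_COMPLETED)
        self._pending = set(not_done)
        self.finished += len(done)
        return done

    def results_until_limit(self) -> Iterator[Future]:
        # yield finished tasks until fewer than `limit` remain pending
        while len(self._pending) >= self.limit:
            yield from self._wait_some()

    def drain_results(self) -> Iterator[Future]:
        # yield finished tasks until nothing remains pending
        while self._pending:
            yield from self._wait_some()

    def wait_for_limit(self) -> None:
        for task in self.results_until_limit():
            task.result()  # re-raises the task's exception, discards the value

    def drain(self) -> None:
        for task in self.drain_results():
            task.result()


def square(x: int) -> int:
    return x * x


results: list[int] = []
max_pending = 0
limiter = ThreadTaskLimiter(limit=3)
with ThreadPoolExecutor(max_workers=4) as pool:
    for x in range(10):
        # make room before submitting, so at most `limit` tasks are in flight
        for task in limiter.results_until_limit():
            results.append(task.result())
        limiter.add_task(pool.submit(square, x))
        max_pending = max(max_pending, limiter.pending)
    for task in limiter.drain_results():
        results.append(task.result())

print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Waiting before each submission keeps the number of in-flight tasks at or below the limit, and calling result() on every finished task is what surfaces task errors. Results arrive in completion order, which is why they are sorted before printing.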

lenskit.parallel.ray.init_worker(*, autostart=True)

Initialize a Ray worker process. Sets up logging and returns the worker context.

Parameters:

autostart (bool) – Set to False to skip calling WorkerContext.start(). Use this when the caller will start and stop the context itself if it is new.

Return type:

WorkerContext