lenskit.parallel.ray
Support for parallelism with Ray.
Functions
- init_cluster(): Initialize or connect to a Ray cluster, with the LensKit options.
- Initialize a Ray worker process.
- is_ray_worker(): Determine whether the current process is running on a Ray worker.
- Query whether Ray is active.
- Check if Ray is available.
- ray_supported(): Check if this Ray setup is supported by LensKit.
Classes
- RayOpInvoker
- TaskLimiter: Limit task concurrency using ray.wait().
- lenskit.parallel.ray.init_cluster(*, num_cpus=None, proc_slots=None, resources=None, worker_parallel=None, global_logging=False, **kwargs)
Initialize or connect to a Ray cluster, with the LensKit options.
The resulting cluster can be used by an invoker or used directly. The Ray invoker batches tasks, however, so it works well only when there are many small tasks.
- Parameters:
num_cpus (int | None) – The total number of CPUs to allow. Defaults to effective_cpu_count().
proc_slots (int | None) – The number of “process slots” for LensKit parallel operations. Defaults to the LensKit process count. These slots are recorded as the lk_process resource on the Ray cluster.
resources (dict[str, float] | None) – Additional custom resources to register in the Ray cluster.
worker_parallel (ParallelConfig | None) – Parallel processing configuration for worker processes. If None, uses the default.
global_logging (bool) – True to wire up logging in the workers at startup, instead of only connecting logs when a task is run.
kwargs – Other options to pass to ray.init().
- Stability:
Experimental
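Assuming each LensKit task requests one lk_process slot, Ray schedules at most proc_slots such tasks at a time, even when more CPUs are free. The sketch below models that slot accounting locally with threading.BoundedSemaphore, without Ray. It is an analogue for illustration only, not how init_cluster or Ray is implemented; PROC_SLOTS and heavy_task are hypothetical names.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the `lk_process` resource: a fixed pool of slots.
PROC_SLOTS = 2
slots = threading.BoundedSemaphore(PROC_SLOTS)

# Bookkeeping to observe how many tasks hold a slot at the same time.
lock = threading.Lock()
active = 0
peak = 0


def heavy_task(i: int) -> int:
    """Hypothetical task that must hold one slot while it runs."""
    global active, peak
    with slots:  # blocks while every slot is taken
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)  # stand-in for real work
        with lock:
            active -= 1
    return i * i


# Eight worker threads (think: CPUs), but at most PROC_SLOTS tasks hold a slot at once.
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(heavy_task, range(10)))

print(results)
print("peak concurrency:", peak)
```

The thread count plays the role of num_cpus here; the semaphore, not the thread pool, is what bounds concurrent heavy work.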
- lenskit.parallel.ray.ray_supported()
Check if this Ray setup is supported by LensKit.
- Return type:
- lenskit.parallel.ray.is_ray_worker()
Determine whether the current process is running on a Ray worker.
- Return type:
- class lenskit.parallel.ray.RayOpInvoker(model, func, *, limit=None)
Bases: ModelOpInvoker[A, R], Generic[M, A, R]
- Parameters:
model (M)
func (InvokeOp[M, A, R])
limit (int | None)
- map(tasks)
Apply the configured function to the model and each task. This is like map(), except it supplies the invoker’s model as the first argument to func.
- Parameters:
tasks (Iterable[A]) – Arguments to provide to the function, one task per call.
- Returns:
An iterable of the results.
- Return type:
iterable
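The map() contract pairs every task with the invoker’s model, so the function is called as func(model, task). A serial, in-process sketch of that contract follows; SerialOpInvoker and score are hypothetical names, and the real RayOpInvoker dispatches the calls to Ray workers (with batching) rather than looping locally.

```python
from typing import Callable, Dict, Generic, Iterable, Iterator, TypeVar

M = TypeVar("M")
A = TypeVar("A")
R = TypeVar("R")


class SerialOpInvoker(Generic[M, A, R]):
    """Hypothetical in-process stand-in mirroring the documented map() contract."""

    def __init__(self, model: M, func: Callable[[M, A], R]):
        self.model = model
        self.func = func

    def map(self, tasks: Iterable[A]) -> Iterator[R]:
        # Like map(func, tasks), except the model is always passed first.
        for task in tasks:
            yield self.func(self.model, task)

    def shutdown(self) -> None:
        # Nothing to release when everything runs in this process.
        pass


def score(model: Dict[str, float], user: str) -> float:
    # Hypothetical operation: look up a per-user score, defaulting to 0.
    return model.get(user, 0.0)


invoker = SerialOpInvoker({"alice": 4.5, "bob": 3.0}, score)
results = list(invoker.map(["alice", "bob", "carol"]))
invoker.shutdown()
print(results)  # [4.5, 3.0, 0.0]
```

Binding the model once at construction means callers only supply per-task arguments, which is what lets an invoker ship the model to workers a single time.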
- shutdown()
Shut down this invoker.
- class lenskit.parallel.ray.TaskLimiter(limit=None)
Bases: object
Limit task concurrency using ray.wait().
This class provides three key operations:
- Add a task to the limiter with add_task().
- Wait for tasks until the number of pending tasks is less than the limit, with wait_for_limit().
- Wait for all tasks with drain().
- Parameters:
limit (int) – The maximum number of pending tasks. Defaults to the LensKit process count (see lenskit.parallel.initialize()).
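ray.wait() returns the references that are ready together with those still pending; concurrent.futures.wait() with return_when=FIRST_COMPLETED has the same shape, so the limiter pattern (add a task, wait until under the limit, drain) can be sketched without Ray. FutureLimiter, work, and the method bodies are a hypothetical analogue whose method names follow the generator methods documented in this section; this is not LensKit’s implementation.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Iterator, Set


class FutureLimiter:
    """Hypothetical local analogue of TaskLimiter using concurrent.futures."""

    def __init__(self, limit: int):
        self.limit = limit
        self.pending: Set[Future] = set()

    def add_task(self, fut: Future) -> None:
        self.pending.add(fut)

    def results_until_limit(self) -> Iterator[Future]:
        # Block until fewer than `limit` tasks are pending, yielding finished
        # futures as they complete (no ordering guarantee).
        while len(self.pending) >= self.limit:
            done, self.pending = wait(self.pending, return_when=FIRST_COMPLETED)
            yield from done

    def drain_results(self) -> Iterator[Future]:
        # Block until nothing is pending, yielding each finished future.
        while self.pending:
            done, self.pending = wait(self.pending, return_when=FIRST_COMPLETED)
            yield from done


def work(x: int) -> int:
    return x * 10


limiter = FutureLimiter(limit=3)
collected = []
with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(10):
        limiter.add_task(pool.submit(work, i))
        # Get back under the limit before submitting the next task.
        for fut in limiter.results_until_limit():
            collected.append(fut.result())
    # Collect whatever is still in flight.
    for fut in limiter.drain_results():
        collected.append(fut.result())

print(sorted(collected))  # [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
```

Results arrive in completion order, which is why the usage sorts them before printing.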
- results_until_limit()
Iterate over available results until the number of pending tasks is less than the limit, blocking as needed.
This is a generator, yielding the task result references. The iterator stops when the pending task list is under the limit. No guarantee is made on the order of returned results.
- Return type:
Generator[ObjectRef, None, None]
- wait_for_limit()
Wait until the pending tasks are back under the limit.
This method calls ray.get() on the result of each pending task to resolve errors, but discards the return value.
- drain_results()
Iterate over all remaining tasks until the pending task list is empty, blocking as needed.
This is a generator, yielding the task result references. No guarantee is made on the order of returned results.
- Return type:
Generator[ObjectRef, None, None]
- drain()
Wait until all pending tasks are finished.
This method calls ray.get() on the result of each pending task to resolve errors, but discards the return value.
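Calling ray.get() matters because it makes a failed task raise in the caller instead of being silently dropped. Future.result() in concurrent.futures re-raises a task’s exception the same way, which gives a small local sketch of drain(); the drain and flaky functions here are hypothetical and do not use Ray.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Optional, Set


def drain(pending: Set[Future]) -> None:
    """Hypothetical analogue of TaskLimiter.drain() over local futures."""
    while pending:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for fut in done:
            fut.result()  # value discarded; a task's exception is re-raised here


def flaky(x: int) -> int:
    # Hypothetical task: exactly one input fails.
    if x == 3:
        raise ValueError(f"task {x} failed")
    return x


error: Optional[str] = None
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = {pool.submit(flaky, i) for i in range(5)}
    try:
        drain(futures)
    except ValueError as exc:
        error = str(exc)

print(error)  # task 3 failed
```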