lenskit.pipeline.Pipeline

class lenskit.pipeline.Pipeline(name=None, version=None)

Bases: object

LensKit recommendation pipeline. This is the core abstraction for using LensKit models and other components to produce recommendations in a useful way. It allows you to wire together components in (mostly) arbitrary graphs, train them on data, and serialize pipelines to disk for use elsewhere.

If you have a scoring model and just want to generate recommendations with a default setup and minimal configuration, see topn_pipeline().

Parameters:
  • name (str | None) – A name for the pipeline.

  • version (str | None) – A numeric version for the pipeline.

__init__(name=None, version=None)
Parameters:
  • name (str | None)

  • version (str | None)

Methods

__init__([name, version])

add_component(name, obj, **inputs)

Add a component and connect it into the graph.

alias(alias, node)

Create an alias for a node.

clone([how])

Clone the pipeline, optionally including trained parameters.

component_configs()

Get the configurations for the components.

config_hash()

Get a hash of the pipeline's configuration to uniquely identify it for logging, version control, or other purposes.

connect(obj, **inputs)

Provide additional input connections for a component that has already been added.

create_input(name, *types)

Create an input node for the pipeline.

from_config(config)

Reconstruct a pipeline from a configuration (such as one produced by get_config()).

get_config(*[, include_hash])

Get this pipeline's configuration for serialization.

get_default(name)

Get the default wiring for an input name.

literal(value, *[, name])

Create a literal node (a node with a fixed value).

meta(*[, include_hash])

Get the metadata (name, version, hash, etc.) for this pipeline without returning the whole config.

node(node, *[, missing])

Get the pipeline node with the specified name.

replace_component(name, obj, **inputs)

Replace a component in the graph.

run(*nodes, **kwargs)

Run the pipeline and obtain the return value(s) of one or more of its components.

run_all(*nodes, **kwargs)

Run all nodes in the pipeline, or all nodes required to fulfill the requested node, and return a mapping with the full pipeline state (the data attached to each node).

set_default(name, node)

Set the default wiring for a component input.

train(data)

Train the pipeline's trainable components (those implementing the TrainableComponent interface) on some training data.

use_first_of(name, *nodes)

Create a new node whose value is the first defined (not None) value of the specified nodes.

Attributes

name

nodes

Get the nodes in the pipeline graph.

version

meta(*, include_hash=True)

Get the metadata (name, version, hash, etc.) for this pipeline without returning the whole config.

Parameters:

include_hash (bool) – Whether to include a configuration hash in the metadata.

Return type:

PipelineMeta

property nodes: list[Node[object]]

Get the nodes in the pipeline graph.

node(node, *, missing='error')

Get the pipeline node with the specified name. If passed a node, it returns the node or fails if the node is not a member of the pipeline.

Parameters:
  • node (str | Node[Any]) – The name of the pipeline node to look up, or a node to check for membership.

  • missing (Literal['error', 'none'] | None) – What to do if the node does not exist: 'error' raises KeyError, while 'none' returns None.

Returns:

The pipeline node, if it exists.

Raises:

KeyError – The specified node does not exist.

Return type:

Node[object] | None

create_input(name, *types)

Create an input node for the pipeline. Pipelines expect their inputs to be provided when they are run.

Parameters:
  • name (str) – The name of the input. The name must be unique in the pipeline (among both components and inputs).

  • types (type[T] | None) – The allowable types of the input; input data can be of any specified type. If None is among the allowed types, the input can be omitted.

Returns:

A pipeline node representing this input.

Raises:

ValueError – a node with the specified name already exists.

Return type:

Node[T]
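The rule for allowed input types can be illustrated with a small, self-contained sketch. This is not LensKit's implementation: the helper name check_input is hypothetical, and it uses plain classes such as int and list because isinstance() cannot check parameterized generics like list[EntityId]. Including None among the allowed types makes the input optional; otherwise a missing value raises ValueError and a value of the wrong type raises TypeError, mirroring the errors documented for run().

```python
from typing import Any, Mapping, Optional


def check_input(name: str, types: tuple[Optional[type], ...], inputs: Mapping[str, Any]) -> Any:
    """Fetch input `name` from `inputs`, enforcing the allowed `types` (hypothetical helper)."""
    optional = None in types
    concrete = tuple(t for t in types if t is not None)
    value = inputs.get(name)
    if value is None:
        # missing and explicitly-None inputs are only acceptable when None is allowed
        if optional:
            return None
        raise ValueError(f"required input {name!r} is missing")
    if concrete and not isinstance(value, concrete):
        raise TypeError(f"input {name!r} has unsupported type {type(value).__name__}")
    return value
```

For example, check_input("items", (list, None), {}) returns None, while check_input("n", (int,), {}) raises ValueError.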

literal(value, *, name=None)

Create a literal node (a node with a fixed value).

Note

Literal nodes cannot be serialized with get_config() or save_config().

Parameters:
  • value (T)

  • name (str | None)

Return type:

LiteralNode[T]

set_default(name, node)

Set the default wiring for a component input. Components that declare an input parameter with the specified name but no configured input will be wired to this node.

This is intended to be used for things like wiring up user parameters to semi-automatically receive the target user’s identity and history.

Parameters:
  • name (str) – The name of the parameter to set a default for.

  • node (Node[Any] | object) – The node or literal value to wire to this parameter.

Return type:

None
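The following sketch shows one way the default-wiring rule could be resolved: explicit connections win, and any parameter left unconnected falls back to a default registered under the same parameter name. It is a toy model, not LensKit's code; resolve_wiring, score_items, and the node name "user-input" are hypothetical, and nodes are represented by their names.

```python
import inspect
from typing import Any, Callable


def resolve_wiring(
    component: Callable[..., Any],
    explicit: dict[str, str],
    defaults: dict[str, str],
) -> dict[str, str]:
    """Map each parameter of `component` to the name of the node that feeds it."""
    wiring: dict[str, str] = {}
    for param in inspect.signature(component).parameters:
        if param in explicit:
            wiring[param] = explicit[param]  # explicit connections take priority
        elif param in defaults:
            wiring[param] = defaults[param]  # fall back to the default for this name
    return wiring  # parameters with neither stay unwired


def score_items(user: Any, items: list[int]) -> dict[int, float]:
    """Hypothetical scorer; only its parameter names matter here."""
    return {i: 0.0 for i in items}


defaults = {"user": "user-input"}  # as if set_default("user", <user input node>) were called
wiring = resolve_wiring(score_items, {"items": "candidates"}, defaults)
```

Here the user parameter picks up the default because it was not wired explicitly, while items keeps its explicit connection.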

get_default(name)

Get the default wiring for an input name.

Parameters:

name (str)

Return type:

Node[Any] | None

alias(alias, node)

Create an alias for a node. After aliasing, the node can be retrieved from node() using either its original name or its alias.

Parameters:
  • alias (str) – The alias to add to the node.

  • node (Node[Any] | str) – The node (or node name) to alias.

Raises:

ValueError – if the alias is already used as an alias or node name.

Return type:

None
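To make the lookup rules of node() and alias() concrete, here is a toy registry that resolves aliases to node names, rejects names or aliases that are already taken with ValueError, and honors a missing switch (with missing="none" assumed to return None instead of raising KeyError). It is an illustrative sketch only; NodeRegistry and its methods are hypothetical and do not reflect how Pipeline stores nodes internally.

```python
from __future__ import annotations

from typing import Literal


class NodeRegistry:
    """Toy name/alias table; nodes are arbitrary objects (hypothetical, not Pipeline internals)."""

    def __init__(self) -> None:
        self._nodes: dict[str, object] = {}
        self._aliases: dict[str, str] = {}

    def _taken(self, name: str) -> bool:
        return name in self._nodes or name in self._aliases

    def add(self, name: str, node: object) -> None:
        if self._taken(name):
            raise ValueError(f"name {name!r} is already in use")
        self._nodes[name] = node

    def alias(self, alias: str, name: str) -> None:
        if self._taken(alias):
            raise ValueError(f"alias {alias!r} is already in use")
        self.lookup(name)  # raises KeyError if the target does not exist
        self._aliases[alias] = name

    def lookup(self, name: str, missing: Literal["error", "none"] = "error") -> object | None:
        name = self._aliases.get(name, name)  # resolve an alias to the original name
        if name in self._nodes:
            return self._nodes[name]
        if missing == "none":
            return None
        raise KeyError(name)


registry = NodeRegistry()
registry.add("scorer", "scorer-node")
registry.alias("model", "scorer")
```

After aliasing, both "scorer" and "model" resolve to the same node.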

add_component(name, obj, **inputs)

Add a component and connect it into the graph.

Parameters:
  • name (str) – The name of the component in the pipeline. The name must be unique in the pipeline (among both components and inputs).

  • obj (Callable[[...], ND]) – The component itself.

  • inputs (Node[Any] | object) – The component’s input wiring. See Connections for details.

Returns:

The node representing this component in the pipeline.

Return type:

Node[ND]

replace_component(name, obj, **inputs)

Replace a component in the graph. The new component must have a type that is compatible with the old component. The old component’s input connections will be replaced (as the new component may have different inputs), but any connections that use the old component to supply an input will use the new component instead.

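Parameters:
  • name – The name of the component to replace.

  • obj – The new component.

  • inputs – The new component’s input wiring. See Connections for details.

Return type: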

Node[ND]

use_first_of(name, *nodes)

Create a new node whose value is the first defined (not None) value of the specified nodes. If a node is an input node and its value is not supplied, it is treated as None in this case instead of failing the run. This method is used for things like filling in optional pipeline inputs. For example, if you want the pipeline to take candidate items through an items input, but look them up from the user’s history and the training data if items is not supplied, you would do:

from lenskit.pipeline import Pipeline

# assumes EntityId, UnratedTrainingItemsCandidateSelector, and a `history`
# node (the user's history) are already imported or defined
pipe = Pipeline()
# allow candidate items to be optionally specified
items = pipe.create_input('items', list[EntityId], None)
# find candidates from the training data (optional)
lookup_candidates = pipe.add_component(
    'select-candidates', UnratedTrainingItemsCandidateSelector(),
    user=history,
)
# if the client provided items as a pipeline input, use those;
# otherwise use the candidate selector we just configured.
candidates = pipe.use_first_of('candidates', items, lookup_candidates)

Note

This method does not distinguish between an input being unspecified and explicitly specified as None.

Note

This method does not implement item-level fallbacks, only fallbacks at the level of entire results. That is, you can use it to use component A as a fallback for B if B returns None, but it will not use B to fill in missing scores for individual items that A did not score. A specific itemwise fallback component is needed for such an operation.

Note

If one of the fallback elements is a component A that depends on another component or input B, and B is missing or returns None such that A would usually fail, then A will be skipped and the fallback will move on to the next node. This works with arbitrarily-deep transitive chains.

Parameters:
  • name (str) – The name of the node.

  • nodes (Node[T | None]) – The nodes to try, in order, to satisfy this node.

Return type:

Node[T]
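The fallback semantics described above can be modeled in plain Python. In this hypothetical sketch, each candidate is a zero-argument function evaluated lazily and in order; a candidate whose own input is unavailable raises Skip and is passed over, and an unsupplied input is indistinguishable from an explicit None, matching the first note. The names first_defined, Skip, candidates, and TRAINING_ITEMS are illustrative only, and returning None when every candidate comes up empty is a choice of this sketch, not documented Pipeline behavior.

```python
from __future__ import annotations

from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class Skip(Exception):
    """Raised by a toy node when one of its own inputs is unavailable."""


def first_defined(*candidates: Callable[[], Optional[T]]) -> Optional[T]:
    """Evaluate candidates lazily, in order; return the first non-None result."""
    for candidate in candidates:
        try:
            value = candidate()
        except Skip:
            continue  # an upstream dependency is missing: try the next candidate
        if value is not None:
            return value
    return None  # this sketch returns None when every candidate comes up empty


TRAINING_ITEMS = [1, 2, 3, 4]  # hypothetical catalog of training items


def candidates(inputs: dict[str, list[int]]) -> list[int] | None:
    def supplied_items() -> list[int] | None:
        # an unsupplied input and an explicit None look the same here
        return inputs.get("items")

    def unrated_training_items() -> list[int]:
        history = inputs.get("history")
        if history is None:
            raise Skip
        return [i for i in TRAINING_ITEMS if i not in history]

    return first_defined(supplied_items, unrated_training_items)
```

Client-supplied items take precedence; otherwise the lookup runs if the user's history is available.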

connect(obj, **inputs)

Provide additional input connections for a component that has already been added. See Connections for details.

Parameters:
  • obj (str | Node[Any]) – The name or node of the component to wire.

  • inputs (Node[Any] | str | object) – The component’s input wiring. For each keyword argument in the component’s function signature, that argument can be provided here with an input that the pipeline will provide to that argument of the component when the pipeline is run.

component_configs()

Get the configurations for the components. These are the component configurations only; they do not include pipeline inputs or wiring.

Return type:

dict[str, dict[str, Any]]

clone(how='config')

Clone the pipeline, optionally including trained parameters.

The how parameter controls how the pipeline is cloned, and what is available in the cloned pipeline. It can be one of the following values:

"config"

Create fresh component instances using the configurations of the components in this pipeline. When applied to a trained pipeline, the clone does not have the original’s learned parameters. This is the default clone method.

"pipeline-config"

Round-trip the entire pipeline through get_config() and from_config().

Parameters:

how (Literal['config', 'pipeline-config']) – The mechanism to use for cloning the pipeline.

Returns:

A new pipeline with the same components and wiring, but fresh instances created by round-tripping the configuration.

Return type:

Pipeline
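The difference between a component's configuration and its learned parameters, which is what the default "config" clone relies on, can be illustrated with a toy component. MeanConfig, ItemMeanScorer, and clone_from_config are hypothetical; the point is only that building a new instance from a copy of the configuration yields the same settings but none of the trained state.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass


@dataclass
class MeanConfig:
    damping: float = 0.0  # hypothetical hyperparameter


class ItemMeanScorer:
    """Hypothetical component: a configuration plus state learned in train()."""

    def __init__(self, config: MeanConfig | None = None) -> None:
        self.config = config if config is not None else MeanConfig()
        self.means: dict[int, float] | None = None  # learned parameters

    def train(self, ratings: dict[int, list[float]]) -> None:
        d = self.config.damping
        # damped mean: shrink each item's mean toward zero by `damping` pseudo-ratings
        self.means = {i: sum(r) / (len(r) + d) for i, r in ratings.items()}


def clone_from_config(component: ItemMeanScorer) -> ItemMeanScorer:
    """'config'-style clone: a fresh instance built from a copy of the configuration."""
    return type(component)(MeanConfig(**asdict(component.config)))


original = ItemMeanScorer(MeanConfig(damping=1.0))
original.train({1: [4.0, 5.0]})
clone = clone_from_config(original)
```

The clone shares the original's settings but must be trained again before it can score anything.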

get_config(*, include_hash=True)

Get this pipeline’s configuration for serialization. The configuration consists of all inputs and components along with their configurations and input connections. It can be serialized to disk (in JSON, YAML, or a similar format) to save a pipeline.

The configuration does not include any trained parameter values, although the configuration may include things such as paths to checkpoints to load such parameters, depending on the design of the components in the pipeline.

Note

Literal nodes (from literal(), or literal values wired to inputs) cannot be serialized, and this method will fail if they are present in the pipeline.

Parameters:

include_hash (bool)

Return type:

PipelineConfig

config_hash()

Get a hash of the pipeline’s configuration to uniquely identify it for logging, version control, or other purposes.

The hash format and algorithm are not guaranteed, but they are stable within a LensKit version. For the same version of LensKit and component code, the same configuration will produce the same hash, so long as there are no literal nodes. Literal nodes will usually hash consistently, but since literals other than basic JSON values are hashed by pickling, hash stability depends on the stability of the pickle bytestream.

In LensKit 2024.1, the configuration hash is the hex-encoded SHA-256 hash of the JSON serialization of the pipeline configuration (serialized without a hash).

Return type:

str
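A minimal sketch of the hashing scheme described for LensKit 2024.1, assuming the configuration is a plain JSON-compatible dictionary with an optional top-level "hash" key. The use of sort_keys=True and compact separators is an assumption made here for determinism; LensKit's actual serialization may differ, so these digests should not be expected to match LensKit's.

```python
import hashlib
import json
from typing import Any


def config_hash(config: dict[str, Any]) -> str:
    """Hex-encoded SHA-256 of a JSON serialization of `config`, excluding any "hash" key."""
    unhashed = {k: v for k, v in config.items() if k != "hash"}
    # sorted keys and fixed separators make the serialization independent of dict order
    text = json.dumps(unhashed, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


cfg_a = {"meta": {"name": "demo"}, "components": {"scorer": {"damping": 5}}}
cfg_b = {"components": {"scorer": {"damping": 5}}, "meta": {"name": "demo"}}
```

Because the hash is computed without the hash field, storing the digest back into the configuration does not change it.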

train(data)

Train the pipeline’s trainable components (those implementing the TrainableComponent interface) on some training data.

Parameters:

data (Dataset)

Return type:

None
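The selection of trainable components can be sketched with a runtime-checkable protocol. Trainable here is a hypothetical stand-in for the TrainableComponent interface (whose real definition may differ), and train_pipeline, MeanScorer, and top_n are illustrative names; components that do not provide a train method, such as plain functions, are simply left alone.

```python
from __future__ import annotations

from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Trainable(Protocol):
    """Hypothetical stand-in for a trainable-component interface."""

    def train(self, data: Any) -> None: ...


def train_pipeline(components: dict[str, object], data: Any) -> list[str]:
    """Call train(data) on every component that has a train method; return their names."""
    trained: list[str] = []
    for name, component in components.items():
        # runtime_checkable isinstance() only checks that a `train` attribute exists
        if isinstance(component, Trainable):
            component.train(data)
            trained.append(name)
    return trained


class MeanScorer:
    """Toy trainable component that learns the mean of its training data."""

    def __init__(self) -> None:
        self.mean: float | None = None

    def train(self, data: list[float]) -> None:
        self.mean = sum(data) / len(data)


def top_n(scores: dict[int, float], n: int = 10) -> list[int]:
    """Toy stateless component: nothing to train."""
    return sorted(scores, key=scores.get, reverse=True)[:n]


scorer = MeanScorer()
names = train_pipeline({"scorer": scorer, "rank": top_n}, [1.0, 2.0, 3.0])
```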

run(*nodes, **kwargs)

Run the pipeline and obtain the return value(s) of one or more of its components. See Execution for details of the pipeline execution model.

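Parameters:
  • nodes (str | Node[Any]) – The nodes to run, as positional arguments (if no nodes are specified, the last node added to the pipeline is run).

  • kwargs (object) – The inputs.

Returns: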

The pipeline result. If zero or one nodes are specified, the result is returned as-is. If multiple nodes are specified, their results are returned in a tuple.

Raises:
  • PipelineError – when there is a pipeline configuration error (e.g. a cycle).

  • ValueError – when one or more required inputs are missing.

  • TypeError – when one or more required inputs has an incompatible type.

  • other – exceptions thrown by components are passed through.

Return type:

object

run_all(*nodes, **kwargs)

Run all nodes in the pipeline, or all nodes required to fulfill the requested node, and return a mapping with the full pipeline state (the data attached to each node). This is useful in cases where client code needs to be able to inspect the data at arbitrary steps of the pipeline. It differs from run() in two ways:

  1. It returns the data from all nodes as a mapping (dictionary-like object), not just the specified nodes as a tuple.

  2. If no nodes are specified, it runs all nodes instead of only the last node. This has the consequence of running nodes that are not required to fulfill the last node (such scenarios typically result from using use_first_of()).

Parameters:
  • nodes (str | Node[Any]) – The nodes to run, as positional arguments (if no nodes are specified, this method runs all nodes).

  • kwargs (object) – The inputs.

Returns:

The full pipeline state, with default set to the last node specified (either the last node in nodes, or the last node added to the pipeline).

Return type:

PipelineState
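The difference in return shapes between run() and run_all() can be seen in a toy executor. ToyPipeline is hypothetical and far simpler than Pipeline (nodes are plain names, inputs are always required, and run_all() returns an ordinary dict rather than a PipelineState); it only models the behavior described above: run() evaluates the requested nodes, or the last node added, and returns one value or a tuple, while run_all() returns every computed value, running all nodes when none are named.

```python
from __future__ import annotations

from typing import Any, Callable


class ToyPipeline:
    """Hypothetical mini-pipeline illustrating run() vs. run_all() return shapes."""

    def __init__(self) -> None:
        self.inputs: list[str] = []
        self.components: dict[str, tuple[Callable[..., Any], dict[str, str]]] = {}
        self.order: list[str] = []  # insertion order of all nodes

    def create_input(self, name: str) -> str:
        self.inputs.append(name)
        self.order.append(name)
        return name

    def add_component(self, name: str, fn: Callable[..., Any], **wiring: str) -> str:
        self.components[name] = (fn, wiring)
        self.order.append(name)
        return name

    def _eval(self, node: str, kwargs: dict[str, Any], state: dict[str, Any]) -> Any:
        # memoized depth-first evaluation: each node runs at most once per call
        if node not in state:
            if node in self.inputs:
                if node not in kwargs:
                    raise ValueError(f"missing input {node!r}")
                state[node] = kwargs[node]
            else:
                fn, wiring = self.components[node]
                args = {p: self._eval(src, kwargs, state) for p, src in wiring.items()}
                state[node] = fn(**args)
        return state[node]

    def run_all(self, *nodes: str, **kwargs: Any) -> dict[str, Any]:
        state: dict[str, Any] = {}
        for node in nodes or self.order:  # no nodes named: run everything
            self._eval(node, kwargs, state)
        return state

    def run(self, *nodes: str, **kwargs: Any) -> Any:
        targets = nodes or (self.order[-1],)  # default to the last node added
        state = self.run_all(*targets, **kwargs)
        results = tuple(state[n] for n in targets)
        return results[0] if len(results) == 1 else results


p = ToyPipeline()
x = p.create_input("x")
p.add_component("double", lambda v: v * 2, v=x)
p.add_component("square", lambda v: v * v, v=x)
```

With this setup, p.run(x=3) evaluates only the last node added ("square"), whereas p.run_all(x=3) also computes "double", which "square" does not need.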