lenskit.logging.tasks#

Abstraction for recording tasks.

Classes

Task(label, *[, file, parent, reset_hwm, ...])

A task for logging and resource measurement.

TaskStatus(value[, names, module, qualname, ...])

Statuses for task records.

class lenskit.logging.tasks.TaskStatus(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)#

Bases: str, Enum

Statuses for task records.

class lenskit.logging.tasks.Task(label, *, file=None, parent=None, reset_hwm=None, task_id=<factory>, parent_id=None, subprocess=False, status=TaskStatus.PENDING, start_time=None, finish_time=None, duration=None, cpu_time=None, peak_memory=None, peak_gpu_memory=None, subtasks=<factory>, **data)#

Bases: BaseModel

A task for logging and resource measurement.

A task may be top-level (have no parent), or it may be a subtask. By default, new tasks have the current active task as their parent. Tasks are not active until they are started (using a task as a context manager automatically does this, which is the recommended process).

The task-tracking mechanism is currently designed to support large tasks, like training a model or running batch inference; it is not yet designed for fine-grained spans like you would see in OpenTelemetry or Eliot.

Note

The notion of the “active task” does not yet support multi-threaded tasks.

Parameters:
  • file (PathLike[str] | None) – A file to save the task when it is finished.

  • parent (Task | UUID | None) – The parent task. If unspecified, uses the currently-active task.

  • reset_hwm (bool | None) – Whether to reset the system resource high-water-marks at the start of this task. Only effective on Linux, but allows for measurement of the peak memory use of this task specifically. If unspecified, it resets the HWM if there is no parent.

  • label (str)

  • task_id (UUID)

  • parent_id (UUID | None)

  • subprocess (bool)

  • status (TaskStatus)

  • start_time (float | None)

  • finish_time (float | None)

  • duration (float | None)

  • cpu_time (float | None)

  • peak_memory (int | None)

  • peak_gpu_memory (int | None)

  • subtasks (Annotated[list[Annotated[Task, SerializeAsAny()]], BeforeValidator(func=~lenskit.logging.tasks._dict_extract_values, json_schema_input_type=PydanticUndefined)])

  • data (Any)

task_id: UUID#

The task ID.

parent_id: UUID | None#

The parent task ID.

subprocess: bool#

Whether this task is a subprocess of its parent. Subprocess task CPU times are not included in the parent task times.

label: str#

Human-readable task label.

status: TaskStatus#

The task’s current status.

start_time: float | None#

The task start time (UNIX timestamp).

finish_time: float | None#

The task completion time (UNIX timestamp).

duration: float | None#

Task duration in seconds. Measured using time.perf_counter(), so it may disagree slightly with the difference in start and finish times.

cpu_time: float | None#

CPU time consumed in seconds.

peak_memory: int | None#

Peak memory usage (max RSS) in bytes. Only available on Unix; individual task peak memory use is only reliable on Linux (MacOS will report the max memory used since the process was started).

peak_gpu_memory: int | None#

Peak PyTorch GPU memory usage in bytes.

subtasks: Annotated[list[SerializeAsAny[Task]], BeforeValidator(_dict_extract_values)]#

This task’s subtasks.

static current()#

Get the currently-active task.

Return type:

Task | None

static root()#

Get the root task.

Return type:

Task | None

model_config: ClassVar[ConfigDict] = {'extra': 'allow'}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_post_init(context, /)#

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self (BaseModel) – The BaseModel instance.

  • context (Any) – The context.

Return type:

None

save_to_file(path, monitor=True)#

Save this task to a file, and re-save it when finished.

Parameters:
start()#

Start the task.

finish(status=TaskStatus.FINISHED)#

Finish the task.

Parameters:

status (TaskStatus)

update()#

Update the task’s resource measurements and save the file (if one is set).

monitor_refresh()#

Refresh method called by the monitor backend.

add_subtask(task)#

Add or update a subtask.

Parameters:

task (Task)

update_resources()#

Update the resource measurements. Returns the current measurement.

This method is called by update(), with an exclusive lock held.

Return type:

ResourceMeasurement