pipelines.results.base module#

class pipelines.results.base.GetPipelineMixin#

Bases: object

get_pipeline() Pipeline#

Returns the registered pipeline object

class pipelines.results.base.GetTaskMixin#

Bases: object

get_task() Task#

Gets the registered task object

class pipelines.results.base.PipelineDigestItem(total_runs: int = 0, total_success: int = 0, total_failure: int = 0, last_ran: datetime | None = None, average_runtime: float | None = None)#

Bases: object

Data class for storing entries in the pipeline digest

average_runtime: float | None = None#

The average amount of time (in seconds) it takes to run the pipeline

last_ran: datetime | None = None#

The date and time the pipeline was last run

total_failure: int = 0#

The total number of times the pipeline has failed

total_runs: int = 0#

The total number of times the pipeline has been run

total_success: int = 0#

The total number of times the pipeline has been run successfully

class pipelines.results.base.PipelineExecution#

Bases: GetPipelineMixin, PipelineStorageObject

Object to store the overall result of a pipeline run

content_type_name: str = 'PipelineExecution'#

The type name of the results object. This is used throughout the system and is set by the specific base class and should not be changed.

get_pipeline_results: Callable[[], Sequence[PipelineResult]]#

Returns all the pipeline result objects for this pipeline execution

class pipelines.results.base.PipelineResult#

Bases: GetPipelineMixin, PipelineStorageObject

content_type_name: str = 'PipelineResult'#

The type name of the results object. This is used throughout the system and is set by the specific base class and should not be changed.

get_id: Callable[[], Any]#

Returns the id of the pipeline result in the storage

get_pipeline_execution: Callable[[], PipelineExecution]#

Returns the pipeline execution object describing the pipeline run as a whole

get_pipeline_object: Callable[[], Pipeline]#

Gets the deserialized pipeline object

get_reporter: Callable[[], str]#

Returns a python path to the reporter the pipeline was started with

get_runner: Callable[[], str]#

Returns a python path to the runner the pipeline was started with

get_serializable_pipeline_object: Callable[[], Dict[str, Any]]#

Returns the object this instance of the pipeline was started with

get_task_executions: Callable[[], Sequence[TaskExecution]]#

Returns all the task execution objects for this particular pipeline instance

class pipelines.results.base.PipelineResultsStorage#

Bases: object

build_pipeline_execution(pipeline: Pipeline, run_id: str, runner: PipelineRunner, reporter: PipelineReporter, input_data: Dict[str, Any], build_all=True) PipelineExecution#

Creates a pipeline execution object along with all the pipeline results, task executions and task results.

If build_all is True, the storage class should create:

  • 1 pipeline execution object

  • A pipeline result object per element in the pipeline iterator

  • Per pipeline result, a task execution object per task in the pipeline

  • Per task execution object, a task result object per element in the task iterator

If build_all is False, only the pipeline execution should be created. This is used to capture error states when the pipeline is badly configured and the other objects cannot be created.

Parameters:
  • pipeline – The Pipeline object to create the results storage for

  • run_id – A UUID for referencing the run

  • runner – The PipelineRunner object that will be used to run the pipeline

  • reporter – The PipelineReporter object that will be used to report status changes

  • input_data – A json serializable dict containing the input parameters

  • build_all – If True, all results objects are created; otherwise, only the pipeline execution object will be created (to store config errors, for example)

cleanup(before: datetime | None = None) Sequence[str]#

Removes all results objects from the storage.

Parameters:

before – If set, only objects created before this date will be removed; otherwise, all will be removed.

get_pipeline_digest() Dict[str, PipelineDigestItem]#

Returns the pipeline digest: a dictionary of PipelineDigestItem objects providing stats for all registered pipeline classes.

get_pipeline_execution(run_id) PipelineExecution | None#

Fetch a specific pipeline execution from the storage.

If the pipeline execution isn’t found, None will be returned.

Parameters:

run_id – The id of the pipeline run to fetch the execution for

get_pipeline_executions(pipeline_id: str | None = None) Sequence[PipelineExecution]#

Gets all pipeline executions from the storage. If pipeline_id is supplied, only executions for the given pipeline id will be returned.

Parameters:

pipeline_id – The id of the registered pipeline class

get_pipeline_result(_id) PipelineResult | None#

Fetch a specific pipeline result from the storage.

If the pipeline result isn’t found, None will be returned.

Parameters:

_id – The id of the result to fetch from storage

get_pipeline_results(run_id: str | None = None) Sequence[PipelineResult]#

Gets all pipeline results from the storage. If run_id is supplied, only results for that particular run will be returned.

Parameters:

run_id – The id of the run to filter results by

get_task_execution(_id) TaskExecution | None#

Fetch a specific task execution from the storage.

If the task execution isn’t found, None will be returned.

Parameters:

_id – The id of the task execution to fetch from storage

get_task_executions(run_id: str | None = None, pipeline_result_id: str | None = None) Sequence[TaskExecution]#

Gets all task executions from the storage.

Parameters:
  • run_id – The id of the run to filter results by

  • pipeline_result_id – The id of the parent pipeline result object to filter results by

get_task_result(_id) TaskResult | None#

Fetch a specific task result from the storage.

If the task result isn’t found, None will be returned.

Parameters:

_id – The id of the task result to fetch from storage

get_task_results(run_id: str | None = None, pipeline_result_id: str | None = None, task_execution_id: str | None = None) Sequence[TaskResult]#

Gets all task results from the storage.

Parameters:
  • run_id – The id of the run to filter results by

  • pipeline_result_id – The id of the grandparent pipeline result object to filter results by

  • task_execution_id – The id of the parent task execution object to filter results by

class pipelines.results.base.PipelineStorageObject#

Bases: object

Base object for all items in the pipeline results storage.

It resolves unimplemented get_ methods by falling back to looking up the attribute on the model and returning its value if present. For example, if get_run_id is called, the class will:

  1. First check if get_run_id is implemented, if it is, use it

  2. If it’s not implemented, check whether the object has a run_id attribute; if so, return a method that returns the attribute value.

  3. If neither are present, raise an AttributeError as normal

set_ methods are also handled in a similar manner:

  1. First check if set_run_id is implemented, if it is, use it

  2. If not, return a method that sets the value of run_id on the object

content_type_name: str#

The type name of the results object. This is used throughout the system and is set by the specific base class and should not be changed.

get_completed: Callable[[], datetime | None]#

Returns the time the pipeline finished (whether successful or not)

get_input_data: Callable[[], Dict[str, Any]]#

Returns the input data for the pipeline run

get_pipeline_id: Callable[[], str]#

Returns the id of the registered pipeline class

get_run_id: Callable[[], str]#

Returns the id of the running pipeline instance

get_started: Callable[[], datetime | None]#

Returns the time the pipeline was started

get_status: Callable[[], PipelineTaskStatus]#

Returns the current status of the pipeline

getter()#

The fallback method to use when a get_ function isn’t implemented.

method_prop_re = re.compile('^(get|set)_(.*)$')#

Regular expression used to extract property name from the lookup item.

report_status_change(reporter: PipelineReporter, status: PipelineTaskStatus, message='', propagate=True)#

Update the status of this results object. If the status hasn’t changed or hasn’t moved on (i.e. not pending -> running or running -> a final state), the message won’t be reported.

Parameters:
  • reporter – The reporter to write the message to.

  • status – The new status of the object

  • message – Optional message to record with the status change

  • propagate – If True and the object specifies propagation parents, the status change will be passed to the parents

save: Callable[[], Any]#

Commits the current object state to the storage

set_completed: Callable[[datetime], None]#

Sets the time the pipeline finished

set_started: Callable[[datetime], None]#

Sets the time the pipeline was started

setter(name, value, /)#

The fallback method to use when a set_ function isn’t implemented.

class pipelines.results.base.TaskExecution#

Bases: GetTaskMixin, GetPipelineMixin, PipelineStorageObject

content_type_name: str = 'TaskExecution'#

The type name of the results object. This is used throughout the system and is set by the specific base class and should not be changed.

get_config: Callable[[], Dict[str, Any]]#

Gets the task config

get_id: Callable[[], Any]#

Returns id of the task execution in the storage

get_pipeline_object: Callable[[], Pipeline]#

Gets the deserialized pipeline object

get_pipeline_result: Callable[[], PipelineResult]#

Gets the pipeline result the task execution is linked to

get_pipeline_task: Callable[[], str]#

Gets the name of the task property on the pipeline class

get_serializable_pipeline_object: Callable[[], Dict[str, Any]]#

Returns the object this instance of the pipeline was started with

get_task_id: Callable[[], str]#

Gets the id of the registered task class

get_task_results: Callable[[], Sequence[TaskResult]]#

Gets all the results for this task execution

class pipelines.results.base.TaskResult#

Bases: GetTaskMixin, GetPipelineMixin, PipelineStorageObject

content_type_name: str = 'TaskResult'#

The type name of the results object. This is used throughout the system and is set by the specific base class and should not be changed.

get_config: Callable[[], Dict[str, Any]]#

Gets the task config

get_duration()#

Returns the duration of the task

get_id: Callable[[], Any]#

Returns id of the task result in the storage

get_pipeline_object: Callable[[], Pipeline]#

Gets the deserialized pipeline object

get_pipeline_task: Callable[[], str]#

Gets the name of the task property on the pipeline class

get_serializable_pipeline_object: Callable[[], Dict[str, Any]]#

Returns the object this instance of the pipeline was started with

get_serializable_task_object: Callable[[], Dict[str, Any]]#

Returns the serializable form of the task object this task was started with

get_task_execution: Callable[[], TaskExecution]#

Returns the task execution related to this object

get_task_id: Callable[[], str]#

Gets the id of the registered task class

get_task_object: Callable[[], Task]#

Gets the deserialized task object