pipelines.results.base module
- class pipelines.results.base.GetPipelineMixin
Bases: object
- class pipelines.results.base.GetTaskMixin
Bases: object
- class pipelines.results.base.PipelineDigestItem(total_runs: int = 0, total_success: int = 0, total_failure: int = 0, last_ran: datetime | None = None, average_runtime: float | None = None)
Bases: object
Data class for storing entries in the pipeline digest.
- average_runtime: float | None = None
The average time (in seconds) it takes to run the pipeline
- last_ran: datetime | None = None
The date and time the pipeline was last run
- total_failure: int = 0
The total number of times the pipeline has failed
- total_runs: int = 0
The total number of times the pipeline has been run
- total_success: int = 0
The total number of times the pipeline has run successfully
- class pipelines.results.base.PipelineExecution
Bases: GetPipelineMixin, PipelineStorageObject
Object to store the overall result of a pipeline run.
- content_type_name: str = 'PipelineExecution'
The type name of the results object. This is used throughout the system and is set by the specific base class and should not be changed.
- get_pipeline_results: Callable[[], Sequence[PipelineResult]]
Returns all the pipeline result objects for this pipeline execution
- class pipelines.results.base.PipelineResult
Bases: GetPipelineMixin, PipelineStorageObject
- content_type_name: str = 'PipelineResult'
The type name of the results object. This is used throughout the system and is set by the specific base class and should not be changed.
- get_id: Callable[[], Any]
Returns the id of the pipeline result in the storage
- get_pipeline_execution: Callable[[], PipelineExecution]
Returns the pipeline execution object describing the pipeline as a whole
- get_reporter: Callable[[], str]
Returns the Python path of the reporter the pipeline was started with
- get_runner: Callable[[], str]
Returns the Python path of the runner the pipeline was started with
- get_serializable_pipeline_object: Callable[[], Dict[str, Any]]
Returns the object this instance of the pipeline was started with
- get_task_executions: Callable[[], Sequence[TaskExecution]]
Returns all the task execution objects for this pipeline instance
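Because each results object exposes accessors for its children, a whole run can be walked top-down. The sketch below is illustrative only: it relies on the documented get_pipeline_results, get_task_executions, get_task_results and get_id accessors, while the Stub* classes and the walk_execution helper are hypothetical stand-ins for real storage objects.

```python
from dataclasses import dataclass, field
from typing import Iterator, List, Tuple


# Hypothetical in-memory stand-ins exposing only the documented accessors.
@dataclass
class StubTaskResult:
    id: str

    def get_id(self) -> str:
        return self.id


@dataclass
class StubTaskExecution:
    id: str
    results: List[StubTaskResult] = field(default_factory=list)

    def get_id(self) -> str:
        return self.id

    def get_task_results(self) -> List[StubTaskResult]:
        return self.results


@dataclass
class StubPipelineResult:
    id: str
    task_executions: List[StubTaskExecution] = field(default_factory=list)

    def get_id(self) -> str:
        return self.id

    def get_task_executions(self) -> List[StubTaskExecution]:
        return self.task_executions


@dataclass
class StubPipelineExecution:
    pipeline_results: List[StubPipelineResult] = field(default_factory=list)

    def get_pipeline_results(self) -> List[StubPipelineResult]:
        return self.pipeline_results


def walk_execution(execution) -> Iterator[Tuple[str, str, str]]:
    """Yield (pipeline_result_id, task_execution_id, task_result_id) for every task result."""
    for pipeline_result in execution.get_pipeline_results():
        for task_execution in pipeline_result.get_task_executions():
            for task_result in task_execution.get_task_results():
                yield (
                    pipeline_result.get_id(),
                    task_execution.get_id(),
                    task_result.get_id(),
                )
```

Task executions with no results contribute nothing to the walk, so an execution built with build_all=False (no children) yields an empty sequence.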
- class pipelines.results.base.PipelineResultsStorage
Bases: object
- build_pipeline_execution(pipeline: Pipeline, run_id: str, runner: PipelineRunner, reporter: PipelineReporter, input_data: Dict[str, Any], build_all=True) → PipelineExecution
Creates a pipeline execution object along with all the pipeline results, task executions and task results.
If build_all is True, the storage class should create:
1. one pipeline execution object;
2. one pipeline result object per element in the pipeline iterator;
3. for each pipeline result, one task execution object per task in the pipeline;
4. for each task execution, one task result object per element in the task iterator.
If build_all is False, only the pipeline execution is created. This is used to capture error states when the pipeline is badly configured and the other objects may not be possible to create.
- Parameters:
pipeline – The Pipeline object to create the results storage for
run_id – A UUID for referencing the run
runner – The PipelineRunner object that will be used to run the pipeline
reporter – The PipelineReporter object that will be used to report status changes
input_data – A JSON-serializable dict containing the input parameters
build_all – If True, all results objects are created; otherwise only the pipeline execution object is created (for example, to store configuration errors)
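The number of objects created when build_all is True follows directly from the four rules above. The count_results_objects helper below is a hypothetical sketch, not part of the library; it assumes the pipeline iterator yields pipeline_items elements and that task i's iterator yields task_items_per_task[i] elements for every pipeline result.

```python
from typing import Dict, Sequence


def count_results_objects(
    pipeline_items: int,
    task_items_per_task: Sequence[int],
    build_all: bool = True,
) -> Dict[str, int]:
    """Count the storage objects build_pipeline_execution is expected to create.

    Assumption: each task iterator yields the same number of elements for
    every pipeline result; task_items_per_task[i] is that number for task i.
    """
    if not build_all:
        # Only the pipeline execution is stored (e.g. to record a config error).
        return {"PipelineExecution": 1, "PipelineResult": 0,
                "TaskExecution": 0, "TaskResult": 0}

    num_tasks = len(task_items_per_task)
    return {
        "PipelineExecution": 1,
        # One pipeline result per element in the pipeline iterator.
        "PipelineResult": pipeline_items,
        # One task execution per task, for each pipeline result.
        "TaskExecution": pipeline_items * num_tasks,
        # One task result per task-iterator element, for each task execution.
        "TaskResult": pipeline_items * sum(task_items_per_task),
    }
```

For a pipeline iterator of 3 elements and two tasks whose iterators yield 2 and 1 elements, this gives 1 execution, 3 pipeline results, 6 task executions and 9 task results. The dictionary keys reuse the content_type_name values documented on each class.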
- cleanup(before: datetime | None = None) → Sequence[str]
Removes results objects from the storage.
- Parameters:
before – If set, only objects created before this date are removed; otherwise all objects are removed.
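The effect of the before argument can be sketched with an in-memory store. Everything here is hypothetical: the StoredObject and InMemoryStore classes and the created timestamp field are illustrative, and the sketch assumes the returned Sequence[str] holds the ids of the removed objects, which the signature alone does not confirm.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional


@dataclass
class StoredObject:
    id: str
    created: datetime  # hypothetical creation timestamp


@dataclass
class InMemoryStore:
    objects: Dict[str, StoredObject] = field(default_factory=dict)

    def cleanup(self, before: Optional[datetime] = None) -> List[str]:
        # With no cut-off every object is selected; otherwise only older ones.
        to_remove = [
            obj.id
            for obj in self.objects.values()
            if before is None or obj.created < before
        ]
        for obj_id in to_remove:
            del self.objects[obj_id]
        # Assumption: report the ids of the removed objects.
        return to_remove
```

The ids are collected before deletion so the dictionary is not mutated while it is being iterated.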
- get_pipeline_digest() → Dict[str, PipelineDigestItem]
Returns the PipelineDigest object (a dict of PipelineDigestItem entries) providing stats for all registered pipeline classes.
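Each PipelineDigestItem summarises the runs of one pipeline. The sketch below shows one plausible way to compute an item from finished runs; the Run record and the build_digest_item helper are hypothetical, the local PipelineDigestItem only mirrors the documented fields, and it assumes every run has finished and that last_ran means the latest start time.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class PipelineDigestItem:
    # Mirrors the documented fields of pipelines.results.base.PipelineDigestItem.
    total_runs: int = 0
    total_success: int = 0
    total_failure: int = 0
    last_ran: Optional[datetime] = None
    average_runtime: Optional[float] = None


@dataclass
class Run:
    # Hypothetical record of a single finished pipeline run.
    started: datetime
    completed: datetime
    succeeded: bool


def build_digest_item(runs: List[Run]) -> PipelineDigestItem:
    if not runs:
        # No runs yet: keep the dataclass defaults (zero counts, None times).
        return PipelineDigestItem()
    runtimes = [(r.completed - r.started).total_seconds() for r in runs]
    return PipelineDigestItem(
        total_runs=len(runs),
        total_success=sum(1 for r in runs if r.succeeded),
        total_failure=sum(1 for r in runs if not r.succeeded),
        last_ran=max(r.started for r in runs),  # assumption: latest start time
        average_runtime=sum(runtimes) / len(runtimes),  # in seconds
    )
```

A digest in the documented shape would then be a dict such as {pipeline_id: build_digest_item(runs)}, one entry per registered pipeline.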
- get_pipeline_execution(run_id) → PipelineExecution | None
Fetch a specific pipeline execution from the storage.
If the pipeline execution isn’t found, None will be returned.
- Parameters:
run_id – The id of the pipeline run to fetch the execution for
- get_pipeline_executions(pipeline_id: str | None = None) → Sequence[PipelineExecution]
Gets all pipeline executions from the storage. If pipeline_id is supplied, only executions for the given pipeline id are returned.
- Parameters:
pipeline_id – The id of the registered pipeline class
- get_pipeline_result(_id) → PipelineResult | None
Fetch a specific pipeline result from the storage.
If the pipeline result isn’t found, None will be returned.
- Parameters:
_id – The id of the result to fetch from storage
- get_pipeline_results(run_id: str | None = None) → Sequence[PipelineResult]
Gets all pipeline results from the storage. If run_id is supplied, only results for that particular run are returned.
- Parameters:
run_id – The id of the run to filter results by
- get_task_execution(_id) → TaskExecution | None
Fetch a specific task execution from the storage.
If the task execution isn’t found, None will be returned.
- Parameters:
_id – The id of the task execution to fetch from storage
- get_task_executions(run_id: str | None = None, pipeline_result_id: str | None = None) → Sequence[TaskExecution]
Gets all task executions from the storage.
- Parameters:
run_id – The id of the run to filter results by
pipeline_result_id – The id of the parent pipeline result object to filter results by
- get_task_result(_id) → TaskResult | None
Fetch a specific task result from the storage.
If the task result isn’t found, None will be returned.
- Parameters:
_id – The id of the task result to fetch from storage
- get_task_results(run_id: str | None = None, pipeline_result_id: str | None = None, task_execution_id: str | None = None) → Sequence[TaskResult]
Gets all task results from the storage.
- Parameters:
run_id – The id of the run to filter results by
pipeline_result_id – The id of the grandparent pipeline result object to filter results by
task_execution_id – The id of the parent task execution object to filter results by
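One reasonable reading of these optional filters is that each supplied id narrows the result and omitted ones are ignored, while the single-object getters return None for an unknown id. The in-memory sketch below follows that reading for task results; TaskResultRecord and both functions are hypothetical, and real storage backends may implement the lookups differently.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TaskResultRecord:
    # Hypothetical stored row; field names mirror the documented filter arguments.
    id: str
    run_id: str
    pipeline_result_id: str
    task_execution_id: str


def get_task_results(
    records: List[TaskResultRecord],
    run_id: Optional[str] = None,
    pipeline_result_id: Optional[str] = None,
    task_execution_id: Optional[str] = None,
) -> List[TaskResultRecord]:
    # Every filter that is supplied must match; filters left as None are ignored.
    return [
        r
        for r in records
        if (run_id is None or r.run_id == run_id)
        and (pipeline_result_id is None or r.pipeline_result_id == pipeline_result_id)
        and (task_execution_id is None or r.task_execution_id == task_execution_id)
    ]


def get_task_result(records: List[TaskResultRecord], _id: str) -> Optional[TaskResultRecord]:
    # Matches the documented behaviour: None when no record has the given id.
    return next((r for r in records if r.id == _id), None)
```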
- class pipelines.results.base.PipelineStorageObject
Bases: object
Base object for all items in the pipeline results storage.
It resolves unimplemented get_ methods by falling back to looking up the attribute on the model and returning its value if present. For example, if a call to get_run_id is made, the class will:
1. First check whether get_run_id is implemented; if it is, use it.
2. If it’s not implemented, check whether the object has a run_id attribute; if so, return a method that returns the attribute value.
3. If neither is present, raise an AttributeError as normal.
set_ methods are handled in a similar manner:
1. First check whether set_run_id is implemented; if it is, use it.
2. If not, return a method that sets the value of run_id on the object.
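The get_/set_ fallback described above can be approximated with `__getattr__`, which Python calls only when normal attribute lookup fails, so implemented methods always take precedence. This is a minimal sketch of the technique, not the library's code: FallbackAccessors and Example are hypothetical, and only the regular expression is copied from method_prop_re.

```python
import re
from typing import Any, Callable


class FallbackAccessors:
    # Same pattern as the documented method_prop_re.
    method_prop_re = re.compile(r"^(get|set)_(.*)$")

    def __getattr__(self, item: str) -> Callable[..., Any]:
        # Only reached when normal lookup fails, i.e. no get_/set_ method is implemented.
        match = self.method_prop_re.match(item)
        if match is None:
            raise AttributeError(item)
        action, prop = match.groups()
        if action == "get":
            # Fall back to the plain attribute, or fail as normal if it is absent.
            if not hasattr(self, prop):
                raise AttributeError(item)
            return lambda: getattr(self, prop)
        # action == "set": return a method that writes the plain attribute.
        return lambda value: setattr(self, prop, value)


class Example(FallbackAccessors):
    """Hypothetical results object with one implemented accessor."""

    def __init__(self, run_id: str) -> None:
        self.run_id = run_id
        self.status = "pending"

    def get_status(self) -> str:
        # Found by normal lookup, so the fallback is never consulted.
        return self.status.upper()
```

With obj = Example("abc"), obj.get_run_id() returns "abc", obj.set_run_id("xyz") updates obj.run_id, obj.get_status() uses the implemented method, and obj.get_missing raises AttributeError. The returned getter reads the attribute at call time, so it never returns a stale value.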
- content_type_name: str
The type name of the results object. This is used throughout the system and is set by the specific base class and should not be changed.
- get_completed: Callable[[], datetime | None]
Returns the time the pipeline finished (whether successful or not)
- get_input_data: Callable[[], Dict[str, Any]]
Returns the input data for the pipeline run
- get_pipeline_id: Callable[[], str]
Returns the id of the registered pipeline class
- get_run_id: Callable[[], str]
Returns the id of the running pipeline instance
- get_started: Callable[[], datetime | None]
Returns the time the pipeline was started
- get_status: Callable[[], PipelineTaskStatus]
Returns the current status of the pipeline
- getter()
The fallback method used when a get_ function isn’t implemented.
- method_prop_re = re.compile('^(get|set)_(.*)$')
Regular expression used to extract the property name from the looked-up method name.
- report_status_change(reporter: PipelineReporter, status: PipelineTaskStatus, message='', propagate=True)
Update the status of the pipeline execution. If the status hasn’t changed or hasn’t moved on (i.e. the change is not pending -> running or running -> a final state), the message won’t be reported.
- Parameters:
reporter – The reporter to write the message to.
status – The new status of the object.
message – Optional message to record with the status change.
propagate – If True and the object specifies propagation parents, the status change is passed to the parents.
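The reporting rule can be expressed as a small predicate. This sketch implements the two transitions listed above literally, so a direct pending-to-final jump is treated as not reported; that reading is an assumption. The Status enum and its member names are hypothetical stand-ins for PipelineTaskStatus, whose real members may differ, and should_report is not a library function.

```python
from enum import Enum


class Status(Enum):
    # Hypothetical stand-in for PipelineTaskStatus; real member names may differ.
    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"


FINAL_STATES = {Status.DONE, Status.FAILED}


def should_report(old: Status, new: Status) -> bool:
    """Return True only for pending -> running or running -> a final state."""
    if old is new:
        return False  # status unchanged
    if old is Status.PENDING and new is Status.RUNNING:
        return True
    if old is Status.RUNNING and new in FINAL_STATES:
        return True
    return False  # any other change has not "moved on"
```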
- save: Callable[[], Any]
Commits the current object state to the storage
- set_completed: Callable[[datetime], None]
Sets the time the pipeline finished
- set_started: Callable[[datetime], None]
Sets the time the pipeline was started
- setter(name, value, /)
The fallback method used when a set_ function isn’t implemented.
- class pipelines.results.base.TaskExecution
Bases: GetTaskMixin, GetPipelineMixin, PipelineStorageObject
- content_type_name: str = 'TaskExecution'
The type name of the results object. This is used throughout the system and is set by the specific base class and should not be changed.
- get_config: Callable[[], Dict[str, Any]]
Gets the task config
- get_id: Callable[[], Any]
Returns the id of the task execution in the storage
- get_pipeline_result: Callable[[], PipelineResult]
Gets the pipeline result the task execution is linked to
- get_pipeline_task: Callable[[], str]
Gets the name of the task property on the pipeline class
- get_serializable_pipeline_object: Callable[[], Dict[str, Any]]
Returns the object this instance of the pipeline was started with
- get_task_id: Callable[[], str]
Gets the id of the registered task class
- get_task_results: Callable[[], Sequence[TaskResult]]
Gets all the results for this task execution
- class pipelines.results.base.TaskResult
Bases: GetTaskMixin, GetPipelineMixin, PipelineStorageObject
- content_type_name: str = 'TaskResult'
The type name of the results object. This is used throughout the system and is set by the specific base class and should not be changed.
- get_config: Callable[[], Dict[str, Any]]
Gets the task config
- get_duration()
Returns the duration of the task
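get_duration is listed without a return type or a description of how the duration is measured. A plausible sketch, assuming it is derived from the get_started and get_completed values every PipelineStorageObject exposes and that a task which has not both started and completed has no duration; the task_duration helper and its timedelta return type are assumptions, not the library's API.

```python
from datetime import datetime, timedelta
from typing import Optional


def task_duration(
    started: Optional[datetime], completed: Optional[datetime]
) -> Optional[timedelta]:
    # Hypothetical helper: no duration until both timestamps are known.
    if started is None or completed is None:
        return None
    return completed - started
```

Called as task_duration(result.get_started(), result.get_completed()), this would return None for a task that is still pending or running.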
- get_id: Callable[[], Any]
Returns the id of the task result in the storage
- get_pipeline_task: Callable[[], str]
Gets the name of the task property on the pipeline class
- get_serializable_pipeline_object: Callable[[], Dict[str, Any]]
Returns the object this instance of the pipeline was started with
- get_serializable_task_object: Callable[[], Dict[str, Any]]
Returns the object this instance of the task was started with
- get_task_execution: Callable[[], TaskExecution]
Returns the task execution related to this object
- get_task_id: Callable[[], str]
Gets the id of the registered task class