pipelines.base module
- class pipelines.base.ModelPipeline
Bases: Pipeline
The base pipeline class to use when iterating over many model instances.
- class Meta
Bases: object
- abstract = True
- model: ClassVar[Model]
The model class to use when fetching objects from the database.
- get_iterator()
Returns an iterator to run the pipeline over. By default this returns the result of get_queryset().
- get_queryset()
Returns a queryset containing all items for the model set on the Meta class. If model is not defined on the Meta class, this method must be overridden; otherwise an ImproperlyConfigured error will be raised.
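The way get_iterator(), get_queryset() and Meta.model fit together can be sketched without Django. This is a minimal stand-in under stated assumptions, not the library's source: ImproperlyConfigured is defined locally in place of Django's exception, and the hypothetical all_instances() classmethod on a hypothetical Book model replaces a real queryset such as model.objects.all().

```python
from typing import Any, List, Optional


class ImproperlyConfigured(Exception):
    """Local stand-in for django.core.exceptions.ImproperlyConfigured."""


class SketchModelPipeline:
    class Meta:
        model: Optional[type] = None  # subclasses set this to a model class

    def get_queryset(self) -> List[Any]:
        model = getattr(self.Meta, "model", None)
        if model is None:
            # Documented behaviour: without Meta.model, subclasses must
            # override get_queryset() or an error is raised.
            raise ImproperlyConfigured(
                f"{type(self).__name__} needs Meta.model or its own get_queryset()"
            )
        # Assumption: a hypothetical all_instances() classmethod stands in
        # for Django's model.objects.all().
        return model.all_instances()

    def get_iterator(self):
        # Default: the pipeline iterates over the queryset.
        return self.get_queryset()


class Book:  # hypothetical model
    @classmethod
    def all_instances(cls) -> List[str]:
        return ["book-1", "book-2"]


class BookPipeline(SketchModelPipeline):
    class Meta:
        model = Book


print(list(BookPipeline().get_iterator()))  # ['book-1', 'book-2']
```

A pipeline that needs a filtered subset would override get_queryset() rather than get_iterator(), so the default iteration keeps working.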
- static get_serializable_pipeline_object(obj: Model | None)
Serializes a Django model object so that it can be stored on the pipeline result object. If obj is not None, the object is stored as a dictionary containing pk, app_label and model_name so that it can be retrieved from the database.
- Parameters:
obj – The object to serialize
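The documented dictionary shape can be sketched with the standard Django model attributes obj.pk, obj._meta.app_label and obj._meta.model_name. This is an illustrative helper, not the library's implementation; returning None for a None input is an assumption, since the behaviour for None is not stated, and a SimpleNamespace stands in for a real model instance.

```python
from types import SimpleNamespace
from typing import Any, Optional


def serialize_model_object(obj: Any) -> Optional[dict]:
    """Hypothetical helper mirroring the documented pk/app_label/model_name shape."""
    if obj is None:
        return None  # assumption: None is passed through unchanged
    return {
        "pk": obj.pk,
        "app_label": obj._meta.app_label,
        "model_name": obj._meta.model_name,
    }


# A SimpleNamespace stands in for a Django model instance.
fake_book = SimpleNamespace(
    pk=7, _meta=SimpleNamespace(app_label="library", model_name="book")
)
print(serialize_model_object(fake_book))
# {'pk': 7, 'app_label': 'library', 'model_name': 'book'}
```

With real Django models, such a dictionary is enough to reload the object, for example via django.apps.apps.get_model(data["app_label"], data["model_name"]).objects.get(pk=data["pk"]); whether the library reloads it exactly this way is not stated here.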
- tasks: dict[str, pipelines.tasks.base.Task] = {}
A dictionary of task property names mapped to the task objects.
- class pipelines.base.Pipeline
Bases: Registrable, ClassWithAppConfigMeta
The base pipeline class. All pipelines should be a subclass of this.
- clean_parents(task: Task, reporter: PipelineReporter, runner: PipelineRunner, run_id: str)
Checks that the given task has valid parents, if any parents are defined for it in the pipeline ordering.
- Parameters:
task – The task to check
reporter – The reporter to write any messages to
runner – The runner to process the pipeline
run_id – The id of the current pipeline run
- clean_tasks(reporter: PipelineReporter, runner: PipelineRunner, run_id: str) → List[Task | None]
If ordering is set, checks that every task has its required parents in the pipeline.
- Parameters:
reporter – The reporter to write any messages to
runner – The runner to process the pipeline
run_id – The id of the current pipeline run
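The kind of check clean_parents() and clean_tasks() describe, that every parent named in ordering is a task the pipeline actually defines, can be sketched as a pure function. find_missing_parents is a hypothetical helper: unlike the real methods it returns a dictionary of problems instead of writing to a reporter, and it does not use a runner or run id.

```python
from typing import Dict, List, Optional


def find_missing_parents(
    tasks: Dict[str, object], ordering: Optional[Dict[str, List[str]]]
) -> Dict[str, List[str]]:
    """Map each task in `ordering` to the parents that are not defined tasks."""
    if ordering is None:
        return {}  # no ordering: nothing to validate
    missing: Dict[str, List[str]] = {}
    for name, parents in ordering.items():
        absent = [parent for parent in parents if parent not in tasks]
        if absent:
            missing[name] = absent
    return missing


tasks = {"a": object(), "b": object(), "c": object()}
print(find_missing_parents(tasks, {"b": ["a"], "c": ["x"]}))  # {'c': ['x']}
```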
- classmethod get_id()
Generates an id based on where the pipeline is created.
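One plausible reading of "based on where the pipeline is created" is the class's import path. The sketch below assumes that reading; the exact format the library produces is not documented here, and ExamplePipeline is hypothetical.

```python
class ExamplePipeline:
    @classmethod
    def get_id(cls) -> str:
        # Assumption: "where the pipeline is created" is taken to mean the
        # module and qualified name of the class.
        return f"{cls.__module__}.{cls.__qualname__}"


print(ExamplePipeline.get_id())  # e.g. "__main__.ExamplePipeline" when run as a script
```

An id derived from the import path stays stable across processes, which matters if it is used to look pipelines up again later.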
- classmethod get_iterator()
Returns the objects to create separate pipeline instances for, one instance per object. If no iteration is required, this should return None.
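The contract for get_iterator() (None means a single run, otherwise one pipeline instance per object) can be illustrated with a hypothetical caller. plan_instances, SinglePipeline and PerRegionPipeline are invented for this sketch; how the library itself consumes the iterator is not described here.

```python
from typing import Any, Iterable, List, Optional


class SinglePipeline:
    @classmethod
    def get_iterator(cls) -> Optional[Iterable[Any]]:
        return None  # no iteration required


class PerRegionPipeline:
    @classmethod
    def get_iterator(cls) -> Optional[Iterable[Any]]:
        return ["eu", "us", "apac"]  # one pipeline instance per region


def plan_instances(pipeline_cls) -> List[Optional[Any]]:
    """Hypothetical caller: one list entry per pipeline instance to create."""
    iterator = pipeline_cls.get_iterator()
    if iterator is None:
        return [None]  # a single instance, not tied to any object
    return list(iterator)


print(plan_instances(SinglePipeline))     # [None]
print(plan_instances(PerRegionPipeline))  # ['eu', 'us', 'apac']
```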
- static get_serializable_pipeline_object(obj)
Converts the object to a JSON-serializable version to be stored in the database.
- Parameters:
obj – The object to store
- handle_config_error(execution: PipelineExecution, reporter)
Handles recording a configuration error, which means the pipeline could not be started. All result objects will be updated to the CANCELLED state.
- Parameters:
execution – The pipeline execution object representing the pipeline to run
reporter – The reporter to write the error to
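The effect described for handle_config_error() (record the error, then move every result to CANCELLED) can be sketched with plain stand-ins. Status, TaskResult and cancel_on_config_error are hypothetical; the real method works on a PipelineExecution and a reporter whose interfaces are not shown here.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class Status(Enum):  # hypothetical status values
    PENDING = "PENDING"
    CANCELLED = "CANCELLED"


@dataclass
class TaskResult:  # hypothetical stand-in for a result object
    task_name: str
    status: Status = Status.PENDING


def cancel_on_config_error(results: List[TaskResult], messages: List[str], error: str) -> None:
    # Record why the pipeline could not start, then cancel every result.
    messages.append(f"Pipeline could not be started: {error}")
    for result in results:
        result.status = Status.CANCELLED


results = [TaskResult("a"), TaskResult("b")]
messages: List[str] = []
cancel_on_config_error(results, messages, "task 'c' has unknown parent 'x'")
print([r.status.name for r in results])  # ['CANCELLED', 'CANCELLED']
```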
- ordering: dict[str, List[str]] | None = None
The overridden ordering of tasks in the pipeline. If it is None (the default), tasks are run in the order they are defined in the pipeline class. If it is set to something other than None, it should be a dictionary mapping task property names to lists of parent task property names. For example:
    ordering = {
        "b": ["a"],
        "c": ["b"],
    }
would cause b to be run after a, and c to be run after b.
Note: If ordering is defined, any task not present in the dictionary is assumed to have no dependencies and can be started at any point.
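The dependency structure that ordering expresses can be made concrete with Python's standard graphlib module (Python 3.9+). This is not the library's scheduler; it only groups tasks into batches that could start together. The extra task "d" is hypothetical and illustrates that a task missing from the dictionary has no parents.

```python
from graphlib import TopologicalSorter
from typing import Dict, List


def run_batches(task_names: List[str], ordering: Dict[str, List[str]]) -> List[List[str]]:
    """Group tasks into batches; a batch can start once earlier batches finish."""
    # Tasks missing from `ordering` get no parents, as the documented note says.
    graph = {name: ordering.get(name, []) for name in task_names}
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError on circular orderings
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # all parents already done
        batches.append(ready)
        sorter.done(*ready)
    return batches


print(run_batches(["a", "b", "c", "d"], {"b": ["a"], "c": ["b"]}))
# [['a', 'd'], ['b'], ['c']]
```

Because "d" has no parents, it lands in the first batch alongside "a" rather than waiting for the chain a → b → c.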
- classmethod postprocess_meta(current_class_meta, resolved_meta_class)
Collects all tasks, builds the tasks dict, and sets the pipeline_task property on every task.
- Parameters:
current_class_meta – The meta class defined on this pipeline
resolved_meta_class – The new meta class resolved from the current class and each base class
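Collecting tasks across a class hierarchy, as postprocess_meta() describes, can be sketched by walking the method resolution order. The Task class here is a minimal stand-in, collect_tasks is hypothetical, and the value assigned to pipeline_task is an assumption: the sketch stores the property name, which the documentation does not confirm.

```python
from typing import Dict


class Task:
    """Minimal stand-in for pipelines.tasks.base.Task."""

    def __init__(self) -> None:
        self.pipeline_task = None  # assumption: filled with the property name


def collect_tasks(pipeline_cls: type) -> Dict[str, Task]:
    tasks: Dict[str, Task] = {}
    # Visit base classes first so a subclass can replace an inherited task.
    for klass in reversed(pipeline_cls.__mro__):
        for name, value in vars(klass).items():
            if isinstance(value, Task):
                tasks[name] = value
    for name, task in tasks.items():
        task.pipeline_task = name
    return tasks


class BasePipeline:
    fetch = Task()


class ReportPipeline(BasePipeline):
    render = Task()


print(list(collect_tasks(ReportPipeline)))  # ['fetch', 'render']
```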
- start(run_id: str, input_data: Dict[str, Any], runner: PipelineRunner, reporter: PipelineReporter) → bool
Starts the pipeline running. Returns True if the runner schedules the pipeline, otherwise False. A return value of True only means the pipeline has been scheduled; it makes no guarantee about whether the pipeline will succeed.
- Parameters:
run_id – The id of the current pipeline run
input_data – The data to pass to the pipeline
reporter – The reporter to write any messages to
runner – The runner to process the pipeline
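The distinction between "scheduled" and "succeeded" in the return value of start() can be illustrated with a hypothetical runner that refuses work once it is full. BoundedRunner and start_sketch are invented for this sketch and do not reflect the PipelineRunner interface.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class BoundedRunner:
    """Hypothetical runner that only accepts a fixed number of runs."""
    capacity: int
    scheduled: List[str] = field(default_factory=list)

    def schedule(self, run_id: str, input_data: Dict[str, Any]) -> bool:
        if len(self.scheduled) >= self.capacity:
            return False  # refused: nothing was scheduled
        self.scheduled.append(run_id)
        return True  # accepted: the work itself has not run yet


def start_sketch(run_id: str, input_data: Dict[str, Any], runner: BoundedRunner) -> bool:
    # Mirrors the documented contract: True means "scheduled", not "succeeded".
    return runner.schedule(run_id, input_data)


runner = BoundedRunner(capacity=1)
print(start_sketch("run-1", {}, runner))  # True
print(start_sketch("run-2", {}, runner))  # False
```

A caller that needs the outcome must therefore check the recorded results later instead of relying on the boolean.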
- tasks: dict[str, pipelines.tasks.base.Task] = {}
A dictionary of task property names mapped to the task objects.