pipelines.base module

class pipelines.base.ModelPipeline

Bases: Pipeline

The base pipeline class to use when iterating over many model instances.

class Meta

Bases: object

abstract = True
model: ClassVar[Model]

The model class to use when fetching objects from the database

get_iterator()

Returns an iterator to run the pipeline over. By default this returns the result of get_queryset.

get_queryset()

Returns a queryset containing all items for the model set on the Meta class. If model is not defined on the Meta class, this method must be overridden; otherwise an ImproperlyConfigured error is raised.

static get_serializable_pipeline_object(obj: Model | None)

Serializes a Django model object so that it can be stored on the pipeline result object.

If obj is not None, it is stored as a dictionary containing pk, app_label and model_name so that the object can be retrieved from the database.

Parameters:

obj – The object to serialize
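A minimal sketch of the serialization described above, assuming the stored dictionary uses exactly the keys pk, app_label and model_name, and assuming None is passed through unchanged (the return value for None is not documented here). It relies only on the standard Django attributes `obj.pk`, `obj._meta.app_label` and `obj._meta.model_name`; the helpers `serialize_model_reference` and `load_model_reference` are hypothetical, not part of this module.

```python
from types import SimpleNamespace
from typing import Any, Optional


def serialize_model_reference(obj: Optional[Any]) -> Optional[dict]:
    """Hypothetical stand-in for get_serializable_pipeline_object.

    Returns None for None (an assumption), otherwise a dict that
    identifies the database row so it can be fetched again later.
    """
    if obj is None:
        return None
    return {
        "pk": obj.pk,
        "app_label": obj._meta.app_label,
        "model_name": obj._meta.model_name,
    }


def load_model_reference(data: dict) -> Any:
    """Re-fetch the referenced row; requires a configured Django project."""
    from django.apps import apps  # imported lazily so the sketch runs without Django

    model = apps.get_model(data["app_label"], data["model_name"])
    return model._default_manager.get(pk=data["pk"])


# A duck-typed stand-in for a model instance, so no database is needed.
book = SimpleNamespace(pk=42, _meta=SimpleNamespace(app_label="library", model_name="book"))
print(serialize_model_reference(book))
# {'pk': 42, 'app_label': 'library', 'model_name': 'book'}
```

Storing the app label and model name rather than the class itself keeps the value JSON-serializable, and `apps.get_model` resolves it back to the model class at load time.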

tasks: dict[str, pipelines.tasks.base.Task] = {}

A dictionary of task property names mapped to the task objects

class pipelines.base.Pipeline

Bases: Registrable, ClassWithAppConfigMeta

The base pipeline class. All pipelines should be a subclass of this.

clean_parents(task: Task, reporter: PipelineReporter, runner: PipelineRunner, run_id: str)

Checks that the given task has valid parents, if its parents are defined in the pipeline ordering

Parameters:
  • task – The task to check

  • reporter – The reporter to write any messages to

  • runner – The runner to process the pipeline

  • run_id – The id of the current pipeline run

clean_tasks(reporter: PipelineReporter, runner: PipelineRunner, run_id: str) → List[Task | None]

If ordering is set, checks that all tasks have their required parents in the pipeline

Parameters:
  • reporter – The reporter to write any messages to

  • runner – The runner to process the pipeline

  • run_id – The id of the current pipeline run
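The parent check described by clean_parents and clean_tasks can be sketched in plain Python, assuming `tasks` and `ordering` have the shapes documented for the `tasks` and `ordering` attributes. The helper `find_missing_parents` is hypothetical; unlike the real methods it returns the problems instead of writing them to a reporter.

```python
from typing import Dict, List, Optional


def find_missing_parents(
    tasks: Dict[str, object],
    ordering: Optional[Dict[str, List[str]]],
) -> Dict[str, List[str]]:
    """Map each task name to the parent names it lists that are not tasks."""
    if ordering is None:
        # No explicit ordering, so there are no declared parents to validate.
        return {}
    missing: Dict[str, List[str]] = {}
    for task_name, parents in ordering.items():
        unknown = [parent for parent in parents if parent not in tasks]
        if unknown:
            missing[task_name] = unknown
    return missing


tasks = {"a": object(), "b": object(), "c": object()}
print(find_missing_parents(tasks, {"b": ["a"], "c": ["b", "x"]}))
# {'c': ['x']}
```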

classmethod get_id()

Generates an id based on where the pipeline is created.

classmethod get_iterator()

Returns a set of objects to create pipeline instances for, one instance per object. If no iteration is required, None should be returned.
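The distinction between returning None and returning an empty collection is easy to miss. The sketch below assumes that None means a single pipeline instance is created with no associated object, while any iterable produces one instance per item; `plan_runs` is a hypothetical helper, not part of this module.

```python
from typing import Any, Iterable, List, Optional


def plan_runs(iterator: Optional[Iterable[Any]]) -> List[Optional[Any]]:
    """Return the object each pipeline instance would be created for.

    Assumption: None means "no iteration", i.e. one instance with no object.
    """
    if iterator is None:
        return [None]
    # One instance per object; an empty iterable therefore yields no instances.
    return list(iterator)


print(plan_runs(None))        # [None]
print(plan_runs([]))          # []
print(plan_runs(["x", "y"]))  # ['x', 'y']
```

Under this assumption, a ModelPipeline whose queryset is empty starts no runs at all, whereas a plain Pipeline that returns None still runs once.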

static get_serializable_pipeline_object(obj)

Converts the object to a JSON-serializable version to be stored in the database.

Parameters:

obj – The object to store

handle_config_error(execution: PipelineExecution, reporter)

Records a configuration error, meaning the pipeline couldn’t be started. All result objects are updated to the CANCELLED state.

Parameters:
  • execution – The pipeline execution object representing the pipeline to run

  • reporter – The reporter to write the error to

ordering: dict[str, List[str]] | None = None

The overridden ordering of tasks in the pipeline. If not set, tasks are run in the order they are defined in the pipeline class. If set to something other than None, it should be a dictionary of task property names mapped to lists of parent task property names. For example:

ordering = {
    "b": ["a"],
    "c": ["b"],
}

would cause b to run after a, and c to run after b.

Note

If ordering is defined, any task not present in the dictionary is assumed to have no dependencies and can be started at any point.
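To make the dependency semantics concrete, here is a sketch that turns an ordering dictionary into one valid sequential run order using the standard library's `graphlib.TopologicalSorter` (Python 3.9+). This is not necessarily how PipelineRunner schedules tasks; `execution_order` is a hypothetical helper and tasks are represented by their property names only.

```python
from graphlib import TopologicalSorter
from typing import Dict, List


def execution_order(task_names: List[str], ordering: Dict[str, List[str]]) -> List[str]:
    """Return one valid run order for the tasks, honouring their parents."""
    sorter = TopologicalSorter()
    for name in task_names:
        # Tasks absent from `ordering` get no parents, so they may run at any point.
        sorter.add(name, *ordering.get(name, []))
    # static_order() raises graphlib.CycleError if the parents form a cycle.
    return list(sorter.static_order())


ordering = {
    "b": ["a"],
    "c": ["b"],
}
print(execution_order(["a", "b", "c"], ordering))  # ['a', 'b', 'c']
```

A runner that executes tasks concurrently would instead use `get_ready()` and `done()` on the sorter, starting every task whose parents have finished.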

classmethod postprocess_meta(current_class_meta, resolved_meta_class)

Collects all tasks, builds the tasks dict, and sets the pipeline_task property on each task.

Parameters:
  • current_class_meta – The meta class defined on this pipeline

  • resolved_meta_class – The new meta class resolved from the current class and each base class
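The task-collection step can be sketched as a walk over the class hierarchy that gathers Task instances into a dict keyed by attribute name. `Task` and `collect_tasks` here are hypothetical stand-ins; the sketch assumes tasks are declared as class attributes and does not reproduce how the real method sets pipeline_task, since that value is not specified here.

```python
from typing import Dict


class Task:
    """Hypothetical stand-in for pipelines.tasks.base.Task."""


def collect_tasks(pipeline_cls: type) -> Dict[str, Task]:
    """Map attribute names to Task instances, including inherited ones."""
    tasks: Dict[str, Task] = {}
    # Walk base classes first so a subclass that redefines a task name wins.
    for klass in reversed(pipeline_cls.__mro__):
        for name, value in vars(klass).items():
            if isinstance(value, Task):
                tasks[name] = value
    return tasks


class BasePipeline:
    a = Task()


class MyPipeline(BasePipeline):
    b = Task()
    c = Task()


print(sorted(collect_tasks(MyPipeline)))  # ['a', 'b', 'c']
```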

start(run_id: str, input_data: Dict[str, Any], runner: PipelineRunner, reporter: PipelineReporter) → bool

Starts the pipeline running.

Returns True if the runner schedules the pipeline, otherwise False. A return value of True only means the pipeline has been scheduled; it makes no guarantee that the pipeline will succeed.

Parameters:
  • run_id – The id of the current pipeline run

  • input_data – The data to pass to the pipeline

  • reporter – The reporter to write any messages to

  • runner – The runner to process the pipeline

tasks: dict[str, pipelines.tasks.base.Task] = {}

A dictionary of task property names mapped to the task objects