Pipelines#
Pipelines are used to define how your offline tasks should be ran. Each can be configured with a set of tasks to run, the order in which they should run as well as an expected configuration and input data format.
To create a pipeline, you must create a piplines.py
file
in your django app containing a class that extends the base
Pipeline
class:
from pipelines.base import Pipeline
class CustomPipeline(Pipeline):
...
Tasks can then be added by, by default they will be ran in the order they are defined in the pipeline:
from pipelines.base import Pipeline
class CustomPipeline(Pipeline):
# this will be ran first
first = SomeTask()
# this will be ran second
second = SomeOtherTask()
Meta#
Each pipeline may supply a custom Meta
class. This is
combined with the Meta
classes from each of the pipelines
base classes to create a merged metaclass accessible from the
_meta
property of the pipeline. To do this add create
a class called Meta
as a child of the pipeline object:
from django.utils.translation import gettext_lazy as _
from pipelines.base import Pipeline
class CustomPipeline(Pipeline):
# this will be ran first
first = SomeTask()
# this will be ran second
second = SomeOtherTask()
class Meta:
name = _("Custom Pipeline")
verbose_name = _("My Detailed Custom Pipeline")
The base pipeline meta class contains the following properties:
abstract
(bool
): When a pipeline is flagged as abstract it is excluded from the internal pipeline registry and so will not appear in any menus and will not be runnable. If not specified, this will beFalse
even if a parent class is flagged as abstract.name
(str
): A short name for the pipeline to appear in menus etc. If not set the name of the pipeline class is used.verbose_name
(str
): A long name for the pipeline to appear in as titles etc. If not set thename
is used.app_label
(str
): The name of the application the pipeline is part of, used when looking up pipelines from the registry. In not set theapp_label
is discovered from the django app registry.
Iterators#
A pipeline can be have a get_iterator method. This will cause the pipeline to be ran for each element in the returned value with the value being passed into each task:
class CustomPipeline(Pipeline):
...
def get_iterator(self):
return [1, 2, 3]
In this example the pipeline will be ran 3 times with the values 1, 2, and 3.
Model Pipelines#
Model pipelines allow you to run a pipeline for all or a subset of entries
for a specific model. Rather than extending the Pipeline
object, the
ModelPipeline
should be used.
There are 2 options for defining the queryset, either:
setting the model in the pipeline meta class, this will run the pipeline for each object in the database:
from pipelines.base import ModelPipeline class CustomPipeline(ModelPipeline): ... class Meta: model = CustomModel
defining a
get_queryset
on the pipeline. Currently this can only be used to define objects that can be passed into the pipeline, more complex queries are not supported eg aggregates and annotations:from pipelines.base import ModelPipeline class CustomPipeline(ModelPipeline): ... def get_queryset(self): return CustomModel.objects.all()
Ordering#
By default, tasks are ran in the order they are defined in the pipeline with the
first needing to be completed before the second is started and so on. This can
be overridden however by supplying an ordering
property. This is a
dictionary where each key is the name of a task property and the values are a
list of property names the task is dependant on.
Note
If an ordering
property is defined, anything not present in the dictionary
is assumes to have no dependencies and can be started at any point.
In the following example there are 4 tasks a
, b
, c
and d
.
An ordering property has been provided but ordering for a
and d
is not
defined so they can be ran at any point at the runners discretion. Task b
must
wait for a
to have finished and task c
must wait for task b
to
have finished but no tasks need to wait for d
to have finished as d
is not
listed as a dependency of any task:
from pipelines.base import Pipeline
...
class CustomPipeline(Pipeline):
a = A()
b = B()
c = C()
d = D()
ordering = {
"b": ["a"],
"c": ["b"],
}