Tasks#
A task is a unit of work that is run by a pipeline. At a minimum a task
must define a run method, which can execute any Python code:
```python
class CustomTask(Task):
    def run(self, *args, **kwargs):
        # some python code
        ...
```
A pydantic model can be defined to validate the task config and the input data, raising errors if validation fails.
Iterators#
As with pipelines, tasks support being run on multiple objects. To do so,
define a get_iterator method that returns an iterable. The pipeline
runner will then expand this into multiple instances of the same task, passing
one object from the iterable to each instance.
In the following example the task will be started with the values "a", "b" and "c":
```python
class CustomTask(Task):
    ...

    def get_iterator(self):
        return ["a", "b", "c"]
```
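The expansion step can be sketched in plain Python. This is an illustrative sketch only: the stub `Task` base class and the `expand` helper below are assumptions for demonstration, not the library's actual runner API.

```python
# Illustrative sketch of how a runner could expand a task iterator.
class Task:
    """Stand-in for pipelines.tasks.base.Task (stub for illustration)."""

    def get_iterator(self):
        # default: a single run with no task object
        return [None]


class CustomTask(Task):
    def get_iterator(self):
        return ["a", "b", "c"]


def expand(task):
    # one task instance is scheduled per object yielded by the iterator
    return [(task, obj) for obj in task.get_iterator()]


pairs = expand(CustomTask())
print([obj for _, obj in pairs])  # prints ['a', 'b', 'c']
```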
Accessing Objects#
After setting up iterators for pipelines and tasks you will need to access the
objects when running your tasks. For this, each task has pipeline_object
and task_object properties. The pipeline_object is the object
passed in from the pipeline's iterator and the task_object is the
object passed in from the task's iterator.
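A minimal sketch of how these properties behave. The stub base class and the way the runner assigns the objects are assumptions made for illustration; the real `Task` base class lives in `pipelines.tasks.base`.

```python
# Illustrative stub: exposes pipeline_object and task_object as properties.
class Task:
    def __init__(self):
        self._pipeline_object = None
        self._task_object = None

    @property
    def pipeline_object(self):
        # object passed in from the pipeline's iterator
        return self._pipeline_object

    @property
    def task_object(self):
        # object passed in from this task's iterator
        return self._task_object


class CustomTask(Task):
    def run(self, *args, **kwargs):
        return (self.pipeline_object, self.task_object)


task = CustomTask()
task._pipeline_object = "customer-1"  # would be set by the pipeline runner
task._task_object = "a"               # would be set by the task runner
print(task.run())  # ('customer-1', 'a')
```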
Config Data#
The config type for a task is set by defining a ConfigType class
on the task. This should extend the TaskConfig from
pipelines.tasks.base:
```python
from pipelines.tasks.base import TaskConfig, Task


class CustomTask(Task):
    class ConfigType(TaskConfig):
        a: str
```
In this example, when adding the task to a pipeline, the task must be instantiated
with a keyword argument a of type str; if it is missing an error
will be raised:
```python
from pipelines.base import Pipeline


class CustomPipeline(Pipeline):
    # will raise an error because `a` is not supplied
    bad = CustomTask()

    # success!
    good = CustomTask(a="foo")
```
The config types are pydantic models, so they can do everything other pydantic objects can.
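For instance, a config class can declare optional fields with defaults alongside required ones. Plain `BaseModel` and the field names below are used purely for illustration of the pydantic behaviour:

```python
from pydantic import BaseModel


# Illustrative stand-in for a TaskConfig subclass.
class ExampleConfig(BaseModel):
    a: str          # required field
    retries: int = 3  # optional field with a default


cfg = ExampleConfig(a="foo")
print(cfg.a)        # foo
print(cfg.retries)  # 3
```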
Warning
The ConfigType should allow extra fields to be supplied. This allows
optional parameters to be passed to the runners at run time. For example, the
celery runner allows you to pass a celery_queue property to tasks to specify
the queue on which the task should be scheduled.
By default, any config class extending TaskConfig already follows this
behaviour.
Input Data#
Similar to the config type, a task can define an InputType class that extends
pydantic's BaseModel. This is used to
validate the input data provided to the pipeline. Each task should only declare the values
required by the task itself.
Warning
By default, pydantic models ignore any extra properties. This convention should be followed
for InputType so that extra parameters aren't passed to a task, causing an error.
In the following example, when CustomPipeline is run with {"a": "foo", "b": "bar"},
"foo" will be passed to the first task as parameter a and "bar" will be passed to
the second task as parameter b:
```python
from pydantic import BaseModel

from pipelines.base import Pipeline
from pipelines.tasks.base import Task


class FirstTask(Task):
    class InputType(BaseModel):
        a: str

    def run(self, a=None):
        print(a)


class SecondTask(Task):
    class InputType(BaseModel):
        b: str

    def run(self, b=None):
        print(b)


class CustomPipeline(Pipeline):
    first = FirstTask()
    second = SecondTask()
```
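Because pydantic ignores extra fields by default, each InputType picks out only the values it declares. A standalone sketch of that behaviour, where `FirstInput` mirrors the `FirstTask.InputType` above:

```python
from pydantic import BaseModel


# Mirrors FirstTask.InputType from the example above (illustrative).
class FirstInput(BaseModel):
    a: str


data = {"a": "foo", "b": "bar"}
first = FirstInput(**data)  # the extra "b" is silently ignored
print(first.a)  # foo
```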