Tasks#
A task is a unit of work that is run by a pipeline. At a minimum a task
must define a run method, which can execute any Python code:
```python
class CustomTask(Task):
    def run(self, *args, **kwargs):
        # some python code
        ...
```
A pydantic model can be defined to validate the task config and the input data, raising errors if validation fails.
Iterators#
As with pipelines, tasks support being run on multiple objects. To do so,
define a get_iterator method that returns an iterable. The pipeline
runner will then expand this into multiple instances of the same task, passing
one object from the iterable to each instance.
In the following example the task will be started with the values "a", "b" and "c":
```python
class CustomTask(Task):
    ...

    def get_iterator(self):
        return ["a", "b", "c"]
```
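The expansion step can be sketched in plain Python. This is an illustrative sketch only: the stub `Task` base class and the `expand` helper below are assumptions for demonstration, not the library's actual runner API.

```python
# Illustrative sketch of how a runner could expand a task iterator.
class Task:
    """Stand-in for pipelines.tasks.base.Task (stub for illustration)."""

    def get_iterator(self):
        # default: a single run with no task object
        return [None]


class CustomTask(Task):
    def get_iterator(self):
        return ["a", "b", "c"]


def expand(task):
    # one task instance is scheduled per object yielded by the iterator
    return [(task, obj) for obj in task.get_iterator()]


pairs = expand(CustomTask())
print([obj for _, obj in pairs])  # prints ['a', 'b', 'c']
```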
Accessing Objects#
After setting up iterators for pipelines and tasks you will need to access the
objects when running your tasks. For this, each task has pipeline_object
and task_object properties. The pipeline_object is the object
passed in from the pipeline's iterator and the task_object is the
object passed in from the task's iterator.
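A minimal sketch of how these properties behave. The stub base class and the way the runner assigns the objects are assumptions made for illustration; the real `Task` base class lives in `pipelines.tasks.base`.

```python
# Illustrative stub: exposes pipeline_object and task_object as properties.
class Task:
    def __init__(self):
        self._pipeline_object = None
        self._task_object = None

    @property
    def pipeline_object(self):
        # object passed in from the pipeline's iterator
        return self._pipeline_object

    @property
    def task_object(self):
        # object passed in from this task's iterator
        return self._task_object


class CustomTask(Task):
    def run(self, *args, **kwargs):
        return (self.pipeline_object, self.task_object)


task = CustomTask()
task._pipeline_object = "customer-1"  # would be set by the pipeline runner
task._task_object = "a"               # would be set by the task runner
print(task.run())  # ('customer-1', 'a')
```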
Config Data#
The config type for a task is set by defining a ConfigType class
on the task. This should extend the TaskConfig from
pipelines.tasks.base:
```python
from pipelines.tasks.base import TaskConfig, Task


class CustomTask(Task):
    class ConfigType(TaskConfig):
        a: str
```
In this example, when adding the task to a pipeline, the task must be instantiated
with a keyword argument a of type str; if it is missing an error
will be raised:
```python
from pipelines.base import Pipeline


class CustomPipeline(Pipeline):
    # will raise an error because `a` is not supplied
    bad = CustomTask()

    # success!
    good = CustomTask(a="foo")
```
The config types are pydantic models, so they can do everything other pydantic objects can.
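For instance, a config class can declare optional fields with defaults alongside required ones. Plain `BaseModel` and the field names below are used purely for illustration of the pydantic behaviour:

```python
from pydantic import BaseModel


# Illustrative stand-in for a TaskConfig subclass.
class ExampleConfig(BaseModel):
    a: str          # required field
    retries: int = 3  # optional field with a default


cfg = ExampleConfig(a="foo")
print(cfg.a)        # foo
print(cfg.retries)  # 3
```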
Warning
The ConfigType should allow extra fields to be supplied. This allows
optional parameters to be passed to the runners at run time. For example, the
celery runner allows you to pass a celery_queue property to tasks to specify
the queue on which the task should be scheduled.
By default, any config class extending TaskConfig already follows this
behaviour.
Input Data#
Similar to the config type, a task can define an InputType class that extends
pydantic's BaseModel. This is used to
validate the input data provided to the pipeline. Each task should only declare the values
required by the task itself.
Warning
By default, pydantic models ignore any extra properties. This convention should be followed
for InputType so that extra parameters aren't passed to a task, causing an error.
In the following example, when CustomPipeline is run with {"a": "foo", "b": "bar"},
"foo" will be passed to the first task as parameter a and "bar" will be passed to
the second task as parameter b:
```python
from pydantic import BaseModel

from pipelines.base import Pipeline
from pipelines.tasks.base import Task


class FirstTask(Task):
    class InputType(BaseModel):
        a: str

    def run(self, a=None):
        print(a)


class SecondTask(Task):
    class InputType(BaseModel):
        b: str

    def run(self, b=None):
        print(b)


class CustomPipeline(Pipeline):
    first = FirstTask()
    second = SecondTask()
```
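Because pydantic ignores extra fields by default, each InputType picks out only the values it declares. A standalone sketch of that behaviour, where `FirstInput` mirrors the `FirstTask.InputType` above:

```python
from pydantic import BaseModel


# Mirrors FirstTask.InputType from the example above (illustrative).
class FirstInput(BaseModel):
    a: str


data = {"a": "foo", "b": "bar"}
first = FirstInput(**data)  # the extra "b" is silently ignored
print(first.a)  # foo
```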