
Orchestration

Everything so far ran inline. To turn a sequence of steps into a pipeline that a worker can schedule, wrap the steps in tasks and compose them into a job. The data operations are unchanged — orchestration just adds structure and dependency tracking on top.

Define tasks

@task marks an async function as a unit of work. A task can return a plain value or an Object:

@task
async def simple_arithmetic() -> int:
    """A simple task that does basic arithmetic."""
    a = 1
    b = 2
    c = a + b
    print(f"Computing: {a} + {b} = {c}")  # → 3
    return c


@task
async def multiply(x: int, y: int) -> int:
    """A task that takes parameters."""
    result = x * y
    print(f"Computing: {x} * {y} = {result}")  # → 30
    return result


@task
async def create_sales() -> Object:
    """Create a sales dataset and return as Object."""
    sales = await create_object_from_value(
        {
            "product": ["Widget", "Gadget", "Gizmo"],
            "quantity": [10, 5, 8],
            "price": [9.99, 24.99, 14.99],
        }
    )
    count = await sales.count().data()
    print(f"Sales: {count} rows")  # → 3
    return sales
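One way to picture what "marking a function as a unit of work" involves is a decorator that rejects non-async functions and registers the rest where a scheduler could find them. This is a hypothetical sketch: TASKS, the is_task attribute, and this task decorator are illustrative names, not the library's API, and the real @task also records calls made inside a job, which this toy does not attempt.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable

# Hypothetical registry standing in for whatever bookkeeping the real @task keeps.
TASKS: dict[str, Callable[..., Awaitable[Any]]] = {}


def task(fn):
    """Toy @task: reject non-async functions, register the rest by name."""
    if not inspect.iscoroutinefunction(fn):
        raise TypeError(f"@task expects an async function, got {fn.__name__!r}")
    TASKS[fn.__name__] = fn
    fn.is_task = True  # a marker a scheduler could look for
    return fn


@task
async def multiply(x: int, y: int) -> int:
    return x * y


print(sorted(TASKS))                          # → ['multiply']
print(asyncio.run(TASKS["multiply"](5, 6)))   # → 30

try:
    @task
    def not_async():  # plain def, so the toy decorator refuses it
        return 1
except TypeError as err:
    print(err)  # → @task expects an async function, got 'not_async'
```

Validating at decoration time means a mistake surfaces when the module is imported rather than when a worker first tries to await the function.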

Compose a job

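@job defines the workflow. Calling a task inside the job records it as a step, and passing one task's result as another task's argument creates a dependency between them. In the pipeline below, none of the three tasks consumes another's result, so they are independent: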

@job("basic_orchestration")
def basic_pipeline(x: int = 5, y: int = 6):
    """Pipeline with arithmetic tasks and Object data."""
    arith = simple_arithmetic()
    product = multiply(x=x, y=y)
    sales = create_sales()
    return tasks_list(arith, product, sales)
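To make the recording and dependency rule concrete, here is a toy model in plain Python. It is not the library's implementation: TaskCall, this task decorator, and dependent_pipeline are hypothetical stand-ins. Calling a decorated function returns a record instead of running it, and any argument that is itself a record is treated as an upstream dependency. Unlike basic_pipeline, this pipeline feeds arith into multiply, so the two calls are linked:

```python
import functools
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class TaskCall:
    """One recorded, not-yet-executed task invocation (hypothetical)."""
    name: str
    fn: Callable[..., Any]
    args: tuple
    kwargs: dict
    deps: list = field(default_factory=list)


def task(fn):
    """Toy @task: calling the wrapper records a TaskCall instead of running fn."""
    @functools.wraps(fn)
    def record(*args, **kwargs):
        # Any argument that is itself a recorded call is an upstream dependency.
        deps = [v for v in (*args, *kwargs.values()) if isinstance(v, TaskCall)]
        return TaskCall(fn.__name__, fn, args, kwargs, deps)
    return record


@task
async def simple_arithmetic() -> int:
    return 1 + 2


@task
async def multiply(x: int, y: int) -> int:
    return x * y


def dependent_pipeline(y: int = 6):
    arith = simple_arithmetic()
    product = multiply(x=arith, y=y)  # arith's result feeds x
    return [arith, product]


calls = dependent_pipeline()
for call in calls:
    print(call.name, "depends on", [d.name for d in call.deps])
# → simple_arithmetic depends on []
# → multiply depends on ['simple_arithmetic']
```

Nothing has executed at this point; the job is just a graph of records, which is what lets a worker schedule it later.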

Run it

Calling the job function builds the job. ajob_test() then runs every task locally, in order, which is ideal for developing and debugging a pipeline before handing it to a worker:

pipeline = await basic_pipeline()
print(f"Created job: {pipeline.name} (ID: {pipeline.id})")

await ajob_test(pipeline)
assert pipeline.status == JOB_COMPLETED, f"Expected COMPLETED, got {pipeline.status}"
print(f"Job status: {pipeline.status}")
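The internals of ajob_test() are not described here, but a local test runner of this kind can be sketched as: order the recorded tasks so every task follows its upstreams, then await them one at a time, passing earlier results in. In this hypothetical sketch, Ref, JOB, and run_locally are illustrative names, and the ordering comes from the standard library's graphlib.TopologicalSorter:

```python
import asyncio
from dataclasses import dataclass
from graphlib import TopologicalSorter


@dataclass(frozen=True)
class Ref:
    """Placeholder for 'the result of the named upstream task' (hypothetical)."""
    node: str


async def simple_arithmetic() -> int:
    return 1 + 2


async def multiply(x: int, y: int) -> int:
    return x * y


# A hand-built job: task name -> (coroutine function, keyword arguments).
JOB = {
    "arith": (simple_arithmetic, {}),
    "product": (multiply, {"x": Ref("arith"), "y": 6}),
}


async def run_locally(job):
    # Each task's predecessors are the tasks its Ref arguments point at.
    graph = {
        name: {v.node for v in kwargs.values() if isinstance(v, Ref)}
        for name, (_, kwargs) in job.items()
    }
    results = {}
    # static_order() yields every task after all of its predecessors.
    for name in TopologicalSorter(graph).static_order():
        fn, kwargs = job[name]
        resolved = {
            k: results[v.node] if isinstance(v, Ref) else v
            for k, v in kwargs.items()
        }
        results[name] = await fn(**resolved)  # sequential: one task at a time
    return results


results = asyncio.run(run_locally(JOB))
print(results)  # → {'arith': 3, 'product': 18}
```

Running tasks sequentially trades speed for determinism, which is what you want while debugging; a real worker is free to run independent tasks concurrently.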

Where to go next

That completes the tour — you can create Objects, operate on them, aggregate, group, filter, combine, and orchestrate. From here, the User Guide covers each area in depth.

See Also