Orchestration
Everything so far ran inline. To turn a sequence of steps into a pipeline that a worker can schedule, wrap the steps in tasks and compose them into a job. The data operations are unchanged — orchestration just adds structure and dependency tracking on top.
Define tasks
@task marks an async function as a unit of work. A task can return a plain value, such as an int, or an Object:
```python
@task
async def simple_arithmetic() -> int:
    """A simple task that does basic arithmetic."""
    a = 1
    b = 2
    c = a + b
    print(f"Computing: {a} + {b} = {c}")  # → 3
    return c


@task
async def multiply(x: int, y: int) -> int:
    """A task that takes parameters."""
    result = x * y
    print(f"Computing: {x} * {y} = {result}")  # → 30 with the job defaults x=5, y=6
    return result


@task
async def create_sales() -> Object:
    """Create a sales dataset and return it as an Object."""
    sales = await create_object_from_value(
        {
            "product": ["Widget", "Gadget", "Gizmo"],
            "quantity": [10, 5, 8],
            "price": [9.99, 24.99, 14.99],
        }
    )
    count = await sales.count().data()
    print(f"Sales: {count} rows")  # → 3
    return sales
```
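Inside a job, calling a task records it rather than running it. The toy decorator below sketches that record-then-run idea with plain asyncio. It is not the library's @task, and `toy_task` and `Deferred` are hypothetical names chosen for illustration: calling a decorated function only stores the function and its arguments, and the coroutine runs when the record is executed.

```python
import asyncio
import functools
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class Deferred:
    """Hypothetical record of a task call: the coroutine function and its arguments."""
    fn: Callable[..., Awaitable[Any]]
    kwargs: dict = field(default_factory=dict)

    async def run(self) -> Any:
        # The wrapped coroutine function is only invoked here.
        return await self.fn(**self.kwargs)


def toy_task(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Deferred]:
    """Hypothetical stand-in for @task: calling records the call instead of running it."""
    @functools.wraps(fn)
    def record(**kwargs: Any) -> Deferred:
        return Deferred(fn, kwargs)
    return record


@toy_task
async def multiply(x: int, y: int) -> int:
    return x * y


call = multiply(x=5, y=6)       # nothing has been computed yet
print(type(call).__name__)      # → Deferred
print(asyncio.run(call.run()))  # → 30
```

Run it as a script: asyncio.run() raises an error when an event loop is already running, as it is in a notebook.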
Compose a job
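@job defines the workflow. Calling tasks inside it records them, and passing one task's result as another task's argument creates a dependency between them. In basic_pipeline below, x and y are job parameters rather than task results, so the three tasks are independent; the job returns them with tasks_list():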
```python
@job("basic_orchestration")
def basic_pipeline(x: int = 5, y: int = 6):
    """Pipeline with arithmetic tasks and Object data."""
    arith = simple_arithmetic()
    product = multiply(x=x, y=y)
    sales = create_sales()
    return tasks_list(arith, product, sales)
```
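The dependency rule can be sketched the same way: if an argument passed to a task call is itself a recorded call, that earlier call is upstream. The standalone sketch below is hypothetical (`Node`, `toy_task`, `add_one`, and `toy_pipeline` are illustrative names, not library API) and, unlike basic_pipeline, chains two tasks so that one dependency appears.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass(eq=False)
class Node:
    """Hypothetical record of one task call inside a job."""
    name: str
    fn: Callable[..., Any]
    kwargs: dict[str, Any] = field(default_factory=dict)

    @property
    def deps(self) -> list["Node"]:
        # Any argument that is itself a recorded call is an upstream dependency.
        return [v for v in self.kwargs.values() if isinstance(v, Node)]


def toy_task(fn: Callable[..., Any]) -> Callable[..., Node]:
    """Hypothetical stand-in for @task: calling returns a Node instead of running."""
    def record(**kwargs: Any) -> Node:
        return Node(fn.__name__, fn, kwargs)
    return record


@toy_task
async def multiply(x: int, y: int) -> int:
    return x * y


@toy_task
async def add_one(value: int) -> int:
    return value + 1


def toy_pipeline(x: int = 5, y: int = 6) -> list[Node]:
    product = multiply(x=x, y=y)     # job parameters only: no upstream task
    bumped = add_one(value=product)  # a task result as argument: depends on product
    return [product, bumped]


nodes = toy_pipeline()
for node in nodes:
    print(node.name, "<-", [d.name for d in node.deps])
# multiply <- []
# add_one <- ['multiply']
```

Nothing is executed here; the sketch only shows how a recorded call graph can reveal which tasks must wait for others.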
Run it
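ajob_test() runs every task locally, in order, which makes it a good fit for developing and debugging a pipeline before handing it to a worker. The snippet uses top-level await, so run it where that is supported (for example, a notebook) or inside an async function: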
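```python
# Awaiting the job function builds the job (defaults: x=5, y=6)
pipeline = await basic_pipeline()
print(f"Created job: {pipeline.name} (ID: {pipeline.id})")

# Execute every task locally, in order
await ajob_test(pipeline)

# Confirm the whole pipeline finished
assert pipeline.status == JOB_COMPLETED, f"Expected COMPLETED, got {pipeline.status}"
print(f"Job status: {pipeline.status}")
```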
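When tasks depend on each other, running them "in order" means running upstream calls first. One common way to get that order is a topological sort; the sketch below uses graphlib.TopologicalSorter from the standard library. `Node`, `toy_task`, and `run_locally` are hypothetical, and this is not how ajob_test is implemented, which this section does not describe.

```python
import asyncio
from dataclasses import dataclass, field
from graphlib import TopologicalSorter
from typing import Any, Callable


@dataclass(eq=False)
class Node:
    """Hypothetical record of one task call (function plus keyword arguments)."""
    fn: Callable[..., Any]
    kwargs: dict[str, Any] = field(default_factory=dict)


def toy_task(fn: Callable[..., Any]) -> Callable[..., Node]:
    """Hypothetical stand-in for @task: calling returns a Node instead of running."""
    return lambda **kwargs: Node(fn, kwargs)


async def run_locally(nodes: list[Node]) -> dict[Node, Any]:
    """Run every recorded call one at a time, upstream calls first."""
    # Map each node to the set of nodes it depends on (its predecessors).
    graph = {
        n: {v for v in n.kwargs.values() if isinstance(v, Node)} for n in nodes
    }
    results: dict[Node, Any] = {}
    for node in TopologicalSorter(graph).static_order():
        # Replace each upstream Node argument with the value it produced.
        args = {k: results[v] if isinstance(v, Node) else v
                for k, v in node.kwargs.items()}
        results[node] = await node.fn(**args)
    return results


@toy_task
async def multiply(x: int, y: int) -> int:
    return x * y


@toy_task
async def add_one(value: int) -> int:
    return value + 1


product = multiply(x=5, y=6)
bumped = add_one(value=product)
results = asyncio.run(run_locally([bumped, product]))
print(results[product], results[bumped])  # → 30 31
```

The nodes are passed in reverse order on purpose: the topological sort, not the list order, decides that multiply runs before add_one.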
Where to go next
That completes the tour — you can create Objects, operate on them, aggregate, group, filter, combine, and orchestrate. From here, the User Guide covers each area in depth.
See Also
- Orchestration (@task/@job, workers, and scheduling)
- Examples: Orchestration Basics (the complete runnable script)
- Examples: Orchestration Dynamic (dynamic task generation)