
aaiclick Orchestration Backend Specification

Basic Example

from aaiclick.orchestration import job, task, job_test, TaskResult

@task
async def add(a: int, b: int) -> int:
    return a + b

@task
async def multiply(x: int, y: int) -> int:
    return x * y

@job("pipeline")
def pipeline(x: int, y: int):
    sum_result = add(a=x, b=y)      # no dependency between tasks → run in parallel
    product = multiply(x=x, y=y)
    return TaskResult(tasks=[sum_result, product])

j = pipeline(x=3, y=4)
job_test(j)  # execute synchronously (testing/local)

See aaiclick/orchestration/examples/orchestration_basic.py for a full example.

Decorators

Implementation: aaiclick/orchestration/decorators.py — see TaskFactory and JobFactory

Airflow-style TaskFlow API. Passing a Task result as an argument creates an upstream dependency automatically.

@task

Wraps an async function into a TaskFactory. Parameters: name (default: the function name) and max_retries (default: 0). When a task fails with retries remaining, it is reset to PENDING with an incremented attempt counter.
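The TaskFlow dependency rule and the retry reset can be illustrated with plain-Python stand-ins. This is a minimal sketch, not aaiclick's TaskFactory. TaskNode, TaskFactorySketch, task_sketch, next_state_after_failure and the zero-based attempt counter are all hypothetical. The sketch also skips the PENDING_CLEANUP step that the background sweep inserts between a failure and the retry.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple


@dataclass
class TaskNode:
    """Hypothetical stand-in for the Task object a factory call returns."""
    name: str
    kwargs: Dict[str, Any]
    upstream: List["TaskNode"] = field(default_factory=list)
    max_retries: int = 0


class TaskFactorySketch:
    """Hypothetical: calling the factory records a task instead of running it."""

    def __init__(self, fn: Callable[..., Any], name: Optional[str] = None, max_retries: int = 0):
        self.fn = fn
        self.name = name or fn.__name__
        self.max_retries = max_retries

    def __call__(self, **kwargs: Any) -> TaskNode:
        # Any argument that is itself a task result becomes an upstream dependency.
        upstream = [v for v in kwargs.values() if isinstance(v, TaskNode)]
        return TaskNode(self.name, kwargs, upstream, self.max_retries)


def task_sketch(fn: Callable[..., Any]) -> TaskFactorySketch:
    return TaskFactorySketch(fn)


def next_state_after_failure(attempt: int, max_retries: int) -> Tuple[str, int]:
    """Assumes a zero-based attempt counter: retry while attempt < max_retries."""
    if attempt < max_retries:
        return "PENDING", attempt + 1
    return "FAILED", attempt


@task_sketch
async def add(a: int, b: int) -> int:
    return a + b


@task_sketch
async def multiply(x: int, y: int) -> int:
    return x * y


s = add(a=3, b=4)        # plain values only: no upstream
p = multiply(x=s, y=2)   # s passed as an argument: s becomes upstream of p
```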

@job

Wraps a workflow function into a JobFactory. Auto-manages orch_context() and commits all tasks to SQL. Use @job("name"), @job(name="name"), or bare @job.
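Accepting bare @job, @job("name"), and @job(name="name") relies on a standard Python decorator pattern. If the first argument is callable, the decorator was applied bare. Otherwise it returns the real decorator. The sketch below assumes that pattern. job_sketch and JobFactorySketch are hypothetical, and neither opens orch_context() nor commits anything.

```python
import functools
from typing import Any, Callable, Optional, Union


class JobFactorySketch:
    """Hypothetical stand-in: remembers the job name and wraps the workflow."""

    def __init__(self, fn: Callable[..., Any], name: str):
        functools.update_wrapper(self, fn)   # keep __name__, __doc__, etc. of fn
        self.fn = fn
        self.name = name

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # The real JobFactory also manages orch_context() and commits tasks to SQL;
        # this sketch only calls the workflow function.
        return self.fn(*args, **kwargs)


def job_sketch(arg: Union[Callable[..., Any], str, None] = None, *, name: Optional[str] = None):
    if callable(arg):
        # Bare @job_sketch: Python passes the decorated function itself as `arg`.
        return JobFactorySketch(arg, arg.__name__)

    def decorate(fn: Callable[..., Any]) -> JobFactorySketch:
        # @job_sketch("x") passes the name positionally, @job_sketch(name="x") by keyword.
        return JobFactorySketch(fn, arg or name or fn.__name__)

    return decorate


@job_sketch
def nightly():
    return "ran"


@job_sketch("pipeline")
def build():
    return None


@job_sketch(name="etl")
def extract():
    return None
```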

Job testing: job_test(job) and ajob_test(job) execute synchronously (aaiclick/orchestration/execution/debug.py).

Deployment Modes

Two deployment modes, controlled by two independent environment variables, AAICLICK_CH_URL (data backend) and AAICLICK_SQL_URL (SQL backend):

| Aspect | Local (default) | Distributed |
|---|---|---|
| Data backend | chdb (embedded ClickHouse) | ClickHouse server |
| SQL backend | SQLite via aiosqlite | PostgreSQL via asyncpg |
| SQL URL | sqlite+aiosqlite:///~/.aaiclick/local.db | postgresql+asyncpg://user:pass@host:5432/database |
| Setup | python -m aaiclick setup | Provision servers + python -m aaiclick migrate upgrade head |
| Task claiming | Sequential SELECT + UPDATE | Atomic CTE with FOR UPDATE SKIP LOCKED |
| Table lifecycle | LocalLifecycleHandler (background thread) | OrchLifecycleHandler (SQL refcounts) |
| Detection | is_chdb() / is_sqlite() return True | Both return False |

Implementation: aaiclick/backend.py — see get_ch_url(), get_db_url(), is_chdb(), is_sqlite()

DB Models

Implementation: aaiclick/orchestration/models.py

All entities use Snowflake IDs via ClickHouse generateSnowflakeID() — distributed, time-ordered, no DB round-trip. Stored as BIGINT.

Status Enums

| Enum | Values |
|---|---|
| JobStatus | PENDING, RUNNING, COMPLETED, FAILED, CANCELLED |
| TaskStatus | PENDING, CLAIMED, RUNNING, COMPLETED, FAILED, CANCELLED, PENDING_CLEANUP, UPSTREAM_FAILED |
| WorkerStatus | ACTIVE, IDLE, STOPPING, STOPPED |
| RunType | SCHEDULED, MANUAL |

Two TaskStatus values are set by the background sweep, not the worker: PENDING_CLEANUP (transient — a failed or dead-worker run awaiting ref cleanup before it retries to PENDING or settles to FAILED) and UPSTREAM_FAILED (terminal — a pending task whose transitive upstream failed or was cancelled).
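Because the UPSTREAM_FAILED rule is transitive, one way to compute it is a fixpoint over the dependency graph. A pending task becomes UPSTREAM_FAILED if any direct upstream is FAILED, CANCELLED, or already UPSTREAM_FAILED. The sketch below is a hypothetical in-memory version that uses task-to-task edges only and dicts instead of SQL rows. The real sweep's query may work differently.

```python
from typing import Dict, List

# Statuses that block every transitive downstream task.
BLOCKING = {"FAILED", "CANCELLED", "UPSTREAM_FAILED"}


def propagate_upstream_failed(status: Dict[int, str], upstream: Dict[int, List[int]]) -> Dict[int, str]:
    """Return a copy of `status` with blocked PENDING tasks marked UPSTREAM_FAILED."""
    result = dict(status)
    changed = True
    while changed:  # repeat until a full pass changes nothing (fixpoint)
        changed = False
        for task_id, current in result.items():
            if current == "PENDING" and any(result.get(u) in BLOCKING for u in upstream.get(task_id, [])):
                # Now blocking in turn for this task's own downstream tasks.
                result[task_id] = "UPSTREAM_FAILED"
                changed = True
    return result
```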

Entities

  • RegisteredJob — job catalog entry; fields: id, name (unique), entrypoint, enabled, schedule (cron), default_kwargs (JSON), preservation_mode, next_run_at, created_at, updated_at
  • Job — a named workflow run; fields: id, name, status, run_type, registered_job_id (FK), preservation_mode, created_at, started_at, completed_at, error
  • Task — single executable unit; fields: id, job_id, group_id, entrypoint, kwargs (JSONB), status, result (JSONB), log_path, error, worker_id, run_epoch (fencing token bumped by clear_task), timestamps
  • Group — logical task grouping with optional nesting via parent_group_id; fields: id, job_id, parent_group_id, name, created_at
  • Dependency — composite PK (previous_id, previous_type, next_id, next_type); types are 'task' or 'group'; supports all four combinations
  • Worker — active worker process; fields: id, hostname, pid, status, last_heartbeat, tasks_completed, tasks_failed, started_at

Task Parameter Serialization

Task kwargs and results are stored as JSONB via _serialize_ref() on Object/View — see aaiclick/data/object.py.

Object ref: {"object_type": "object", "table": "t123...", "job_id": 789}

View ref: adds the where, limit, offset, order_by, and selected_fields fields to the Object ref.

job_id marks the producing task's job — the background worker skips tables with a non-NULL job_id pin until the job completes. Schema is reconstructed from ClickHouse column comments via _get_table_schema().

Return values: None → null; Object/View → serialized ref + job_id; any other value → auto-converted via create_object_from_value(). See serialize_task_result() in aaiclick/orchestration/execution/runner.py.
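The return-value dispatch can be sketched as follows. ObjectStub, ViewStub, and serialize_result_sketch are hypothetical stand-ins for Object, View, and serialize_task_result(). This section does not specify the exact keys of a real View ref, including its object_type value, so the sketch simply adds the view fields on top of the Object ref.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class ObjectStub:
    """Hypothetical stand-in for aaiclick's Object (only the table name matters here)."""
    table: str

    def ref(self, job_id: int) -> Dict[str, Any]:
        return {"object_type": "object", "table": self.table, "job_id": job_id}


@dataclass
class ViewStub(ObjectStub):
    """Hypothetical stand-in for View: same table, plus query modifiers."""
    where: Optional[str] = None
    limit: Optional[int] = None
    offset: Optional[int] = None
    order_by: Optional[str] = None
    selected_fields: Optional[List[str]] = None

    def ref(self, job_id: int) -> Dict[str, Any]:
        out = super().ref(job_id)
        # Assumption: the view-specific keys are added to the Object ref.
        out.update(where=self.where, limit=self.limit, offset=self.offset,
                   order_by=self.order_by, selected_fields=self.selected_fields)
        return out


def serialize_result_sketch(value: Any, job_id: int,
                            create_object_from_value: Callable[[Any], ObjectStub]) -> Optional[Dict[str, Any]]:
    if value is None:
        return None                    # stored as JSON null
    if isinstance(value, ObjectStub):  # Object or View: store a ref, not the data
        return value.ref(job_id)
    # Any other value is first materialized into a new table, then referenced.
    return create_object_from_value(value).ref(job_id)
```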

Task Execution

Implementation: aaiclick/orchestration/execution/

Worker Main Loop

Implementation: aaiclick/orchestration/execution/worker.py — see worker_main_loop()

Polls for tasks, executes them, and updates their status. Also handles worker registration, heartbeats (every 30s), graceful shutdown, and creation of a lifecycle handler for each task.
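Below is a minimal asyncio sketch of that loop. It assumes injected claim, execute, and heartbeat coroutines (all hypothetical) and an asyncio.Event for shutdown. The 30s heartbeat interval comes from the text; the 1s idle poll interval is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def _sleep_or_stop(stop: asyncio.Event, timeout: float) -> None:
    """Sleep for `timeout` seconds, waking early if shutdown is requested."""
    try:
        await asyncio.wait_for(stop.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        pass


async def worker_loop_sketch(
    claim: Callable[[], Awaitable[Optional[int]]],
    execute: Callable[[int], Awaitable[None]],
    heartbeat: Callable[[], Awaitable[None]],
    stop: asyncio.Event,
    poll_interval: float = 1.0,
    heartbeat_interval: float = 30.0,
) -> None:
    await heartbeat()  # registration: announce the worker before claiming anything

    async def beat() -> None:
        while not stop.is_set():
            await _sleep_or_stop(stop, heartbeat_interval)
            if not stop.is_set():
                await heartbeat()

    beat_task = asyncio.create_task(beat())
    try:
        while not stop.is_set():
            task_id = await claim()
            if task_id is None:
                await _sleep_or_stop(stop, poll_interval)  # nothing ready: back off
                continue
            # The real worker also creates a per-task lifecycle handler here.
            await execute(task_id)
    finally:
        # Stop heartbeats once the loop has exited.
        beat_task.cancel()
        try:
            await beat_task
        except asyncio.CancelledError:
            pass
```

The stop event is only checked between tasks, so a shutdown request never interrupts a task mid-execution.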

Task Claiming

Implementation: aaiclick/orchestration/execution/claiming.py — see claim_next_task(), pg_handler.py, sqlite_handler.py

Finds the oldest pending task with all dependencies satisfied and atomically claims it. Transitions job PENDING→RUNNING on first claim. PostgreSQL uses FOR UPDATE SKIP LOCKED; SQLite uses sequential SELECT + UPDATE.
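For the SQLite path, the sequential SELECT + UPDATE can be sketched with the standard sqlite3 module. The schema is a hypothetical simplification: task-to-task dependencies only, with no group edges and no timestamps. The guarded UPDATE ... WHERE status = 'PENDING' keeps two racing claimers from both winning. On PostgreSQL, the same selection would run inside a single CTE with FOR UPDATE SKIP LOCKED instead.

```python
import sqlite3
from typing import Optional

SCHEMA = """
CREATE TABLE jobs  (id INTEGER PRIMARY KEY, status TEXT NOT NULL);
CREATE TABLE tasks (id INTEGER PRIMARY KEY, job_id INTEGER NOT NULL,
                    status TEXT NOT NULL, worker_id INTEGER);
CREATE TABLE dependencies (previous_id INTEGER, previous_type TEXT,
                           next_id INTEGER, next_type TEXT,
                           PRIMARY KEY (previous_id, previous_type, next_id, next_type));
"""

# Oldest = smallest Snowflake ID, because the IDs are time-ordered.
READY_TASK_SQL = """
SELECT t.id, t.job_id FROM tasks t
WHERE t.status = 'PENDING'
  AND NOT EXISTS (
      SELECT 1 FROM dependencies d JOIN tasks up ON up.id = d.previous_id
      WHERE d.next_id = t.id AND d.next_type = 'task' AND d.previous_type = 'task'
        AND up.status != 'COMPLETED')
ORDER BY t.id
LIMIT 1
"""


def claim_next_task_sketch(conn: sqlite3.Connection, worker_id: int) -> Optional[int]:
    row = conn.execute(READY_TASK_SQL).fetchone()
    if row is None:
        return None
    task_id, job_id = row
    # Guarded UPDATE: if another worker claimed the row since our SELECT, rowcount is 0.
    cur = conn.execute(
        "UPDATE tasks SET status = 'CLAIMED', worker_id = ? WHERE id = ? AND status = 'PENDING'",
        (worker_id, task_id))
    if cur.rowcount != 1:
        conn.rollback()
        return None
    # The first claim flips the job from PENDING to RUNNING; later claims are no-ops.
    conn.execute("UPDATE jobs SET status = 'RUNNING' WHERE id = ? AND status = 'PENDING'", (job_id,))
    conn.commit()
    return task_id
```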

Job Management

Implementation: aaiclick/orchestration/jobs/

  • queries.py: get_job(), list_jobs(status, name_like, limit, offset), count_jobs(), get_tasks_for_job()
  • stats.py: compute_job_stats(), print_job_stats()
  • cancel_job(job_id) — atomically cancels a job and all non-terminal tasks; returns True if cancelled, False if not found or already terminal. See execution/claiming.py.
  • clear_task(task_id) — resets a task and all its transitive downstream tasks to PENDING for re-run (Airflow-style "clear task"); leaves upstream tasks and their output tables untouched, and reactivates a terminal job to RUNNING. Each reset bumps the task's run_epoch fence so an in-flight worker's late writes are rejected. See execution/claiming.py.
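The clear_task semantics (walk downstream only, reset, bump run_epoch) and the fencing check can be sketched in memory. TaskRow, clear_task_sketch, and write_result_sketch are hypothetical. In SQL, the fence would typically be an UPDATE ... WHERE id = ? AND run_epoch = ?, and a row count of 0 tells the late worker that it lost. Job reactivation is left out.

```python
from collections import deque
from dataclasses import dataclass
from typing import Any, Dict, List, Set


@dataclass
class TaskRow:
    """Hypothetical in-memory task row."""
    status: str
    run_epoch: int = 0
    result: Any = None


def clear_task_sketch(tasks: Dict[int, TaskRow], downstream: Dict[int, List[int]], task_id: int) -> Set[int]:
    # Breadth-first walk over downstream edges only; upstream tasks are never visited.
    to_reset = {task_id}
    queue = deque([task_id])
    while queue:
        current = queue.popleft()
        for nxt in downstream.get(current, []):
            if nxt not in to_reset:
                to_reset.add(nxt)
                queue.append(nxt)
    for tid in to_reset:
        row = tasks[tid]
        row.status, row.result = "PENDING", None
        row.run_epoch += 1  # fence: any worker holding the old epoch is now stale
    return to_reset


def write_result_sketch(tasks: Dict[int, TaskRow], task_id: int, epoch_seen: int, result: Any) -> bool:
    """Accept a worker's write only if the epoch it claimed with is still current."""
    row = tasks[task_id]
    if row.run_epoch != epoch_seen:
        return False
    row.status, row.result = "COMPLETED", result
    return True
```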

Workers detect cancellation by polling task status. Known limitation: a CPU-bound task is not interrupted until it yields control back to the event loop.

Registered Jobs

Implementation: aaiclick/orchestration/registered_jobs.py, aaiclick/orchestration/models.py — see RegisteredJob

Catalog of known jobs, separate from individual runs. Each entry stores entrypoint, optional cron schedule, default kwargs, preservation-mode defaults, and enabled flag.

Registration & CRUD

  • register_job(name, entrypoint, schedule, default_kwargs, preservation_mode, enabled) — create a new catalog entry
  • get_registered_job(name) — lookup by name
  • upsert_registered_job(...) — insert or update
  • enable_job(name) / disable_job(name) — toggle enabled, recompute next_run_at
  • list_registered_jobs(enabled_only) — list all or enabled only

Preservation Config Precedence

preservation_mode follows a four-level precedence chain resolved by factories.resolve_job_config():

| Level | Source | Wins when |
|---|---|---|
| 1 | Explicit run_job(...) / create_job(...) argument | The caller passes a non-None value |
| 2 | RegisteredJob.preservation_mode | The registered job carries a default |
| 3 | AAICLICK_DEFAULT_PRESERVATION_MODE env var | Set in environment |
| 4 | PRESERVATION_NONE | Hardcoded fallback |

None at any level means "inherit from the next level"; an explicit mode terminates the chain.

Scheduled runs inherit the registered job's level-2 defaults automatically. Manual runs via run_job() or the CLI can override at level 1.
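The precedence chain reduces to "first non-None wins". Below is a minimal sketch that assumes string mode values such as "NONE" and "FULL", the values accepted by the CLI's --preservation-mode flag. resolve_preservation_mode is hypothetical and is not factories.resolve_job_config().

```python
import os
from typing import Mapping, Optional

PRESERVATION_NONE = "NONE"  # assumed string value of the hardcoded fallback


def resolve_preservation_mode(explicit: Optional[str], registered_default: Optional[str],
                              env: Optional[Mapping[str, str]] = None) -> str:
    env = os.environ if env is None else env
    # Levels 1-3 in order; None means "inherit from the next level".
    for candidate in (explicit, registered_default, env.get("AAICLICK_DEFAULT_PRESERVATION_MODE")):
        if candidate is not None:
            return candidate
    return PRESERVATION_NONE  # level 4
```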

run_job

run_job(name, entrypoint, kwargs, preservation_mode) — links to the existing RegisteredJob if one matches name (and merges kwargs over its default_kwargs); otherwise runs standalone with registered_job_id=None. Resolves preservation config via the precedence chain above and creates a Job with run_type=MANUAL plus the entry point Task. See DataContext — Preservation Modes for the two modes' semantics.

Cron Scheduling

Implementation: aaiclick/orchestration/background/background_worker.py — see BackgroundWorker._check_schedules()

Every ~10s, BackgroundWorker polls for enabled registered jobs whose next_run_at <= NOW(). Optimistic locking on next_run_at prevents duplicate runs. Cron expressions are parsed by croniter, and next_run_at is recomputed on registration, on enable, and after each run.
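Optimistic locking here works in two steps. The worker reads next_run_at, then advances it with an UPDATE that only matches if the value is unchanged. Whoever gets a row count of 1 creates the run. The sketch below uses sqlite3 and a hypothetical two-table schema. To run without third-party packages, it substitutes a fixed daily-at-08:00 helper for croniter. With croniter, the call would be croniter(schedule, now).get_next(datetime).

```python
import sqlite3
from datetime import datetime, timedelta
from typing import Callable, List

SCHEMA = """
CREATE TABLE registered_jobs (id INTEGER PRIMARY KEY, schedule TEXT,
                              enabled INTEGER NOT NULL, next_run_at TEXT);
CREATE TABLE job_runs (id INTEGER PRIMARY KEY AUTOINCREMENT,
                       registered_job_id INTEGER, run_type TEXT);
"""


def next_daily_8am(schedule: str, now: datetime) -> datetime:
    """Stand-in for croniter: ignores `schedule` and assumes '0 8 * * *'."""
    candidate = now.replace(hour=8, minute=0, second=0, microsecond=0)
    return candidate if candidate > now else candidate + timedelta(days=1)


def try_claim_slot(conn: sqlite3.Connection, job_id: int, seen_next_run_at: str, new_next_run_at: str) -> bool:
    # Optimistic lock: succeeds only if next_run_at still holds the value we read.
    cur = conn.execute(
        "UPDATE registered_jobs SET next_run_at = ? WHERE id = ? AND next_run_at = ?",
        (new_next_run_at, job_id, seen_next_run_at))
    return cur.rowcount == 1


def check_schedules_sketch(conn: sqlite3.Connection, now: datetime,
                           next_after: Callable[[str, datetime], datetime]) -> List[int]:
    due = conn.execute(
        "SELECT id, schedule, next_run_at FROM registered_jobs WHERE enabled = 1 AND next_run_at <= ?",
        (now.isoformat(),)).fetchall()
    started = []
    for job_id, schedule, seen in due:
        if try_claim_slot(conn, job_id, seen, next_after(schedule, now).isoformat()):
            conn.execute("INSERT INTO job_runs (registered_job_id, run_type) VALUES (?, 'SCHEDULED')", (job_id,))
            started.append(job_id)
        conn.commit()
    return started
```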

CLI

Implementation: aaiclick/orchestration/cli.py, aaiclick/__main__.py

Local Mode (chdb + SQLite)

Single process, no infrastructure required. local start runs the combined REST + MCP server, with the background and execution workers wired into the FastAPI lifespan; it runs setup automatically if needed.

python -m aaiclick local start [--host HOST] [--port PORT] [--reload]
# Stop with Ctrl+C / SIGTERM — the lifespan tears down workers cleanly.

Distributed Mode (ClickHouse + PostgreSQL)

Independent processes; tasks run in child processes for isolation.

python -m aaiclick worker start [--max-tasks N]
python -m aaiclick worker stop <worker_id>
python -m aaiclick worker list
python -m aaiclick background start

worker start and background start require distributed backends. In local mode, use local start instead.

Common Commands

python -m aaiclick job get <id>
python -m aaiclick job cancel <id>
python -m aaiclick job list [--status RUNNING] [--like "%etl%"] [--limit 20 --offset 40]
python -m aaiclick job enable <name>          # Enable a registered job
python -m aaiclick job disable <name>         # Disable a registered job
python -m aaiclick register-job <entrypoint> [--name NAME] [--schedule "0 8 * * *"] [--kwargs '{"key": "val"}'] [--preservation-mode NONE|FULL]
python -m aaiclick run-job <name> [--kwargs '{"key": "val"}'] [--preservation-mode NONE|FULL]
python -m aaiclick registered-job list        # List registered jobs

Orchestration Operators

Implementation: aaiclick/orchestration/operators.py

| Operator | Description |
|---|---|
| map(cbk, obj, partition, args, kwargs) -> Group | Partitions Object into Views, creates N _map_part child tasks. |
| _map_part(cbk, part, out) -> None | Applies cbk(row, *args, **kwargs) to each row in a partition View. |
| reduce(cbk, obj, partition, args, kwargs) -> Group | Layered parallel reduction. Each layer reduces partitions into one row. |
| _expand_reduce(cbk, obj, ...) -> (Object, [Groups]) | Pre-allocates all layer Objects and tasks at once. |
| _reduce_part(cbk, part, layer_obj) -> None | Calls cbk(partition, output); the callback writes directly into layer_obj. |

reduce()

Layered parallel reduction. The callback receives an input partition and a pre-allocated output Object, and writes its result via output.insert(). All layers are created up front, and each layer depends on the previous one.

Layer 0  input=N      tasks=⌈N/P⌉   → layer_0_obj
Layer 1  input=⌈N/P⌉  tasks=⌈.../P⌉ → layer_1_obj
…continues until 1 row remains

Empty input raises TypeError("reduce() of empty sequence with no initial value").
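The layer arithmetic and the callback contract can be checked with an in-memory sketch. LayerOutput, plan_layers, and layered_reduce are hypothetical. P is taken to be the partition size (rows per task). The sketch requires P ≥ 2 so the layers actually shrink.

```python
import math
from typing import Callable, List, Sequence


class LayerOutput:
    """Hypothetical stand-in for a pre-allocated layer Object."""

    def __init__(self) -> None:
        self.rows: List[int] = []

    def insert(self, row: int) -> None:
        self.rows.append(row)


def plan_layers(n: int, p: int) -> List[int]:
    """Number of reduce tasks per layer for n input rows and partition size p."""
    if n == 0:
        raise TypeError("reduce() of empty sequence with no initial value")
    if p < 2:
        raise ValueError("partition size must be >= 2 for the layers to shrink")
    tasks_per_layer = []
    rows = n
    while True:
        rows = math.ceil(rows / p)  # each task emits exactly one row
        tasks_per_layer.append(rows)
        if rows == 1:
            return tasks_per_layer


def layered_reduce(cbk: Callable[[Sequence[int], LayerOutput], None], data: Sequence[int], p: int) -> int:
    rows = list(data)
    for _ in plan_layers(len(rows), p):  # all layers are known up front
        out = LayerOutput()
        for start in range(0, len(rows), p):  # one task per partition
            cbk(rows[start:start + p], out)
        rows = out.rows
    return rows[0]
```

For example, 10 rows with P = 3 give layers of 4, 2, and 1 tasks.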

Distributed Object Lifecycle

Implementation: aaiclick/orchestration/lifecycle/

In distributed mode, Object table lifecycle is managed through three PostgreSQL tables:

| Table | PK | Purpose |
|---|---|---|
| table_context_refs | (table_name, context_id) | Registry of which tasks created/used a table |
| table_run_refs | (table_name, run_id) | Active run references (incref/decref) |
| table_pin_refs | (table_name, task_id) | Per-consumer pins protecting result tables |

The background worker is the sole cleanup authority.

Worker Process (spawns child per task)
├── OrchLifecycleHandler (per task, per child process)
│   ├── incref → insert into table_context_refs + table_run_refs
│   ├── decref → delete from table_run_refs
│   ├── pin → fan out: insert one pin_ref per downstream consumer
│   └── unpin → delete own pin_ref
├── task_scope exit → decrefs ALL objects, stale-marks
├── BackgroundWorker (sole cleanup authority)
│   ├── DROP where no pin_refs AND no run_refs
│   ├── deletes context_refs + pin_refs alongside dropped tables
│   └── detects dead workers → marks tasks FAILED
└── orch_context() — shared SQL session

Pin Lifecycle

Only at runtime — when a producer task returns an Object — do we know both the table_name and the upstream task_id. The downstream consumer task_ids are discovered via the dependencies table. This is the only point where the table→task mapping exists.

Task A executes, returns Object(table=T)
  ├── PIN fans out via dependencies table:
  │   SELECT next_id FROM dependencies WHERE previous_id = A.id
  │   → inserts pin_ref(T, B.task_id), pin_ref(T, C.task_id)
  └── task_scope exit: decref all → run_refs removed, pin_refs protect T

Task B starts, deserializes T
  ├── incref → run_ref(T, B.run_id)     ← FIFO queue
  └── unpin → delete pin_ref(T, B.task_id)  ← FIFO: after incref

Task C starts, deserializes T
  ├── incref → run_ref(T, C.run_id)
  └── unpin → delete pin_ref(T, C.task_id)

All consumers started → 0 pin_refs, run_refs protect during execution
All consumers finished → 0 pin_refs, 0 run_refs → eligible for cleanup
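The pin and run-ref bookkeeping in the walkthrough can be modeled with two in-memory sets. RefStore is a hypothetical model, not OrchLifecycleHandler. It encodes the rule that a table can be dropped only when it has neither run_refs nor pin_refs. The usage below replays the A → B, C scenario.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class RefStore:
    """In-memory model of table_run_refs and table_pin_refs (hypothetical)."""
    downstream: Dict[int, List[int]]                             # producer task_id -> consumer task_ids
    run_refs: Set[Tuple[str, int]] = field(default_factory=set)  # (table_name, run_id)
    pin_refs: Set[Tuple[str, int]] = field(default_factory=set)  # (table_name, task_id)

    def incref(self, table: str, run_id: int) -> None:
        self.run_refs.add((table, run_id))

    def decref(self, table: str, run_id: int) -> None:
        self.run_refs.discard((table, run_id))

    def pin(self, table: str, producer_task_id: int) -> None:
        # Fan out: one pin per downstream consumer, looked up from the dependency graph.
        for consumer in self.downstream.get(producer_task_id, []):
            self.pin_refs.add((table, consumer))

    def unpin(self, table: str, consumer_task_id: int) -> None:
        self.pin_refs.discard((table, consumer_task_id))

    def droppable(self, table: str) -> bool:
        return all(t != table for t, _ in self.run_refs | self.pin_refs)


store = RefStore(downstream={1: [2, 3]})  # task A=1 feeds B=2 and C=3
store.incref("t9", 10)                    # A's run creates T
store.pin("t9", 1)                        # A returns T: pins for B and C
store.decref("t9", 10)                    # A's task_scope exits
assert not store.droppable("t9")          # pins still protect T
store.incref("t9", 20); store.unpin("t9", 2)  # B: incref first, then unpin
store.incref("t9", 30); store.unpin("t9", 3)  # C: same order
assert not store.droppable("t9")          # no pins left, but run_refs protect T
store.decref("t9", 20); store.decref("t9", 30)
assert store.droppable("t9")              # all consumers finished
```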

OrchLifecycleHandler

Implementation: aaiclick/orchestration/orch_context.py — see OrchLifecycleHandler class

Uses task_id as context_id for context_refs. Pin fans out to downstream consumers via dependencies table. Unpin removes the consumer's own pin_ref row.

PostgreSQL tables: TableContextRef — composite PK (table_name, context_id); TableRunRef — composite PK (table_name, run_id); TablePinRef — composite PK (table_name, task_id).

BackgroundWorker

Implementation: aaiclick/orchestration/background/background_worker.py — see BackgroundWorker class

Operations performed on each poll:

  1. Job cleanup — clear job_id on pin refs for completed/failed/cancelled jobs
  2. Table cleanup — DROP tables where no context_ref has non-NULL job_id AND no run_refs exist
  3. Dead worker detection — expired heartbeats → mark tasks FAILED, workers STOPPED
  4. Job scheduling — create Job runs for registered jobs whose next_run_at is due

Config: poll_interval (default 10s), worker_timeout (default 90s).

View Lifecycle

Views share the underlying ClickHouse table with their source Object. This interacts with table_run_refs in a non-obvious way.

table_run_refs is a set, not a counter. The composite PK (table_name, run_id) means at most one row exists per table per run — INSERT … ON CONFLICT DO NOTHING. A second incref for the same table in the same run is a no-op; the matching decref still deletes the one row.
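The set semantics can be reproduced with sqlite3, which accepts the same INSERT ... ON CONFLICT DO NOTHING form (SQLite 3.24+). The table mirrors the composite key of table_run_refs; the helper functions are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE table_run_refs (table_name TEXT, run_id INTEGER, "
             "PRIMARY KEY (table_name, run_id))")


def incref(table: str, run_id: int) -> None:
    # Set semantics: a second incref for the same (table, run) is a no-op.
    conn.execute("INSERT INTO table_run_refs VALUES (?, ?) ON CONFLICT DO NOTHING", (table, run_id))


def decref(table: str, run_id: int) -> None:
    # Deletes the single row, regardless of how many increfs preceded it.
    conn.execute("DELETE FROM table_run_refs WHERE table_name = ? AND run_id = ?", (table, run_id))


def ref_count(table: str) -> int:
    return conn.execute("SELECT COUNT(*) FROM table_run_refs WHERE table_name = ?", (table,)).fetchone()[0]


incref("t1", 42)
incref("t1", 42)            # same table, same run: still one row
assert ref_count("t1") == 1
decref("t1", 42)            # one decref removes the row entirely
assert ref_count("t1") == 0
```

If a View issued that decref while its source Object was still alive, the source would be left with no run_ref, which is why Views carry _owns_lifecycle_ref = False.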

Views do not own lifecycle refs. _owns_lifecycle_ref = False on every View. Only the source Object that originally called _register() (and actually inserted a row into table_run_refs) issues the corresponding decref. This prevents a View's __del__ from deleting the source's run_ref while the source Object is still alive.

Within-task View lifetime is managed by Python reference counting: View.__init__ stores _source_obj = source, keeping the source Object alive as long as the View is alive. When the View is garbage-collected, its reference to the source is released; if the source has no other references, it is collected too and its decref fires normally.

Cross-task View lifetime uses the PIN mechanism — the same as for Objects:

Task A returns View(table=T)
  ├── pin fans out → pin_ref(T, B.task_id)
  └── task_scope exit → decref(T) [run_refs → 0, pin protects]

Task B deserializes View
  ├── fresh source = Object(table=T); source._register() → INCREF(T)
  ├── view = View(source=source)   [_source_obj keeps source alive]
  └── unpin(T)                     [FIFO: after INCREF commits]

The deserialized source Object holds its own _owns_lifecycle_ref = True and issues a normal decref at task_scope exit.

Write-Ahead Incref

create_object() calls incref before CREATE TABLE, so a crash between the two is harmless: cleanup later issues DROP TABLE IF EXISTS, which succeeds whether or not the table was ever created.

TableSweeper

A periodic sweeper lists t* tables in ClickHouse, extracts each table's creation timestamp from its Snowflake ID, and drops tables older than a threshold that have no table_context_refs row.
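Below is a sketch of the sweeper's age test. It rests on two assumptions that should be verified against generateSnowflakeID() and the actual table naming. First, IDs follow the standard Snowflake layout (41-bit millisecond timestamp, 10-bit machine ID, 12-bit sequence), with timestamps counted from the Unix epoch. Second, table names have the form t<decimal snowflake id>. All helper names are hypothetical.

```python
from typing import Iterable, List, Set

SEQUENCE_BITS = 12
MACHINE_BITS = 10
TIMESTAMP_SHIFT = SEQUENCE_BITS + MACHINE_BITS  # assumed standard Snowflake layout


def snowflake_to_ms(snowflake_id: int, epoch_ms: int = 0) -> int:
    """Millisecond Unix timestamp embedded in a Snowflake ID (epoch offset assumed 0)."""
    return (snowflake_id >> TIMESTAMP_SHIFT) + epoch_ms


def make_snowflake(ms: int, machine_id: int = 0, sequence: int = 0, epoch_ms: int = 0) -> int:
    return ((ms - epoch_ms) << TIMESTAMP_SHIFT) | (machine_id << SEQUENCE_BITS) | sequence


def tables_to_sweep(table_names: Iterable[str], referenced: Set[str], now_ms: int, threshold_ms: int) -> List[str]:
    stale = []
    for name in table_names:
        if not (name.startswith("t") and name[1:].isdigit()):
            continue  # not a t<snowflake> table
        if name in referenced:
            continue  # still has a table_context_refs row
        if now_ms - snowflake_to_ms(int(name[1:])) > threshold_ms:
            stale.append(name)
    return stale
```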

Local Mode

LocalLifecycleHandler wraps TableWorker — immediate DROP on refcount 0, no PostgreSQL. See DataContext.

Operation Provenance (Oplog)

All Object operations within a task are automatically logged when data_context(oplog=...) is active. See docs/oplog.md for the full specification.

Configuration

Implementation: aaiclick/backend.py — see get_root(), is_local()

| Variable | Default | Description |
|---|---|---|
| AAICLICK_LOCAL_ROOT | ~/.aaiclick | Base directory for all local-mode state |
| AAICLICK_SQL_URL | sqlite+aiosqlite:///{root}/local.db | SQLAlchemy async URL for orchestration DB |
| AAICLICK_CH_URL | chdb://{root}/chdb_data | ClickHouse connection URL for data ops |
| AAICLICK_LOG_DIR | mode-dependent (see below) | Task log directory override |

is_local() returns True when AAICLICK_CH_URL starts with chdb:// and AAICLICK_SQL_URL starts with sqlite.
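The defaults and the is_local() rule can be re-derived in a few lines. This sketch is built only from the configuration table and the is_local() rule above. root, ch_url, sql_url, and local_mode are hypothetical names. The real get_root(), get_ch_url(), get_db_url(), and is_local() in aaiclick/backend.py may handle more cases.

```python
import os
from pathlib import Path


def root() -> Path:
    return Path(os.environ.get("AAICLICK_LOCAL_ROOT", "~/.aaiclick")).expanduser()


def ch_url() -> str:
    return os.environ.get("AAICLICK_CH_URL", f"chdb://{root()}/chdb_data")


def sql_url() -> str:
    # An absolute root yields four slashes after the scheme, as SQLAlchemy expects for absolute paths.
    return os.environ.get("AAICLICK_SQL_URL", f"sqlite+aiosqlite:///{root()}/local.db")


def local_mode() -> bool:
    return ch_url().startswith("chdb://") and sql_url().startswith("sqlite")
```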

Log directory defaults (see get_logs_dir() in aaiclick/orchestration/logging.py):

| Mode | Default |
|---|---|
| Local | {AAICLICK_LOCAL_ROOT}/logs |
| Distributed (macOS) | ~/.aaiclick/logs |
| Distributed (Linux) | /var/log/aaiclick |

  • Setup (local): python -m aaiclick setup
  • Migrations (PostgreSQL): python -m aaiclick migrate upgrade head — see aaiclick/orchestration/migrate.py