aaiclick

A data orchestration framework built to make distributed computing easy, with three principles in mind:

  1. Simplicity — Python-native syntax and dynamic task execution.
  2. Performance — Utilizes ClickHouse's powerful distributed engine. Data lives in ClickHouse as columnar tables; Python code orchestrates operations — arithmetic, filtering, aggregation, joins — that execute as ClickHouse queries.
  3. AI Lineage Superpower — Query your data flow. How did this value get here? Why don't we see that value there? Trace lineage across operations and debug pipelines with AI-powered agents.

Two deployment modes are supported: local (in-process, zero setup), which runs on embedded chdb + SQLite, and distributed (Docker Compose provided), which scales out to remote ClickHouse + PostgreSQL.

Early stage — looking for early adopters to join the ride and provide feedback.

Orchestration

Define tasks and jobs with decorators — all data operations execute as ClickHouse queries:

from aaiclick import create_object_from_value
from aaiclick.orchestration import job, task

@task
async def load_sales():
    return await create_object_from_value({
        "region": ["US", "EU", "US", "EU", "US"],
        "amount": [500, 300, 150, 200, 80],
    })

@task
async def analyze(sales) -> dict:
    # GROUP BY + SUM runs as a single ClickHouse query; return a plain dict
    by_region = await sales.group_by("region").sum("amount")
    return await by_region.data()  # → {'region': ['US', 'EU'], 'amount': [730, 500]}

@task
async def report(summary: dict):
    # receives the plain Python dict returned by analyze()
    print(f"Regions: {summary['region']}")  # → Regions: ['US', 'EU']
    print(f"Amounts: {summary['amount']}")  # → Amounts: [730, 500]
    print(f"Total:   {sum(summary['amount'])}")  # → Total: 1230

@job("sales_pipeline")
def sales_pipeline():
    sales = load_sales()
    # dependencies resolved from arguments
    summary = analyze(sales=sales)      # returns a Python dict
    return report(summary=summary)      # the dict flows to report()

if __name__ == "__main__":
    from aaiclick.orchestration import job_test
    job_test(sales_pipeline)  # runs all tasks locally for debugging
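
The "dependencies resolved from arguments" idea in the job above can be illustrated with a small, library-free sketch. It is not aaiclick's implementation: `Node`, `task`, and `run` are hypothetical names invented for this example. It assumes only that calling a decorated function records a deferred call rather than running the body, and that passing one deferred call into another marks a dependency.

```python
# Hypothetical sketch: Node, task, and run are illustrative names, not aaiclick APIs.
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Node:
    """A deferred task call: the function plus its (possibly deferred) keyword arguments."""
    fn: Callable[..., Any]
    kwargs: Dict[str, Any] = field(default_factory=dict)

    def upstream(self) -> List["Node"]:
        # Any argument that is itself a Node is a dependency of this node.
        return [v for v in self.kwargs.values() if isinstance(v, Node)]


def task(fn: Callable[..., Any]) -> Callable[..., Node]:
    """Calling the decorated function records a Node instead of running the body."""
    def wrapper(**kwargs: Any) -> Node:
        return Node(fn, kwargs)
    wrapper.__name__ = fn.__name__
    return wrapper


def run(node: Node, cache: Optional[Dict[int, Any]] = None) -> Any:
    """Run upstream nodes first, then pass their results into this node."""
    cache = {} if cache is None else cache
    if id(node) not in cache:
        resolved = {
            key: run(value, cache) if isinstance(value, Node) else value
            for key, value in node.kwargs.items()
        }
        cache[id(node)] = node.fn(**resolved)
    return cache[id(node)]


@task
def load_sales():
    return {"region": ["US", "EU", "US"], "amount": [500, 300, 150]}


@task
def total(sales):
    return sum(sales["amount"])


graph = total(sales=load_sales())  # nothing has executed yet
result = run(graph)                # load_sales runs first, then total
print(result)                      # → 950
```

The execution order falls out of the argument structure alone, so the job body needs no explicit ordering. A real orchestrator would presumably add scheduling, persistence, and async execution on top of this.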

Data Operation Only Mode

Use data_context() directly for interactive work without orchestration. Decorate an async function to wrap its whole body in a context:

import asyncio
from aaiclick import create_object_from_value
from aaiclick.data.data_context import data_context

@data_context()
async def main():
    prices = await create_object_from_value([10.0, 20.0, 30.0])

    total = prices * 1.1                         # LazyOperator — no DB call yet
    print(await total.data())                    # → [11.0, 22.0, 33.0]
    print(await total.mean().data())             # → 22.0

asyncio.run(main())
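
The "no DB call yet" comment describes lazy evaluation: operators build up an expression, and a query is issued only when data is requested. The sketch below shows that general pattern under stated assumptions. `LazyColumn`, `to_sql`, and the table and column names are hypothetical, and the generated SQL is illustrative rather than what aaiclick actually emits.

```python
# Hypothetical sketch: LazyColumn and its SQL output are illustrative, not aaiclick's API.
class LazyColumn:
    """Accumulates a SQL expression; nothing is executed until the query is requested."""

    def __init__(self, expr: str, table: str):
        self.expr = expr
        self.table = table

    def __mul__(self, factor: float) -> "LazyColumn":
        # Building the expression is pure string work; no database is contacted.
        return LazyColumn(f"({self.expr} * {factor})", self.table)

    def mean(self) -> "LazyColumn":
        # Wrap the current expression in an aggregate, still without executing it.
        return LazyColumn(f"avg({self.expr})", self.table)

    def to_sql(self) -> str:
        # The query text that a .data()-style call would finally submit.
        return f"SELECT {self.expr} FROM {self.table}"


prices = LazyColumn("value", "prices")
total = prices * 1.1
print(total.to_sql())         # → SELECT (value * 1.1) FROM prices
print(total.mean().to_sql())  # → SELECT avg((value * 1.1)) FROM prices
```

Deferring execution this way lets a chain of operations collapse into one query instead of one round trip per step.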

data_context() also works as an async with block when you only need part of a function inside the context.

Quick Start

pip install aaiclick
python -m aaiclick setup