aaiclick
A data orchestration framework built to make distributed computing easy, with three principles in mind:
- Simplicity — Python-native syntax and dynamic task execution.
- Performance — Utilizes ClickHouse's powerful distributed engine. Data lives in ClickHouse as columnar tables; Python code orchestrates operations — arithmetic, filtering, aggregation, joins — that execute as ClickHouse queries.
- AI Lineage Superpower — Query your data flow. How did this value get here? Why don't we see that value there? Trace lineage across operations and debug pipelines with AI-powered agents.
Supports both local (in-process, zero setup) and distributed (Docker Compose provided) deployments: run locally with embedded chdb + SQLite, or scale out with remote ClickHouse + PostgreSQL.
Early stage — looking for early adopters to join the ride and provide feedback.
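To make the "Python orchestrates, the database executes" idea from the Performance principle concrete, here is a minimal stdlib-only sketch. It uses an in-memory `sqlite3` database purely as a stand-in engine (it is not ClickHouse and not how aaiclick is implemented), and the helper name `sum_by_group` is hypothetical. The point is only that the aggregation runs as one SQL query and Python receives the finished result.

```python
import sqlite3


def sum_by_group(columns: dict, key: str, value: str) -> dict:
    """Load a columnar dict into a table and aggregate inside the database."""
    conn = sqlite3.connect(":memory:")  # stand-in engine, NOT ClickHouse
    try:
        conn.execute(f'CREATE TABLE t ("{key}" TEXT, "{value}" INTEGER)')
        rows = list(zip(columns[key], columns[value]))
        conn.executemany("INSERT INTO t VALUES (?, ?)", rows)
        # The aggregation is a single SQL query; Python only sees the result rows.
        result = conn.execute(
            f'SELECT "{key}", SUM("{value}") AS total FROM t '
            f'GROUP BY "{key}" ORDER BY total DESC'
        ).fetchall()
    finally:
        conn.close()
    # Convert the row-wise result back into a columnar dict.
    return {key: [r[0] for r in result], value: [r[1] for r in result]}


summary = sum_by_group(
    {"region": ["US", "EU", "US", "EU", "US"], "amount": [500, 300, 150, 200, 80]},
    key="region",
    value="amount",
)
print(summary)  # → {'region': ['US', 'EU'], 'amount': [730, 500]}
```

The explicit `ORDER BY` is there only to make the output deterministic in this sketch.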
Orchestration
Define tasks and jobs with decorators — all data operations execute as ClickHouse queries:
```python
from aaiclick import create_object_from_value
from aaiclick.orchestration import job, task


@task
async def load_sales():
    return await create_object_from_value({
        "region": ["US", "EU", "US", "EU", "US"],
        "amount": [500, 300, 150, 200, 80],
    })


@task
async def analyze(sales) -> dict:
    # GROUP BY + SUM runs as a single ClickHouse query; return a plain dict
    by_region = await sales.group_by("region").sum("amount")
    return await by_region.data()  # → {'region': ['US', 'EU'], 'amount': [730, 500]}


@task
async def report(summary: dict):
    # receives the plain Python dict returned by analyze()
    print(f"Regions: {summary['region']}")     # → Regions: ['US', 'EU']
    print(f"Amounts: {summary['amount']}")     # → Amounts: [730, 500]
    print(f"Total: {sum(summary['amount'])}")  # → Total: 1230


@job("sales_pipeline")
def sales_pipeline():
    sales = load_sales()
    # dependencies resolved from arguments
    summary = analyze(sales=sales)   # returns a Python dict
    return report(summary=summary)   # the dict flows to report()


if __name__ == "__main__":
    from aaiclick.orchestration import job_test

    job_test(sales_pipeline)  # runs all tasks locally for debugging
```
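The phrase "dependencies resolved from arguments" can be illustrated with a toy, stdlib-only sketch. This is not aaiclick's scheduler: `toy_task`, `Node`, and `run` are hypothetical names invented for illustration. The assumption is simply that calling a decorated task records a deferred node, and a runner later walks the arguments to execute upstream tasks first.

```python
import asyncio
import inspect


class Node:
    """A deferred task call: the function plus its (possibly deferred) keyword arguments."""

    def __init__(self, fn, kwargs):
        self.fn = fn
        self.kwargs = kwargs


def toy_task(fn):
    """Hypothetical decorator: calling the function records a Node instead of running it."""
    def wrapper(**kwargs):
        return Node(fn, kwargs)
    return wrapper


async def run(node, cache=None):
    """Resolve upstream Nodes depth-first, then run this node exactly once."""
    cache = {} if cache is None else cache
    if id(node) in cache:
        return cache[id(node)]
    resolved = {}
    for name, value in node.kwargs.items():
        # An argument that is itself a Node is a dependency: run it first.
        resolved[name] = await run(value, cache) if isinstance(value, Node) else value
    result = node.fn(**resolved)
    if inspect.isawaitable(result):  # support both sync and async tasks
        result = await result
    cache[id(node)] = result
    return result


@toy_task
async def load():
    return [500, 300, 150]


@toy_task
async def total(values):
    return sum(values)


@toy_task
def describe(amount):
    return f"Total: {amount}"


# Building the pipeline only records nodes; nothing runs until run() is called.
pipeline = describe(amount=total(values=load()))
message = asyncio.run(run(pipeline))
print(message)  # → Total: 950
```

A real orchestrator would also persist results and distribute nodes across workers; the sketch only shows how the argument graph implies execution order.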
Data Operation Only Mode
Use data_context() directly for interactive work without orchestration.
Decorate an async function to wrap its whole body in a context:
```python
import asyncio

from aaiclick import create_object_from_value
from aaiclick.data.data_context import data_context


@data_context()
async def main():
    prices = await create_object_from_value([10.0, 20.0, 30.0])
    total = prices * 1.1  # LazyOperator: no DB call yet
    print(await total.data())         # → [11.0, 22.0, 33.0]
    print(await total.mean().data())  # → 22.0


asyncio.run(main())
```
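The "no DB call yet" behaviour of a lazy operator can be sketched in a few lines of plain Python. This is an illustration of the general deferred-expression technique, not aaiclick's `LazyOperator`: the class `LazyExpr`, the column name `value`, and the table name `prices` are all hypothetical. Each operation only extends an expression string, and a query is produced when one is explicitly requested.

```python
class LazyExpr:
    """Records an expression; nothing is evaluated until to_query() is called."""

    def __init__(self, expr: str):
        self._expr = expr

    def __mul__(self, factor: float) -> "LazyExpr":
        # Build a new expression instead of computing anything.
        return LazyExpr(f"({self._expr} * {factor!r})")

    def mean(self) -> "LazyExpr":
        # Wrap the pending expression in an aggregate function.
        return LazyExpr(f"avg({self._expr})")

    def to_query(self, table: str) -> str:
        # Only here is the accumulated expression turned into SQL text.
        return f"SELECT {self._expr} FROM {table}"


prices = LazyExpr("value")      # hypothetical column name
total = prices * 1.1            # still just an expression object
query = total.mean().to_query("prices")
print(query)  # → SELECT avg((value * 1.1)) FROM prices
```

Because the whole chain collapses into one query, the multiplication and the average would run together inside the engine rather than as separate round trips.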
data_context() also works as an async with block when you only need part of
a function inside the context.
Quick Start
- Getting Started: installation, setup, environment variables
- Object API: operators, aggregations, views, group by
- Orchestration: @task and @job decorators, workers
- Examples: runnable scripts for every feature