# Work Pipelines
Background job processing for analytics aggregation, connector sync, and report generation.
Work pipelines handle background jobs in the Services API — analytics aggregation, connector data sync, report generation, and other tasks that shouldn't block API responses.
## Pipeline Architecture
The Services API uses dedicated work routers for background-style operations:
Frontend request → Work Router → Background task → Data store → Dashboard reads

## Work Routers
| Router | Path | Purpose |
|---|---|---|
| `work_dashboard.py` | `/api/work/dashboard/` | Dashboard data aggregation |
| `work_ads.py` | `/api/work/ads/` | Ad performance aggregation |
| `work_analytics.py` | `/api/work/analytics/` | Web analytics data pull |
| `work_revenue.py` | `/api/work/revenue/` | Revenue metrics aggregation |
| `work_email.py` | `/api/work/email/` | Email campaign analytics |
| `work_agents.py` | `/api/work/agents/` | Agent activity aggregation |
| `work_pipelines.py` | `/api/work/pipelines/` | Pipeline execution tracking |
| `work_activity.py` | `/api/work/activity/` | User activity logging |
## Dashboard Aggregation
The unified dashboard pulls data from multiple sources:
```python
import asyncio

@router.get("/api/work/dashboard/aggregate")
async def aggregate_dashboard(user: AuthUser = Depends(get_current_user)):
    """Pull data from all connected sources and aggregate."""
    results = await asyncio.gather(
        fetch_stripe_metrics(user.user_id),     # Revenue
        fetch_analytics_summary(user.user_id),  # Traffic
        fetch_ad_performance(user.user_id),     # Ads
        fetch_social_metrics(user.user_id),     # Social
        fetch_email_metrics(user.user_id),      # Email
        fetch_support_metrics(user.user_id),    # Support
        return_exceptions=True,
    )
    return DashboardData(
        revenue=results[0],
        traffic=results[1],
        ads=results[2],
        social=results[3],
        email=results[4],
        support=results[5],
    )
```

Each data source is fetched in parallel, and failures are handled per source: with `return_exceptions=True`, a failing fetch places its exception in `results` instead of cancelling the other fetches. If Stripe is down, the revenue panel shows a loading state but the other panels still render.
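To see this failure behavior in isolation, here is a runnable sketch of the `return_exceptions=True` pattern. The fetch functions and the `panel()` helper are illustrative stand-ins, not part of the Services API:

```python
import asyncio

async def fetch_ok(name: str) -> dict:
    # Simulates a healthy data source.
    return {"source": name, "value": 42}

async def fetch_down(name: str) -> dict:
    # Simulates an outage (e.g. Stripe unreachable).
    raise ConnectionError(f"{name} is down")

def panel(result, fallback=None):
    # Map a gather() result to panel data: exceptions become a fallback value.
    return fallback if isinstance(result, BaseException) else result

async def main():
    results = await asyncio.gather(
        fetch_ok("analytics"),
        fetch_down("stripe"),
        return_exceptions=True,  # failures come back as exception objects
    )
    return [panel(r) for r in results]

panels = asyncio.run(main())
print(panels)  # [{'source': 'analytics', 'value': 42}, None]
```

The key point is that the failing source degrades to a fallback value while the healthy source's data arrives intact.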
## Connector Sync
Connectors periodically sync data from external platforms:
```python
@router.post("/api/work/sync/{provider}")
async def sync_connector(
    provider: str,
    user: AuthUser = Depends(get_current_user),
):
    """Trigger a data sync for a connected provider."""
    token = get_connector_token(user.user_id, provider)
    if provider == "shopify":
        await sync_shopify_data(token)
    elif provider == "stripe":
        await sync_stripe_data(token)
    elif provider == "meta":
        await sync_meta_ads_data(token)
    # ...
```

## Workers
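As the list of providers grows, the `if/elif` chain can be replaced by a dispatch table. A minimal runnable sketch, with stub sync functions standing in for the real ones:

```python
import asyncio

async def sync_shopify_data(token: str) -> str:
    # Stub: the real function would call the Shopify API with this token.
    return f"shopify synced with {token}"

async def sync_stripe_data(token: str) -> str:
    # Stub: the real function would call the Stripe API with this token.
    return f"stripe synced with {token}"

# Provider name → sync coroutine; adding a provider is one dict entry.
SYNC_HANDLERS = {
    "shopify": sync_shopify_data,
    "stripe": sync_stripe_data,
}

async def sync_connector(provider: str, token: str) -> str:
    handler = SYNC_HANDLERS.get(provider)
    if handler is None:
        raise ValueError(f"unknown provider: {provider}")
    return await handler(token)

print(asyncio.run(sync_connector("stripe", "tok_123")))  # stripe synced with tok_123
```

The table also gives a natural place to validate the `{provider}` path parameter: an unknown key raises before any network call is made.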
The `workers.py` module provides a simple background task runner:
```python
from api.routers.workers import background_task

@background_task
async def process_brand_analysis(user_id: str, url: str):
    """Run brand analysis in the background."""
    # Scrape website
    # Analyze brand positioning
    # Store results in DB
    # Notify user when done
```

## Report Generation
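The implementation of `background_task` is not shown here, but a fire-and-forget decorator of this kind can be sketched with `asyncio.create_task`. This is a guess at the shape of the API, not the actual `workers.py` code:

```python
import asyncio
import functools

def background_task(fn):
    """Schedule the wrapped coroutine on the running event loop.

    Sketch only: the real workers.py runner may add retries, logging, etc.
    """
    @functools.wraps(fn)
    def schedule(*args, **kwargs) -> asyncio.Task:
        # Calling the decorated function returns immediately with a Task.
        return asyncio.get_running_loop().create_task(fn(*args, **kwargs))
    return schedule

@background_task
async def process_brand_analysis(user_id: str, url: str):
    await asyncio.sleep(0)  # stand-in for scraping and analysis work
    return f"analyzed {url} for {user_id}"

async def main():
    task = process_brand_analysis("u_1", "https://example.com")
    # A request handler would return here; await only if the result is needed.
    return await task

print(asyncio.run(main()))  # analyzed https://example.com for u_1
```

Note that a real runner should keep a reference to each task (e.g. in a set) so it is not garbage-collected mid-flight.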
Reports are generated asynchronously:
```python
@router.post("/api/work/reports/generate")
async def generate_report(
    report_type: str,
    period: str = "7d",
    user: AuthUser = Depends(get_current_user),
):
    """Generate a comprehensive report."""
    # Aggregate data across all sources
    # Format into structured report
    # Store for download/display
```

Work pipelines run within the FastAPI process using asyncio. For heavier workloads (video processing, large data imports), consider offloading to a dedicated task queue like Celery or AWS SQS.
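The `period` parameter arrives as a compact string like `"7d"`. One way to turn it into a concrete time span, using a helper of our own invention (the API's actual parsing is not shown):

```python
from datetime import timedelta

def parse_period(period: str) -> timedelta:
    """Convert a period string like '7d', '12h', or '2w' into a timedelta."""
    units = {"d": "days", "h": "hours", "w": "weeks"}
    suffix, amount = period[-1], period[:-1]
    if suffix not in units or not amount.isdigit():
        raise ValueError(f"unsupported period: {period!r}")
    return timedelta(**{units[suffix]: int(amount)})

print(parse_period("7d"))   # 7 days, 0:00:00
print(parse_period("12h"))  # 12:00:00
```

Validating the string up front lets the endpoint reject a malformed period with a 422 before any aggregation work starts.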