Reeve
Services API

Work Pipelines

Background job processing for analytics aggregation, connector sync, and report generation.

Work Pipelines

Work pipelines handle background jobs in the Services API — analytics aggregation, connector data sync, report generation, and other tasks that shouldn't block API responses.

Pipeline Architecture

The Services API uses dedicated work routers for background-style operations:

Frontend request → Work Router → Background task → Data store → Dashboard reads

Work Routers

RouterPathPurpose
work_dashboard.py/api/work/dashboard/Dashboard data aggregation
work_ads.py/api/work/ads/Ad performance aggregation
work_analytics.py/api/work/analytics/Web analytics data pull
work_revenue.py/api/work/revenue/Revenue metrics aggregation
work_email.py/api/work/email/Email campaign analytics
work_agents.py/api/work/agents/Agent activity aggregation
work_pipelines.py/api/work/pipelines/Pipeline execution tracking
work_activity.py/api/work/activity/User activity logging

Dashboard Aggregation

The unified dashboard pulls data from multiple sources:

@router.get("/api/work/dashboard/aggregate")
async def aggregate_dashboard(user: AuthUser = Depends(get_current_user)):
    """Pull data from all connected sources and aggregate."""
    
    results = await asyncio.gather(
        fetch_stripe_metrics(user.user_id),      # Revenue
        fetch_analytics_summary(user.user_id),    # Traffic
        fetch_ad_performance(user.user_id),       # Ads
        fetch_social_metrics(user.user_id),       # Social
        fetch_email_metrics(user.user_id),        # Email
        fetch_support_metrics(user.user_id),      # Support
        return_exceptions=True,
    )
    
    return DashboardData(
        revenue=results[0],
        traffic=results[1],
        ads=results[2],
        social=results[3],
        email=results[4],
        support=results[5],
    )

Each data source is fetched in parallel and gracefully handles failures — if Stripe is down, the revenue panel shows a loading state but other panels still render.

Connector Sync

Connectors periodically sync data from external platforms:

@router.post("/api/work/sync/{provider}")
async def sync_connector(
    provider: str,
    user: AuthUser = Depends(get_current_user),
):
    """Trigger a data sync for a connected provider."""
    token = get_connector_token(user.user_id, provider)
    
    if provider == "shopify":
        await sync_shopify_data(token)
    elif provider == "stripe":
        await sync_stripe_data(token)
    elif provider == "meta":
        await sync_meta_ads_data(token)
    # ...

Workers

The workers.py module provides a simple background task runner:

from api.routers.workers import background_task

@background_task
async def process_brand_analysis(user_id: str, url: str):
    """Run brand analysis in the background."""
    # Scrape website
    # Analyze brand positioning
    # Store results in DB
    # Notify user when done

Report Generation

Reports are generated asynchronously:

@router.post("/api/work/reports/generate")
async def generate_report(
    report_type: str,
    period: str = "7d",
    user: AuthUser = Depends(get_current_user),
):
    """Generate a comprehensive report."""
    # Aggregate data across all sources
    # Format into structured report
    # Store for download/display

Work pipelines run within the FastAPI process using asyncio. For heavier workloads (video processing, large data imports), consider offloading to a dedicated task queue like Celery or AWS SQS.

On this page