
clari-deploy-integration

'Deploy Clari export pipelines to production with Airflow, Cloud Functions,

Stars
2,267
Source
jeremylongshore/claude-code-plugins-plus-skills
Updated
2026-05-31
Slug
jeremylongshore--claude-code-plugins-plus-skills--clari-deploy-integration

// install — copy + paste into any project

mkdir -p .claude/skills && curl -fsSL https://raw.githubusercontent.com/jeremylongshore/claude-code-plugins-plus-skills/HEAD/plugins/saas-packs/clari-pack/skills/clari-deploy-integration/SKILL.md -o .claude/skills/clari-deploy-integration.md

Drops the SKILL.md into .claude/skills/clari-deploy-integration.md. Works with Claude Code, Cursor, and any agent that loads SKILL.md files from .claude/skills/.

Clari Deploy Integration

Overview

Deploy Clari export pipelines to production: as an Airflow DAG for scheduled runs, or as an AWS Lambda function or Google Cloud Function for serverless execution.

Instructions

Airflow DAG

# dags/clari_export_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta

def export_clari_forecast(**context):
    # Import inside the callable so parsing this DAG file does not require clari_client.
    from clari_client import ClariClient, ClariConfig

    # Read the API key from an Airflow Variable at run time, not at parse time.
    client = ClariClient(ClariConfig(
        api_key=Variable.get("clari_api_key"),
    ))

    # Override the period per run via DAG params; "2026_Q1" is only a fallback.
    period = context["params"].get("period", "2026_Q1")
    data = client.export_and_download("company_forecast", period)

    entries = data.get("entries", [])
    # Publish the entry count to XCom for downstream tasks.
    context["ti"].xcom_push(key="entry_count", value=len(entries))
    # Load to warehouse here

dag = DAG(
    "clari_daily_export",
    schedule_interval="0 6 * * *",  # daily at 06:00; Airflow 2.4+ prefers schedule=
    start_date=datetime(2026, 1, 1),
    catchup=False,  # do not backfill runs between start_date and now
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
)

export_task = PythonOperator(
    task_id="export_forecast",
    python_callable=export_clari_forecast,
    dag=dag,
)
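
The DAG above falls back to a hardcoded period of "2026_Q1", which goes stale once the quarter ends. A minimal sketch of deriving the label from a date instead, assuming fiscal quarters match calendar quarters and that Clari expects the YYYY_Qn format used in these examples (neither is confirmed here; `quarter_period` is a hypothetical helper, not part of `clari_client`):

```python
from datetime import date


def quarter_period(day: date) -> str:
    """Return a period label such as "2026_Q1" for the given date.

    Assumption: fiscal quarters line up with calendar quarters.
    """
    quarter = (day.month - 1) // 3 + 1  # Jan-Mar -> 1, Apr-Jun -> 2, ...
    return f"{day.year}_Q{quarter}"


# Hypothetical use inside export_clari_forecast:
#   default = quarter_period(context["logical_date"].date())
#   period = context["params"].get("period", default)
print(quarter_period(date(2026, 2, 14)))  # 2026_Q1
```

If your fiscal year is offset from the calendar year, the quarter arithmetic would need to shift accordingly.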

AWS Lambda

# lambda_handler.py
import json
import boto3
from clari_client import ClariClient, ClariConfig

def handler(event, context):
    # Fetch the API key from SSM Parameter Store.
    # WithDecryption=True is required to read a SecureString value;
    # the function's role needs ssm:GetParameter on this path.
    ssm = boto3.client("ssm")
    api_key = ssm.get_parameter(
        Name="/clari/api-key", WithDecryption=True
    )["Parameter"]["Value"]

    client = ClariClient(ClariConfig(api_key=api_key))
    # Forecast name and period come from the invocation event, with fallbacks.
    data = client.export_and_download(
        event.get("forecast_name", "company_forecast"),
        event.get("period", "2026_Q1"),
    )

    return {
        "statusCode": 200,
        "body": json.dumps({"entries": len(data.get("entries", []))}),
    }

Google Cloud Function

# main.py
import functions_framework
from google.cloud import secretmanager
from clari_client import ClariClient, ClariConfig

@functions_framework.http
def clari_export(request):
    # Read the API key from Secret Manager; replace my-proj with your project ID.
    sm = secretmanager.SecretManagerServiceClient()
    secret = sm.access_secret_version(name="projects/my-proj/secrets/clari-api-key/versions/latest")
    api_key = secret.payload.data.decode()

    client = ClariClient(ClariConfig(api_key=api_key))
    # Forecast name and period are fixed here; the request is not inspected.
    data = client.export_and_download("company_forecast", "2026_Q1")

    # The returned dict is serialized to a JSON response.
    return {"entries": len(data.get("entries", []))}
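
Unlike the Lambda handler, the Cloud Function above ignores its request and always exports "company_forecast" for "2026_Q1". A minimal sketch of reading both values from the request, assuming callers send `forecast_name` and `period` (the same keys the Lambda event uses) either as query parameters or in a JSON body. `resolve_export_params` is a hypothetical helper and takes plain mappings so it can be exercised without Flask:

```python
from typing import Any, Mapping, Optional, Tuple

DEFAULT_FORECAST = "company_forecast"
DEFAULT_PERIOD = "2026_Q1"


def resolve_export_params(
    query: Mapping[str, Any],
    body: Optional[Any] = None,
) -> Tuple[str, str]:
    """Pick forecast name and period: JSON body first, then query string, then defaults."""
    # A missing, unparsable, or non-object JSON body is treated as empty.
    if not isinstance(body, Mapping):
        body = {}
    forecast = body.get("forecast_name") or query.get("forecast_name") or DEFAULT_FORECAST
    period = body.get("period") or query.get("period") or DEFAULT_PERIOD
    return str(forecast), str(period)


print(resolve_export_params({"period": "2026_Q2"}))
```

Inside `clari_export`, this could be called as `resolve_export_params(request.args, request.get_json(silent=True))`, with the two results passed to `export_and_download`.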

Error Handling

| Issue | Cause | Solution |
|-------|-------|----------|
| Lambda timeout | Export takes > 15 min | Use Step Functions for long jobs |
| Secret not found | Wrong parameter path | Verify SSM/Secret Manager path |
| Airflow task fails | Rate limited | Add retries with backoff |
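
"Retries with backoff" can be sketched in plain Python for the Lambda and Cloud Function paths, which have no scheduler-level retry settings of their own. The helper below is hypothetical and catches every exception because the error type `clari_client` raises on rate limiting is not documented here; in real code, narrow the `except` clause to that specific error. For Airflow, setting the task arguments `retry_exponential_backoff=True` and `max_retry_delay` in `default_args` may be enough without custom code.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying failures with exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:  # assumption: narrow this to the client's rate-limit error
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # The delay ceiling doubles each attempt (1s, 2s, 4s, ...) up to max_delay.
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Full jitter spreads out retries from concurrent workers.
            sleep(random.uniform(0, ceiling))
    raise AssertionError("unreachable")
```

A call such as `call_with_backoff(lambda: client.export_and_download("company_forecast", "2026_Q1"))` would wrap the export used in the handlers above. Keep the total retry time below the function's timeout, otherwise the retries themselves can trigger the Lambda timeout row in the table.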

Next Steps

For webhook setup, see clari-webhooks-events.