
implementing-warehouse-sources

Implement and extend PostHog Data warehouse import sources. Use when adding a new source under posthog/temporal/data_imports/sources, adding datasets/endpoints to an existing source, or adding incremental sync, resumable imports, webhook ingestion, pagination, credentials validation, and source tests.

Source
PostHog/posthog
Updated
2026-05-31

// install — copy + paste into any project

mkdir -p .claude/skills && curl -fsSL https://raw.githubusercontent.com/PostHog/posthog/HEAD/.agents/skills/implementing-warehouse-sources/SKILL.md -o .claude/skills/implementing-warehouse-sources.md

Drops the SKILL.md into .claude/skills/implementing-warehouse-sources.md. Works with Claude Code, Cursor, and any agent that loads SKILL.md files from .claude/skills/.

Implementing Data warehouse sources

Use this skill when building or updating Data warehouse sources in posthog/temporal/data_imports/sources/.

Read first

Before coding, read:

  • posthog/temporal/data_imports/sources/source.template (use the top-of-file TODOs as a starting reference, but verify target files against the current source implementations — the template can drift, e.g. it currently still points at the old posthog/warehouse/types.py path instead of products/data_warehouse/backend/types.py)
  • posthog/temporal/data_imports/sources/README.md
  • posthog/temporal/data_imports/sources/SOURCES.md: inventory of every registered source with its communication method (HTTP / vendor SDK / gRPC / DB protocol / webhook) and tracked-transport state. Skim this first to see how similar sources are wired and what state the source you're touching is in today. Keep it in sync (see "Updating SOURCES.md" below).
  • posthog/temporal/data_imports/sources/common/base.py — base classes (SimpleSource, ResumableSource, WebhookSource) and the FieldType union
  • posthog/temporal/data_imports/sources/common/resumable.py: ResumableSourceManager
  • posthog/temporal/data_imports/sources/common/webhook_s3.py: WebhookSourceManager
  • One API source with settings.py + transport logic (e.g. klaviyo, github). For dependent-resource fan-out (parent→child with type: "resolve"), also read posthog/temporal/data_imports/sources/common/rest_source/__init__.py and config_setup.py (e.g. process_parent_data_item, make_parent_key_name).
  • For webhook-capable sources, read posthog/temporal/data_imports/sources/stripe/source.py as the reference implementation.

Picking the right base class

Every new source must inherit from one (or a combination) of these:

  • SimpleSource[Config] — default for straightforward pull-based APIs where each run fully iterates the endpoint.
  • ResumableSource[Config, ResumableData]: preferred for any new API-backed source whose underlying API supports resumption (cursor/link-header pagination, time windows, offset tokens, or any other deterministic way to pick back up where we left off). If the API gives us a next-page token, a Link header, or a stable time filter, use ResumableSource. This lets Temporal resume after heartbeat timeouts without restarting from scratch. The manager persists state to Redis (24h TTL).
  • WebhookSource[Config] — only when the source can push events to us (e.g. Stripe webhook endpoints). Typically combined with ResumableSource so the initial backfill is resumable and subsequent deltas come via webhook.

Combine by multiple inheritance when both apply, e.g.:

class StripeSource(
    ResumableSource[StripeSourceConfig, StripeResumeConfig],
    WebhookSource[StripeSourceConfig],
    OAuthMixin,
):
    ...

Rule of thumb:

  • Pull-only API, no cursor we can persist → SimpleSource.
  • Pull-only API with any cursor/next-page/time-filter we can save between runs → ResumableSource.
  • Source can call us back with change events → add WebhookSource on top of whichever pull base fits.

Databases and file-transfer sources (SFTP, S3) stay on SimpleSource unless there's a clear reason otherwise.

End-to-end workflow for a new API source

Follow this order. Each step maps to TODOs in source.template.

  1. Survey the source. Pick the endpoints a user will actually want. Cross-reference:

  2. Bootstrap the source. Copy the template and wire up the enum/type references:

    mkdir -p posthog/temporal/data_imports/sources/{SOURCE_NAME}
    cp posthog/temporal/data_imports/sources/source.template posthog/temporal/data_imports/sources/{SOURCE_NAME}/source.py
    

    Then update the two hand-edited files (the template still lists posthog/schema.py too, but that file is regenerated by pnpm run schema:build in step 12 — don't maintain it by hand):

    • ExternalDataSourceType at products/data_warehouse/backend/types.py — follow the existing convention in that file: ALL_CAPS with no underscores between words (e.g. ACTIVECAMPAIGN, APPLESEARCHADS), value is PascalCase
    • externalDataSources at frontend/src/queries/schema/schema-general.ts (lower-kebab-case)
  3. Pick the base class (see above) and rename the class / source_type return.

  4. Define get_source_config — name, label, caption, docsUrl, iconPath, fields. Use appropriate field types (see below).

  5. Register the source — add an import line to posthog/temporal/data_imports/sources/__init__.py and include it in __all__. (The @SourceRegistry.register decorator on the class handles runtime registration.)

  6. Run the config generator: pnpm run generate:source-configs. Confirm the new config class appears in posthog/temporal/data_imports/sources/generated_configs.py. Do not edit that file by hand. Every time you change get_source_config.fields, re-run the generator.

  7. Swap the generic Config type in source.py for the generated {Source}SourceConfig class.

  8. Implement: validate_credentials, get_schemas, source_for_pipeline (plus get_resumable_source_manager / get_webhook_source_manager as needed).

  9. Split transport logic. Put API client, paginator, row normalization, and SourceResponse assembly in {source}.py. Keep endpoint catalog/incremental fields/primary keys/partition defaults in settings.py.

  10. Add icon. Place at frontend/public/services/{source}.svg (prefer SVG). If the logo isn't already committed, fetch it from Logo.dev. Ask the user for the Logo.dev API key; do not hardcode one. Keep file size reasonable.

  11. Run migrations. DEBUG=1 python manage.py makemigrations && DEBUG=1 ./bin/migrate (only needed if a new enum value triggers a Django migration).

  12. Rebuild schema types: pnpm run schema:build. This updates posthog/schema.py from schema-general.ts and makes the source appear in frontend dropdowns. Re-run whenever schema-general.ts changes.

  13. Release status. For unfinished work, set unreleasedSource=True. Set releaseStatus="alpha" for new sources that haven't been extensively tested, releaseStatus="beta" once most rough edges are ironed out, and leave releaseStatus unset for general availability. For controlled rollout, set featureFlag="dwh-{source_name}" (kebab-case). When fully releasing, remove unreleasedSource, set releaseStatus to the appropriate stage (or omit for GA), and optionally drop the feature flag.

  14. Delete the template TODO comments before PR.

Source architecture contract

For API-backed sources, use this split:

  • source.py: source registration, source form fields, schema list, credential validation, resumable/webhook manager wiring, pipeline handoff.
  • settings.py: endpoint catalog, incremental fields, primary key, partition defaults.
  • {source}.py: API client/auth, paginator, request params, row normalization, and SourceResponse.

This keeps endpoint behavior declarative and easy to extend.

For REST sources that mix top-level and fan-out endpoints, keep endpoint metadata in settings.py and route in {source}.py with this priority:

  1. endpoint-specific custom iterators (only when required),
  2. generic fan-out helper path,
  3. top-level endpoint path.
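
The routing priority above can be sketched as a small dispatcher. This is a minimal illustration rather than the shared helper itself: `CUSTOM_ITERATORS`, `FANOUT_ENDPOINTS`, and `route_endpoint` are hypothetical names, and the endpoint names are made up.

```python
from typing import Callable, Iterator

# Hypothetical registries; in a real source they would be derived from settings.py.
CUSTOM_ITERATORS: dict[str, Callable[[], Iterator[dict]]] = {
    "audit_log": lambda: iter([{"id": "a1"}]),
}
FANOUT_ENDPOINTS: set[str] = {"project_issues"}


def route_endpoint(endpoint: str) -> str:
    """Return which code path should serve an endpoint, in priority order."""
    if endpoint in CUSTOM_ITERATORS:  # 1. endpoint-specific custom iterator
        return "custom"
    if endpoint in FANOUT_ENDPOINTS:  # 2. generic fan-out helper path
        return "fanout"
    return "top_level"  # 3. plain top-level endpoint path
```

Keeping the decision in one function means new endpoints are added by editing the registries, not by adding branches across the transport module.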

Source fields (the form the user fills in)

Defined in get_source_config.fields. All field types live in posthog/schema.py and are unioned as FieldType in posthog/temporal/data_imports/sources/common/base.py.

  • SourceFieldInputConfig — basic input (text, email, number, password, textarea). Rendered as <LemonInput />.
  • SourceFieldSwitchGroupConfig — toggle that reveals a sub-group of fields. Use for optional feature blocks.
  • SourceFieldSelectConfig — dropdown. Options can carry sub-fields shown when selected (use for alternative auth methods — e.g. API key vs OAuth).
  • SourceFieldOauthConfig — OAuth via Integration model. See OAuth section.
  • SourceFieldFileUploadConfig — file upload (JSON). Use keys=["..."] allow-list or "*".
  • SourceFieldSSHTunnelConfig — renders SSH tunnel sub-fields; adds ssh_tunnel: SSHTunnel to the config with helpers.

Guidelines:

  • Multiple auth methods → SourceFieldSelectConfig with child fields per option.
  • Optional toggles → SourceFieldSwitchGroupConfig.
  • Confidential fields must use SourceFieldInputConfigType.PASSWORD. The serializer derives sensitive vs nonsensitive keys automatically from the field definitions — you do not need to maintain an allow-list elsewhere.

Implementing source_for_pipeline

Return a SourceResponse directly. Do not use dlt_source_to_source_response for new sources — DLT is being removed.

Prefer yielding data in the shape the API returns it. No custom dataclasses, no heavy parsing. Yield either dict, list[dict] (preferred when possible), or a pyarrow.Table. The pipeline buffers and batches for you.

Don't import or instantiate Batcher at the source layer. The pipeline already runs one (pipelines/pipeline/pipeline.py) at the same 5000-row / 200 MiB thresholds. Yielding raw dict / list[dict] from your generator is the canonical path — reach for pyarrow.Table only when you already have arrow-shaped data (e.g., a ClickHouse adapter). Source-level batching results in double-buffering with no behavioral win.

For pyarrow tables, cap in-memory rows at ~200 MiB or ~5000 rows. Use helpers like table_from_iterator() / table_from_py_list() from posthog/temporal/data_imports/pipelines/pipeline/utils.py.

URL construction: use urllib.parse.urlencode for query strings. Don't use requests.Request(...).prepare().url: PreparedRequest.url is typed Optional[str], and the typical workaround (prepared.url or f"...") carries an unreachable fallback. urlencode is shorter, dependency-free, and produces identical output for ASCII-safe params.
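
A minimal sketch of the urlencode approach, assuming a hypothetical `build_url` helper and an example.com base URL. Dropping `None` values before encoding is a choice made for this example, not something the pipeline requires.

```python
from typing import Any, Mapping
from urllib.parse import urlencode


def build_url(base_url: str, path: str, params: Mapping[str, Any]) -> str:
    """Join base URL and path, then append an encoded query string."""
    # Skip unset params so they don't show up as "cursor=None" in the URL.
    query = urlencode({k: v for k, v in params.items() if v is not None})
    url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
    return f"{url}?{query}" if query else url
```

For example, `build_url("https://api.example.com", "/v1/items", {"limit": 100, "cursor": None})` returns `https://api.example.com/v1/items?limit=100`, always typed as `str`.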

Resumable source pattern

import dataclasses


@dataclasses.dataclass
class MyResumeConfig:
    next_url: str  # or cursor, offset, time window — whatever the API uses

class MySource(ResumableSource[MySourceConfig, MyResumeConfig]):
    def get_resumable_source_manager(self, inputs: SourceInputs) -> ResumableSourceManager[MyResumeConfig]:
        return ResumableSourceManager[MyResumeConfig](inputs, MyResumeConfig)

    def source_for_pipeline(
        self,
        config: MySourceConfig,
        resumable_source_manager: ResumableSourceManager[MyResumeConfig],
        inputs: SourceInputs,
    ) -> SourceResponse:
        return my_source(..., resumable_source_manager=resumable_source_manager)

In the transport function:

resume = manager.load_state() if manager.can_resume() else None
url = resume.next_url if resume else initial_url

while True:
    data = fetch_page(url)
    # yield batch
    next_url = data.get("links", {}).get("next")
    if not next_url:
        break
    manager.save_state(MyResumeConfig(next_url=next_url))
    url = next_url  # advance before the next fetch, otherwise we loop on the same page

Save state after yielding each batch, not before — so if we crash we re-yield the last batch (merge dedupes on primary key) rather than skipping it.
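
The save-after-yield ordering can be exercised without Redis or a live API. The sketch below is a self-contained simulation: `InMemoryManager` is a hypothetical stand-in that only mirrors the `can_resume` / `load_state` / `save_state` method names used above, and `PAGES` fakes a paginated API.

```python
import dataclasses
from typing import Iterator, Optional


@dataclasses.dataclass
class ResumeState:
    next_url: str


class InMemoryManager:
    """Hypothetical stand-in for ResumableSourceManager (no Redis, no TTL)."""

    def __init__(self) -> None:
        self._state: Optional[ResumeState] = None

    def can_resume(self) -> bool:
        return self._state is not None

    def load_state(self) -> Optional[ResumeState]:
        return self._state

    def save_state(self, state: ResumeState) -> None:
        self._state = state


# Fake API: three pages chained by a "next" URL.
PAGES = {
    "/items?page=1": {"data": [{"id": 1}, {"id": 2}], "next": "/items?page=2"},
    "/items?page=2": {"data": [{"id": 3}], "next": "/items?page=3"},
    "/items?page=3": {"data": [{"id": 4}], "next": None},
}


def iter_batches(manager: InMemoryManager, initial_url: str = "/items?page=1") -> Iterator[list]:
    resume = manager.load_state() if manager.can_resume() else None
    url = resume.next_url if resume else initial_url
    while True:
        page = PAGES[url]
        yield page["data"]  # hand the batch to the pipeline first
        next_url = page["next"]
        if not next_url:
            break
        # Only after the batch was consumed do we record where to continue.
        manager.save_state(ResumeState(next_url=next_url))
        url = next_url
```

If the run dies while page 2 is being processed, the saved state still points at page 2, so the resumed run yields page 2 again instead of skipping it.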

Webhook source pattern

  • Implement webhook_template returning a HogFunctionTemplateDC that transforms incoming webhook payloads.
  • Implement webhook_resource_map mapping our schema name → external object type.
  • Implement create_webhook, delete_webhook, get_external_webhook_info if the API allows programmatic webhook management. Otherwise return a failed result and provide a webhookSetupCaption explaining manual setup.
  • Add webhookFields to SourceConfig for post-setup inputs (e.g. signing secret).
  • In source_for_pipeline, call self.get_webhook_source_manager(inputs) and pass its iterator alongside the pull iterator so a single sync pulls historical + webhook-delivered rows.
  • Populate SourceSchema.supports_webhooks=True only for endpoints where webhooks are actually viable (usually incremental/append-only ones).

Outbound HTTP must go through the tracked transport

Every HTTP call from posthog/temporal/data_imports/sources/** must go through make_tracked_session() (from posthog.temporal.data_imports.sources.common.http). The tracked session attaches team_id, source_type, external_data_source_id, external_data_schema_id, and external_data_job_id to every outbound request's log line and OTel metric, and participates in opt-in sample capture.

  • For raw requests usage: make_tracked_session(headers=..., retry=...) returns a requests.Session. Use session.get/post/... instead of the module-level requests.get/... shortcuts.
  • For sources that already go through rest_source.RESTClient: it defaults to a tracked session automatically; no change needed.
  • For vendor SDKs that accept a session/HTTP-client hook (Stripe RequestsClient(session=...), gspread authorize(credentials, session=...), BigQuery via AuthorizedSession + TrackedHTTPAdapter), inject one. Reference patterns live in stripe/stripe.py, google_sheets/google_sheets.py, and bigquery/bigquery.py.
  • For vendor SDKs with no injection seam (today: bingads, linkedin-api's RestliClient, anything pure-gRPC), add a # nosemgrep: data-imports-http-transport-... pragma with a one-line reason and record the source as ⚠️ Vendor SDK in SOURCES.md.

CI enforces this via .semgrep/rules/data-imports-http-transport.yaml. The rule bans direct requests.Session(), requests.<verb>(...), and httpx.Client/AsyncClient/<verb> inside sources/**. Type-only imports (from requests import Response, from requests.exceptions import HTTPError) remain allowed.

Updating SOURCES.md

posthog/temporal/data_imports/sources/SOURCES.md is the inventory of every registered source, its communication method, and whether its outbound traffic is tracked. Update it as part of the same PR whenever you:

  • Add a new source — initially as a Scaffolded entry; move it into the Implemented table once you ship working sync logic.
  • Implement a previously scaffolded source — move the row into the Implemented table and fill in comm method, primary library, and tracked-transport state.
  • Migrate a vendor SDK to inject a tracked session: flip the source's tracked-transport state from ⚠️ Vendor SDK to tracked.
  • Switch a source's protocol — e.g. swap REST for gRPC, add webhook support alongside the pull API, or move from requests to a vendor SDK. Update both the comm method and tracked-transport columns.

Keep the entries alphabetical within each table. If you add a partially-tracked source, also append a short "Notes on partially-tracked sources" entry explaining what blocks tracking (no SDK seam, gRPC, etc.).

Required coding conventions

  • Register with @SourceRegistry.register.
  • Inherit SimpleSource[GeneratedConfig] unless resumable/webhook behavior is required.
  • API sources should usually return table_format="delta" in endpoint resources.
  • primary_keys are endpoint-specific (declare in settings.py, not always id). Use composite keys when no single field is unique.
  • Add partitioning for new sources where possible:
    • API sources: partition_mode="datetime" with a stable datetime field.
    • Database sources: partition_count and partition_size.
  • Pick a partition key that does not change: created_at, dateCreated, firstSeen. Never use updated_at or lastSeen.
  • Add get_non_retryable_errors() for known permanent failures (401/403, invalid/expired credentials, missing scopes).
  • Keep comments minimal and only when intent is not obvious.
  • Python imports at the top of the module, not inside functions (unless needed to break circular imports).

Incremental sync guidance

  • Only set supports_incremental=True when the API exposes a server-side timestamp filter (<field>_gte, since, modified_after, etc.). A "client-side cursor" that fetches every page and skips already-seen rows in Python is not incremental — every run still hits every page, so the API cost of an "incremental" sync ends up identical to a full refresh. If the API has no server filter, ship full refresh only.
  • If the API supports server-side time filtering, use it and map from db_incremental_field_last_value.
  • Honor inputs.incremental_field — that's the user's chosen cursor field from the schema settings. INCREMENTAL_FIELDS per-endpoint is the menu of advertised options; don't reach into INCREMENTAL_FIELDS[endpoint][0] to pick a default and silently override the user's selection.
  • Per-endpoint sort enums vary. Don't hardcode ?sorting=created_at (or whatever) globally. Verify each list endpoint's allowed sort values against the API spec and with a curl smoke-test against the live API — APIs frequently document one set of options and silently reject another, or use a different timestamp column on certain resources.
  • Pass ?sorting= explicitly on a stable monotonic field when paginating. For incremental sources, the request sort must match SourceResponse.sort_mode ("asc" typically; "desc" only when forced by the API — see stripe/stripe.py, github/settings.py) so the pipeline's cursor watermark advances correctly. For full-refresh sources, an explicit sort prevents page-boundary skips/duplicates if the API's implicit default is unstable or shifts as rows are inserted during the sync.
  • If the API only supports cursor pagination, still declare incremental fields if reliable and let merge semantics dedupe.
  • sort_mode="desc" only if the endpoint truly cannot return ascending. For descending sources, handle db_incremental_field_earliest_value to scroll earlier rows before newer ones (see Stripe).
  • Default unknown endpoints to full refresh first; enable incremental only after confirming a stable filter field and API ordering semantics.
  • Confirm partition keys against response schemas, not endpoint names.
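
As a rough illustration of the guidance above, the sketch below builds list-request params from the user's chosen cursor field and the last synced value. The `<field>_gte`, `sorting`, and `limit` parameter names are assumptions about a hypothetical API, and `build_list_params` and the `orders` endpoint are made-up names; check the real API's filter and sort enums per endpoint.

```python
from datetime import datetime, timezone
from typing import Any, Optional

# Hypothetical menu of advertised cursor fields per endpoint.
INCREMENTAL_FIELDS = {"orders": ["created_at", "updated_at"]}


def build_list_params(
    endpoint: str,
    incremental_field: Optional[str],
    db_incremental_field_last_value: Optional[datetime],
    page_size: int = 100,
) -> dict[str, Any]:
    params: dict[str, Any] = {"limit": page_size}
    if incremental_field is None:
        return params  # full refresh: no server-side filter
    # Honor the user's selection; never fall back to INCREMENTAL_FIELDS[endpoint][0].
    if incremental_field not in INCREMENTAL_FIELDS.get(endpoint, []):
        raise ValueError(f"{incremental_field!r} is not an incremental field for {endpoint!r}")
    params["sorting"] = incremental_field  # request sort must match sort_mode="asc"
    if db_incremental_field_last_value is not None:
        cutoff = db_incremental_field_last_value.astimezone(timezone.utc)
        params[f"{incremental_field}_gte"] = cutoff.isoformat()
    return params
```

On the first incremental run there is no last value yet, so only the sort is applied and the server returns everything in cursor order.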

API behavior verification checklist

Before finalizing endpoint logic, verify from docs and with curl against the live API (not just docs — APIs frequently silently ignore unknown params or document outdated enums):

  • Response shape: list vs object vs wrapped data ({"data": [...]}).
  • Pagination: Link header vs body cursor vs offset/page; how next-page termination is signaled.
  • Ordering guarantees: ascending/descending/undefined for time fields, and the API's default sort if you don't pass one.
  • Sort enum per endpoint: which sorting= values does each list endpoint accept? Some APIs vary the allowed enum per resource. Confirm with curl that the value you intend to pass returns 200, and probe with a future-date cutoff to confirm whether timestamp filters are honored or silently ignored.
  • Server-side timestamp filter: does <field>_gte / since / modified_after actually filter, or does the API accept it and ignore it? Test by passing a future date and checking whether the row drops out.
  • Rate-limit headers (window reset timestamp, concurrent limits).
  • Field stability: whether candidate incremental/partition fields can change over time.

If undocumented, keep parsing/merge logic conservative and add a short code comment noting the uncertainty.

Endpoint inventory workflow

  • Build an endpoint inventory before expanding coverage (path, auth scopes, grain, pagination style, primary key shape, incremental candidates).
  • Keep it in source-local docs (e.g. posthog/temporal/data_imports/sources/<source>/api_inventory.md).
  • Add endpoints in phases: org-level list endpoints → project-level fan-out → child/fan-out endpoints with bounded pagination.

Top-level endpoints (org/account level)

  • Declare endpoint metadata in settings.py (path, primary_key, incremental_fields, partition_key, sort_mode).
  • Build through a single resource config helper; keep transport branches minimal.
  • Endpoint params stay declarative (limit, required filters).
  • Merge write disposition only when incremental semantics are reliable; otherwise full replace.
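
One possible shape for declarative endpoint metadata is sketched below. `EndpointConfig`, `ENDPOINTS`, and `write_disposition` are hypothetical names for this example; existing sources may use typed dicts or the source typing aliases instead of a dataclass.

```python
from dataclasses import dataclass, field
from typing import Any, Literal, Optional


@dataclass(frozen=True)
class EndpointConfig:
    path: str
    primary_keys: tuple[str, ...]
    incremental_fields: tuple[str, ...] = ()
    partition_key: Optional[str] = None
    sort_mode: Literal["asc", "desc"] = "asc"
    params: dict[str, Any] = field(default_factory=dict)


# Made-up endpoints for illustration only.
ENDPOINTS = {
    "customers": EndpointConfig(
        path="/v1/customers",
        primary_keys=("id",),
        incremental_fields=("created_at",),
        partition_key="created_at",
        params={"limit": 100},
    ),
    # No single unique field here, so the key is composite.
    "memberships": EndpointConfig(path="/v1/memberships", primary_keys=("team_id", "user_id")),
}


def write_disposition(endpoint: str, use_incremental: bool) -> str:
    """Merge only when the endpoint declares a reliable incremental field."""
    cfg = ENDPOINTS[endpoint]
    return "merge" if use_incremental and cfg.incremental_fields else "replace"
```

With this layout, adding an endpoint is a one-entry change in settings.py and the transport code stays untouched.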

Pagination tips

  • Some APIs use cursor pagination in Link headers — check both rel="next" and any results flag.
  • When following a full cursor URL from response headers, clear request params in paginator update_request to avoid duplicate query params.
  • For parent/child fan-out, keep hard page caps per parent resource to avoid unbounded scans.
  • Emit structured logs when page caps are reached (include resource name and parent identifiers).
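
The Link-header check in the first tip might look like the sketch below. It is a simplified stdlib-only parser written for this example (`next_url_from_link_header` is a hypothetical name); it handles quoted parameters only and assumes a missing `results` parameter means more results are available.

```python
import re
from typing import Optional

# Matches "<url>" followed by zero or more '; key="value"' parameters.
_LINK_RE = re.compile(r'<([^>]+)>((?:\s*;\s*[\w-]+="[^"]*")*)')
_PARAM_RE = re.compile(r'([\w-]+)="([^"]*)"')


def next_url_from_link_header(header: Optional[str]) -> Optional[str]:
    """Return the rel="next" URL, or None when pagination is finished."""
    for url, raw_params in _LINK_RE.findall(header or ""):
        params = dict(_PARAM_RE.findall(raw_params))
        if params.get("rel") != "next":
            continue
        # Some APIs always send a next link and signal exhaustion via results="false".
        if params.get("results", "true") != "true":
            return None
        return url
    return None
```

Returning `None` for both "no next link" and "next link with no results" gives the pagination loop a single termination condition.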

Retry and throttling strategy

  • Use tenacity instead of manual retry loops.
  • Retry transport failures and retryable status codes (429, transient 5xx).
  • Prefer server-provided rate-limit reset headers on 429; fall back to exponential backoff.
  • Bound retries and make them deterministic (stop_after_attempt). Preserve clear terminal behavior.
  • Keep timeout/retry settings near the top of the module for easy tuning.
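
The "reset header first, exponential backoff second" rule can be isolated in a pure function, sketched below with the stdlib only. `compute_wait_seconds` is a hypothetical helper; `Retry-After` is the standard header, while `X-RateLimit-Reset` (treated here as an epoch timestamp) is an assumed vendor convention that varies by API. The result could be returned from a custom wait callable passed to tenacity.

```python
from typing import Mapping, Optional


def _to_float(value: Optional[str]) -> Optional[float]:
    try:
        return float(value) if value is not None else None
    except ValueError:
        return None  # e.g. Retry-After given as an HTTP date; fall through


def compute_wait_seconds(
    attempt: int,
    headers: Mapping[str, str],
    now: float,
    base: float = 1.0,
    cap: float = 60.0,
) -> float:
    """Seconds to sleep before retry number `attempt` (1-based)."""
    # Note: a plain dict is case-sensitive, unlike requests' response headers.
    retry_after = _to_float(headers.get("Retry-After"))
    if retry_after is not None:
        return min(max(retry_after, 0.0), cap)
    reset_at = _to_float(headers.get("X-RateLimit-Reset"))  # assumed epoch seconds
    if reset_at is not None:
        return min(max(reset_at - now, 0.0), cap)
    return min(base * 2 ** (attempt - 1), cap)  # exponential fallback
```

Capping every branch keeps a single bad header from stalling an activity past its heartbeat timeout.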

Fan-out endpoints

Fan-out = iterate a parent resource, then query child endpoints per parent.

Prefer dependent resources for single-hop fan-out. Use rest_api_resources with a parent and child that declares type: "resolve" for the parent field. Shared infra (rest_source/__init__.py, config_setup.process_parent_data_item) paginates the parent and calls the child per parent row. Use include_from_parent so child rows carry parent fields (injected as _<parent>_<field> via make_parent_key_name).

Make fan-out declarative. Add a fan-out config object in settings.py (e.g. DependentEndpointConfig) with parent_name, resolve_param, resolve_field, include_from_parent, optional parent field renames, and optional parent endpoint params. Route single-hop fan-out through a shared helper (e.g. common/rest_source/fanout.py:build_dependent_resource).

Parent field rename mapping belongs in the helper. Callers should not branch on whether renames exist.

Per-endpoint pagination/selectors: build_dependent_resource supports endpoint overrides (parent_endpoint_extra, child_endpoint_extra for paginator / data_selector, page_size_param for non-limit size params).

Path pre-formatting: process_parent_data_item only does str.format() with the resolved param. Pre-format static placeholders with .replace() before passing to the resource config, so only the resolved placeholder remains.
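
A short sketch of why pre-formatting matters, using a made-up path template and a hypothetical `preformat_path` helper. Calling `str.format()` with only the resolved param raises `KeyError` if any other placeholder is still present.

```python
from typing import Any, Mapping

TEMPLATE = "/orgs/{org_id}/projects/{project_id}/issues"


def preformat_path(template: str, static_values: Mapping[str, Any]) -> str:
    """Fill static placeholders with .replace(), leaving the resolved one intact."""
    for name, value in static_values.items():
        template = template.replace("{" + name + "}", str(value))
    return template


# org_id is known up front; project_id is resolved per parent row later.
path = preformat_path(TEMPLATE, {"org_id": "acme"})
```

After this step `path` is `/orgs/acme/projects/{project_id}/issues`, so a later `path.format(project_id=...)` succeeds, whereas `TEMPLATE.format(project_id=...)` would fail on the missing `org_id`.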

Custom iterator only when fan-out is 2+ levels deep. Reuse the same pagination/retry helpers as elsewhere.

OAuth configuration

Before implementing OAuth, check if the integration already exists — search posthog/models/integration.py loosely for the service name before concluding it's new.

If new:

  1. Env vars. Add to posthog/settings/integrations.py:

    YOUR_SOURCE_CLIENT_ID = get_from_env("YOUR_SOURCE_CLIENT_ID", "")
    YOUR_SOURCE_CLIENT_SECRET = get_from_env("YOUR_SOURCE_CLIENT_SECRET", "")
    
  2. Integration kind. In posthog/models/integration.py:

    • Add to IntegrationKind enum.
    • Add to OauthIntegration.supported_kinds.
    • Add an elif kind == "your-source": return OauthConfig(...) branch in oauth_config_for_kind().
  3. Redirect URI. Register https://localhost:8010/integrations/your-kind/callback as the redirect URI in the external service.

  4. List any new env vars in the final handoff so they can be set in all environments.

Non-retryable errors

Override get_non_retryable_errors() to mark errors that should permanently fail instead of retrying:

def get_non_retryable_errors(self) -> dict[str, str | None]:
    return {
        "401 Client Error: Unauthorized for url: https://api.example.com": "Your API key is invalid or expired. Please generate a new key and reconnect.",
        "403 Client Error: Forbidden for url: https://api.example.com": "Your API key does not have the required permissions. Please check the key permissions and try again.",
    }

Common cases: 401 Unauthorized, 403 Forbidden, invalid/expired tokens, OAuth tokens needing re-auth.
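
The keys above stop at the host, not at a full request URL. The sketch below shows why that helps, under the assumption that keys are matched as substrings of the raised error message; that matching rule is an assumption of this example, and `classify_error` is a hypothetical helper, not the pipeline's function.

```python
from typing import Mapping, Optional

NON_RETRYABLE: dict[str, Optional[str]] = {
    "401 Client Error: Unauthorized for url: https://api.example.com": "Your API key is invalid or expired.",
    "403 Client Error: Forbidden for url: https://api.example.com": None,  # no friendly text
}


def classify_error(
    error_message: str, non_retryable: Mapping[str, Optional[str]]
) -> tuple[bool, Optional[str]]:
    """Return (is_non_retryable, friendly_message) using substring matching."""
    for needle, friendly in non_retryable.items():
        if needle in error_message:
            return True, friendly
    return False, None
```

Under that assumption, a key ending at the host matches every path and query string under it, so one entry covers all endpoints of the API.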

validate_credentials

Called with schema_name=None at source-create (one cheap probe to confirm the token is genuine) and with schema_name=<name> from the per-schema incremental_fields action (confirm scope for that specific endpoint).

If the API distinguishes 401 (bad token) from 403 (valid token, missing scope), accept 403 at source-create — users may legitimately only grant scopes for the endpoints they want to sync. Re-raise 403 only when schema_name is set. Sync-time 403s are handled separately by get_non_retryable_errors().
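
The 401/403 rule can be expressed as a small status-mapping function, sketched below. `interpret_probe_status` is a hypothetical helper, and the `(ok, error_message)` return shape and the message wording are assumptions for this example.

```python
from typing import Optional


def interpret_probe_status(status_code: int, schema_name: Optional[str]) -> tuple[bool, Optional[str]]:
    """Map a credential-probe HTTP status to (ok, error_message)."""
    if 200 <= status_code < 300:
        return True, None
    if status_code == 401:
        return False, "Invalid or expired token"
    if status_code == 403:
        if schema_name is None:
            # Source-create: the token is genuine, it may just lack some scopes.
            return True, None
        return False, f"Token is missing the scope required for '{schema_name}'"
    return False, f"Unexpected status {status_code} while validating credentials"
```

Keeping the mapping separate from the HTTP call makes it easy to cover with parameterized tests over status codes and `schema_name` values.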

Document required token scopes

If the API issues OAuth scopes or per-resource access tokens, declare every scope the source actually calls so users know what to grant — don't make them grant the full set defensively.

  • OAuth sources: set requiredScopes on SourceFieldOauthConfig (space-separated string, matches the OAuth scope parameter format). The frontend diffs it against the integration's granted scopes and warns the user with a Reconnect action when any are missing.
  • Non-OAuth sources (PAT, API key): there's no integration object to inspect, so list scopes in the caption instead. Captions render through LemonMarkdown, so backticks, bold, and links work.

Mixins

From posthog/temporal/data_imports/sources/common/mixins.py:

  • SSHTunnelMixin: with_ssh_tunnel() context plus make_ssh_tunnel_func() for deferred tunnel opening.
  • OAuthMixin: get_oauth_integration() to pull Integration from the DB.
  • ValidateDatabaseHostMixin: is_database_host_valid() to block internal VPC IPs (unless SSH tunnel is used).

Icons

  • Prefer SVG over PNG. Keep file size reasonable.
  • Place in frontend/public/services/ and reference as /static/services/{name}.svg in iconPath.
  • If the source logo isn't already in the project, pull via Logo.dev. Ask the user for the API key — do not hardcode one. If the user hasn't provided one, surface that as a blocker rather than committing a placeholder.

Testing expectations

Add at least two test modules:

  • tests/test_<source>_source.py (source-class level):
    • source_type
    • get_source_config fields and labels
    • get_schemas outputs
    • validate_credentials success/failure
    • source_for_pipeline argument plumbing
    • for resumable sources: get_resumable_source_manager returns a manager bound to the right data class
    • for webhook sources: create_webhook / delete_webhook / get_external_webhook_info behavior, webhook_resource_map correctness, webhook_template presence
  • tests/test_<source>.py (transport level):
    • paginator behavior from response headers/body
    • resource generation for incremental vs non-incremental
    • endpoint-specific primary key mapping
    • credential validation status mapping
    • mapper/filter helpers if present
    • fan-out endpoint row format assertions (dict shape + parent identifiers)
    • for dependent-resource fan-out: mock rest_api_resources, pass rows with _<parent>_<field> keys to exercise parent-field injection and rename behavior
    • expected return schema checks for each declared endpoint in settings.py
    • for resumable sources: resume-from-saved-state path (manager returns state, transport uses it as starting point); state is saved after each batch

Prefer behavior tests over config-shape tests. Avoid brittle assertions on internal config dict structure unless they protect a known regression that cannot be asserted via output behavior.

Use parameterized tests for status codes and edge cases. Lean toward over-covering.

Implementation checklist

Bootstrapping:
- [ ] Enum added to products/data_warehouse/backend/types.py (ALL_CAPS, no underscores between words)
- [ ] Entry added to frontend/src/queries/schema/schema-general.ts (kebab-case) — `pnpm run schema:build` regenerates posthog/schema.py from this; don't hand-edit posthog/schema.py
- [ ] Source imported in posthog/temporal/data_imports/sources/__init__.py + __all__
- [ ] Class inherits from SimpleSource / ResumableSource / WebhookSource (or combo) — see "Picking the right base class"

Source implementation:
- [ ] Define source fields in get_source_config
- [ ] Implement validate_credentials
- [ ] Implement get_schemas
- [ ] Add endpoint settings (settings.py)
- [ ] Implement transport + paginator ({source}.py)
- [ ] Return SourceResponse with correct primary_keys, partitioning, sort_mode
- [ ] Implement get_resumable_source_manager if ResumableSource
- [ ] Implement webhook methods if WebhookSource
- [ ] Add get_non_retryable_errors for auth/permission errors

Tooling & assets:
- [ ] Icon in frontend/public/services/ (SVG preferred — ask user for Logo.dev key if needed)
- [ ] Run `pnpm run generate:source-configs`
- [ ] Swap generic Config for generated {Source}SourceConfig in source.py
- [ ] Run `pnpm run schema:build`
- [ ] Django migrations run if enum value requires it

Release status:
- [ ] unreleasedSource=True while WIP
- [ ] releaseStatus="alpha" for new sources not yet extensively tested
- [ ] releaseStatus="beta" when most rough edges have been ironed out
- [ ] Omit releaseStatus (or set to "ga") on full release
- [ ] featureFlag="dwh-{source_name}" for controlled rollout
- [ ] Flag removed / unreleasedSource removed on full release

Tests & handoff:
- [ ] Source tests (test_<source>_source.py)
- [ ] Transport tests (test_<source>.py)
- [ ] `ruff check . --fix` and `ruff format .`
- [ ] List any new env vars (OAuth client IDs/secrets, etc) in the PR / handoff

Validation and generation workflow

After changing source fields, re-run pnpm run generate:source-configs and pnpm run schema:build, then the targeted tests for the new source. Run ruff check . --fix and ruff format . on modified Python files.

Common pitfalls

  • Source not visible in wizard: not registered/imported in sources/__init__.py, or schema:build not rerun.
  • Generated config class still empty: forgot generate:source-configs after updating fields.
  • Incremental sync misbehaving: wrong field name/type or wrong sort assumptions.
  • Endless retries for bad credentials: missing get_non_retryable_errors.
  • Resumable state never saved: forgot to call save_state after yielding a batch; or saved before yield and a crash causes data loss.
  • Webhook rows not landing: feature flag warehouse-source-webhooks disabled, or schema is_webhook=False, or initial_sync_complete=False.
  • Dependent resource path KeyError: pre-format static path placeholders (see Fan-out).
  • Silent truncation risk: page caps hit without logs/metrics.
  • Drift from refactors: unused function params/helpers left behind after endpoint behavior changes.
  • Type drift in endpoint config dicts: use source typing aliases (Endpoint, ClientConfig, IncrementalConfig) to keep static checks precise.
  • Partition key instability: picked updated_at instead of created_at; partitions rewrite on every sync.
  • Hardcoded Logo.dev key committed: always ask the user for the key at runtime.