Implementing Data warehouse sources
Use this skill when building or updating Data warehouse sources in posthog/temporal/data_imports/sources/.
Read first
Before coding, read:
- `posthog/temporal/data_imports/sources/source.template` (use the top-of-file TODOs as a starting reference, but verify target files against the current source implementations; the template can drift, e.g. it currently still points at the old `posthog/warehouse/types.py` path instead of `products/data_warehouse/backend/types.py`)
- `posthog/temporal/data_imports/sources/README.md`
- `posthog/temporal/data_imports/sources/SOURCES.md`: inventory of every registered source with its communication method (HTTP / vendor SDK / gRPC / DB protocol / webhook) and tracked-transport state. Skim this first to see how similar sources are wired and what state the source you're touching is in today. Keep it in sync (see "Updating SOURCES.md" below).
- `posthog/temporal/data_imports/sources/common/base.py`: base classes (`SimpleSource`, `ResumableSource`, `WebhookSource`) and the `FieldType` union
- `posthog/temporal/data_imports/sources/common/resumable.py`: `ResumableSourceManager`
- `posthog/temporal/data_imports/sources/common/webhook_s3.py`: `WebhookSourceManager`
- One API source with `settings.py` + transport logic (e.g. klaviyo, github). For dependent-resource fan-out (parent→child with `type: "resolve"`), also read `posthog/temporal/data_imports/sources/common/rest_source/__init__.py` and `config_setup.py` (e.g. `process_parent_data_item`, `make_parent_key_name`).
- For webhook-capable sources, read `posthog/temporal/data_imports/sources/stripe/source.py` as the reference implementation.
Picking the right base class
Every new source must inherit from one (or a combination) of these:
- `SimpleSource[Config]`: default for straightforward pull-based APIs where each run fully iterates the endpoint.
- `ResumableSource[Config, ResumableData]`: preferred for any new API-backed source whose underlying API supports resumption (cursor/link-header pagination, time windows, offset tokens, or any other deterministic way to pick back up where we left off). If the API gives us a next-page token, a `Link` header, or a stable time filter, use `ResumableSource`. This lets Temporal resume after heartbeat timeouts without restarting from scratch. The manager persists state to Redis (24h TTL).
- `WebhookSource[Config]`: only when the source can push events to us (e.g. Stripe webhook endpoints). Typically combined with `ResumableSource` so the initial backfill is resumable and subsequent deltas come via webhook.
Combine by multiple inheritance when both apply, e.g.:
```python
class StripeSource(
    ResumableSource[StripeSourceConfig, StripeResumeConfig],
    WebhookSource[StripeSourceConfig],
    OAuthMixin,
):
    ...
```
Rule of thumb:
- Pull-only API, no cursor we can persist → `SimpleSource`.
- Pull-only API with any cursor/next-page/time-filter we can save between runs → `ResumableSource`.
- Source can call us back with change events → add `WebhookSource` on top of whichever pull base fits.
Databases and file-transfer sources (SFTP, S3) stay on SimpleSource unless there's a clear reason otherwise.
End-to-end workflow for a new API source
Follow this order. Each step maps to TODOs in source.template.
1. Survey the source. Pick the endpoints a user will actually want. Cross-reference:
   - Airbyte: https://airbyte.com/connectors (connector pages often link to source code, which is a useful reference)
   - Fivetran: https://www.fivetran.com/connectors
   - Stitch: https://www.stitchdata.com/docs/integrations/

   Find the official API docs or OpenAPI spec. Make sure it's the current version, not a deprecated one.
2. Bootstrap the source. Copy the template and wire up the enum/type references:

   ```bash
   mkdir -p posthog/temporal/data_imports/sources/{SOURCE_NAME}
   cp posthog/temporal/data_imports/sources/source.template posthog/temporal/data_imports/sources/{SOURCE_NAME}/source.py
   ```

   Then update the two hand-edited files (the template still lists `posthog/schema.py` too, but that file is regenerated by `pnpm run schema:build` in step 12; don't maintain it by hand):
   - `ExternalDataSourceType` at `products/data_warehouse/backend/types.py`: follow the existing convention in that file, `ALL_CAPS` with no underscores between words (e.g. `ACTIVECAMPAIGN`, `APPLESEARCHADS`); the value is `PascalCase`.
   - `externalDataSources` at `frontend/src/queries/schema/schema-general.ts` (lower-kebab-case).
3. Pick the base class (see above) and rename the class / `source_type` return.
4. Define `get_source_config`: name, label, caption, docsUrl, iconPath, fields. Use appropriate field types (see below).
5. Register the source: add an import line to `posthog/temporal/data_imports/sources/__init__.py` and include it in `__all__`. (The `@SourceRegistry.register` decorator on the class handles runtime registration.)
6. Run the config generator: `pnpm run generate:source-configs`. Confirm the new config class appears in `posthog/temporal/data_imports/sources/generated_configs.py`. Do not edit that file by hand. Every time you change `get_source_config.fields`, re-run the generator.
7. Swap the generic `Config` type in `source.py` for the generated `{Source}SourceConfig` class.
8. Implement `validate_credentials`, `get_schemas`, `source_for_pipeline` (plus `get_resumable_source_manager` / `get_webhook_source_manager` as needed).
9. Split transport logic. Put API client, paginator, row normalization, and `SourceResponse` assembly in `{source}.py`. Keep endpoint catalog/incremental fields/primary keys/partition defaults in `settings.py`.
10. Add icon. Place at `frontend/public/services/{source}.svg` (prefer SVG). If the logo isn't already committed, fetch it from Logo.dev; ask the user for the Logo.dev API key and do not hardcode one. Keep file size reasonable.
11. Run migrations: `DEBUG=1 python manage.py makemigrations && DEBUG=1 ./bin/migrate` (only needed if a new enum value triggers a Django migration).
12. Rebuild schema types: `pnpm run schema:build`. This updates `posthog/schema.py` from `schema-general.ts` and makes the source appear in frontend dropdowns. Re-run whenever `schema-general.ts` changes.
13. Release status. For unfinished work, set `unreleasedSource=True`. Set `releaseStatus="alpha"` for new sources that haven't been extensively tested, `releaseStatus="beta"` once most rough edges are ironed out, and leave `releaseStatus` unset for general availability. For controlled rollout, set `featureFlag="dwh-{source_name}"` (kebab-case). When fully releasing, remove `unreleasedSource`, set `releaseStatus` to the appropriate stage (or omit it for GA), and optionally drop the feature flag.
14. Delete the template TODO comments before PR.
Source architecture contract
For API-backed sources, use this split:
- `source.py`: source registration, source form fields, schema list, credential validation, resumable/webhook manager wiring, pipeline handoff.
- `settings.py`: endpoint catalog, incremental fields, primary key, partition defaults.
- `{source}.py`: API client/auth, paginator, request params, row normalization, and `SourceResponse`.
This keeps endpoint behavior declarative and easy to extend.
For REST sources that mix top-level and fan-out endpoints, keep endpoint metadata in settings.py and route in {source}.py with this priority:
- endpoint-specific custom iterators (only when required),
- generic fan-out helper path,
- top-level endpoint path.
Source fields (the form the user fills in)
Defined in get_source_config.fields. All field types live in posthog/schema.py and are unioned as FieldType in posthog/temporal/data_imports/sources/common/base.py.
- `SourceFieldInputConfig`: basic input (`text`, `email`, `number`, `password`, `textarea`). Rendered as `<LemonInput />`.
- `SourceFieldSwitchGroupConfig`: toggle that reveals a sub-group of fields. Use for optional feature blocks.
- `SourceFieldSelectConfig`: dropdown. Options can carry sub-`fields` shown when selected (use for alternative auth methods, e.g. API key vs OAuth).
- `SourceFieldOauthConfig`: OAuth via the `Integration` model. See the OAuth configuration section.
- `SourceFieldFileUploadConfig`: file upload (JSON). Use a `keys=["..."]` allow-list or `"*"`.
- `SourceFieldSSHTunnelConfig`: renders SSH tunnel sub-fields; adds `ssh_tunnel: SSHTunnel` to the config with helpers.
Guidelines:
- Multiple auth methods → `SourceFieldSelectConfig` with child `fields` per option.
- Optional toggles → `SourceFieldSwitchGroupConfig`.
- Confidential fields must use `SourceFieldInputConfigType.PASSWORD`. The serializer derives sensitive vs nonsensitive keys automatically from the field definitions; you do not need to maintain an allow-list elsewhere.
Implementing source_for_pipeline
Return a SourceResponse directly. Do not use dlt_source_to_source_response for new sources — DLT is being removed.
Prefer yielding data in the shape the API returns it. No custom dataclasses, no heavy parsing. Yield either dict, list[dict] (preferred when possible), or a pyarrow.Table. The pipeline buffers and batches for you.
Don't import or instantiate Batcher at the source layer. The pipeline already runs one (pipelines/pipeline/pipeline.py) at the same 5000-row / 200 MiB thresholds. Yielding raw dict / list[dict] from your generator is the canonical path — reach for pyarrow.Table only when you already have arrow-shaped data (e.g., a ClickHouse adapter). Source-level batching results in double-buffering with no behavioral win.
For pyarrow tables, cap in-memory rows at ~200 MiB or ~5000 rows. Use helpers like table_from_iterator() / table_from_py_list() from posthog/temporal/data_imports/pipelines/pipeline/utils.py.
URL construction: use urllib.parse.urlencode for query strings. Don't use requests.Request(...).prepare().url — PreparedRequest.url is typed Optional[str] and the typical workaround (prepared.url or f"...") carries an unreachable fallback. urlencode is shorter, dependency-free, and produces identical output for ASCII-safe params.
Resumable source pattern
```python
@dataclasses.dataclass
class MyResumeConfig:
    next_url: str  # or cursor, offset, time window: whatever the API uses


class MySource(ResumableSource[MySourceConfig, MyResumeConfig]):
    def get_resumable_source_manager(self, inputs: SourceInputs) -> ResumableSourceManager[MyResumeConfig]:
        return ResumableSourceManager[MyResumeConfig](inputs, MyResumeConfig)

    def source_for_pipeline(
        self,
        config: MySourceConfig,
        resumable_source_manager: ResumableSourceManager[MyResumeConfig],
        inputs: SourceInputs,
    ) -> SourceResponse:
        return my_source(..., resumable_source_manager=resumable_source_manager)
```
In the transport function:
```python
resume = manager.load_state() if manager.can_resume() else None
url = resume.next_url if resume else initial_url
while True:
    data = fetch_page(url)
    yield data["data"]  # hand this page's rows to the pipeline (adjust to the API's response shape)
    next_url = data.get("links", {}).get("next")
    if not next_url:
        break
    manager.save_state(MyResumeConfig(next_url=next_url))
    url = next_url  # advance before the next fetch, otherwise we loop on the same page
```
Save state after yielding each batch, not before — so if we crash we re-yield the last batch (merge dedupes on primary key) rather than skipping it.
Webhook source pattern
- Implement `webhook_template` returning a `HogFunctionTemplateDC` that transforms incoming webhook payloads.
- Implement `webhook_resource_map` mapping our schema name → external object type.
- Implement `create_webhook`, `delete_webhook`, `get_external_webhook_info` if the API allows programmatic webhook management. Otherwise return a failed result and provide a `webhookSetupCaption` explaining manual setup.
- Add `webhookFields` to `SourceConfig` for post-setup inputs (e.g. signing secret).
- In `source_for_pipeline`, call `self.get_webhook_source_manager(inputs)` and pass its iterator alongside the pull iterator so a single sync pulls historical + webhook-delivered rows.
- Set `SourceSchema.supports_webhooks=True` only for endpoints where webhooks are actually viable (usually incremental/append-only ones).
Outbound HTTP must go through the tracked transport
Every HTTP call from posthog/temporal/data_imports/sources/** must go through make_tracked_session() (from
posthog.temporal.data_imports.sources.common.http). The tracked session attaches team_id, source_type,
external_data_source_id, external_data_schema_id, and external_data_job_id to every outbound request's
log line and OTel metric, and participates in opt-in sample capture.
- For raw `requests` usage: `make_tracked_session(headers=..., retry=...)` returns a `requests.Session`. Use `session.get/post/...` instead of the module-level `requests.get/...` shortcuts.
- For sources that already go through `rest_source.RESTClient`: it defaults to a tracked session automatically; no change needed.
- For vendor SDKs that accept a session/HTTP-client hook (Stripe `RequestsClient(session=...)`, gspread `authorize(credentials, session=...)`, BigQuery via `AuthorizedSession` + `TrackedHTTPAdapter`), inject one. Reference patterns live in `stripe/stripe.py`, `google_sheets/google_sheets.py`, and `bigquery/bigquery.py`.
- For vendor SDKs with no injection seam (today: `bingads`, `linkedin-api`'s `RestliClient`, anything pure-gRPC), add a `# nosemgrep: data-imports-http-transport-...` pragma with a one-line reason and record the source as `⚠️ Vendor SDK` in `SOURCES.md`.
CI enforces this via .semgrep/rules/data-imports-http-transport.yaml. The rule bans direct requests.Session(),
requests.<verb>(...), and httpx.Client/AsyncClient/<verb> inside sources/**. Type-only imports
(from requests import Response, from requests.exceptions import HTTPError) remain allowed.
Updating SOURCES.md
posthog/temporal/data_imports/sources/SOURCES.md is the inventory of every registered source, its
communication method, and whether its outbound traffic is tracked. Update it as part of the same PR
whenever you:
- Add a new source — initially as a Scaffolded entry; move it into the Implemented table once you ship working sync logic.
- Implement a previously scaffolded source — move the row into the Implemented table and fill in comm method, primary library, and tracked-transport state.
- Migrate a vendor SDK to inject a tracked session: flip the source from `⚠️ Vendor SDK` to `✅`.
- Switch a source's protocol, e.g. swap REST for gRPC, add webhook support alongside the pull API, or move from `requests` to a vendor SDK. Update both the comm method and tracked-transport columns.
Keep the entries alphabetical within each table. If you add a partially-tracked source, also append a short "Notes on partially-tracked sources" entry explaining what blocks tracking (no SDK seam, gRPC, etc.).
Required coding conventions
- Register with `@SourceRegistry.register`.
- Inherit `SimpleSource[GeneratedConfig]` unless resumable/webhook behavior is required.
- API sources should usually return `table_format="delta"` in endpoint resources.
- `primary_keys` are endpoint-specific (declare in `settings.py`, not always `id`). Use composite keys when no single field is unique.
- Add partitioning for new sources where possible:
  - API sources: `partition_mode="datetime"` with a stable datetime field.
  - Database sources: `partition_count` and `partition_size`.
- Pick a partition key that does not change: `created_at`, `dateCreated`, `firstSeen`. Never use `updated_at` or `lastSeen`.
- Add `get_non_retryable_errors()` for known permanent failures (401/403, invalid/expired credentials, missing scopes).
- Keep comments minimal and only when intent is not obvious.
- Python imports at the top of the module, not inside functions (unless needed to break circular imports).
Incremental sync guidance
- Only set `supports_incremental=True` when the API exposes a server-side timestamp filter (`<field>_gte`, `since`, `modified_after`, etc.). A "client-side cursor" that fetches every page and skips already-seen rows in Python is not incremental: every run still hits every page, so the API cost of an "incremental" sync ends up identical to a full refresh. If the API has no server filter, ship full refresh only.
- If the API supports server-side time filtering, use it and map from `db_incremental_field_last_value`.
- Honor `inputs.incremental_field`: that's the user's chosen cursor field from the schema settings. Per-endpoint `INCREMENTAL_FIELDS` is the menu of advertised options; don't reach into `INCREMENTAL_FIELDS[endpoint][0]` to pick a default and silently override the user's selection.
- Per-endpoint sort enums vary. Don't hardcode `?sorting=created_at` (or whatever) globally. Verify each list endpoint's allowed sort values against the API spec and with a curl smoke-test against the live API; APIs frequently document one set of options and silently reject another, or use a different timestamp column on certain resources.
- Pass `?sorting=` explicitly on a stable monotonic field when paginating. For incremental sources, the request sort must match `SourceResponse.sort_mode` (`"asc"` typically; `"desc"` only when forced by the API, see `stripe/stripe.py`, `github/settings.py`) so the pipeline's cursor watermark advances correctly. For full-refresh sources, an explicit sort prevents page-boundary skips/duplicates if the API's implicit default is unstable or shifts as rows are inserted during the sync.
- If the API only supports cursor pagination, still declare incremental fields if reliable and let merge semantics dedupe.
- Use `sort_mode="desc"` only if the endpoint truly cannot return ascending. For descending sources, handle `db_incremental_field_earliest_value` to scroll earlier rows before newer ones (see Stripe).
- Default unknown endpoints to full refresh first; enable incremental only after confirming a stable filter field and API ordering semantics.
- Confirm partition keys against response schemas, not endpoint names.
API behavior verification checklist
Before finalizing endpoint logic, verify from docs and with curl against the live API (not just docs — APIs frequently silently ignore unknown params or document outdated enums):
- Response shape: list vs object vs wrapped data (`{"data": [...]}`).
- Pagination: `Link` header vs body cursor vs offset/page; how next-page termination is signaled.
- Ordering guarantees: ascending/descending/undefined for time fields, and the API's default sort if you don't pass one.
- Sort enum per endpoint: which `sorting=` values does each list endpoint accept? Some APIs vary the allowed enum per resource. Confirm with curl that the value you intend to pass returns 200.
- Server-side timestamp filter: does `<field>_gte` / `since` / `modified_after` actually filter, or does the API accept it and ignore it? Test by passing a future date and checking whether the rows drop out.
- Rate-limit headers (window reset timestamp, concurrent limits).
- Field stability: whether candidate incremental/partition fields can change over time.
If undocumented, keep parsing/merge logic conservative and add a short code comment noting the uncertainty.
Endpoint inventory workflow
- Build an endpoint inventory before expanding coverage (path, auth scopes, grain, pagination style, primary key shape, incremental candidates).
- Keep it in source-local docs (e.g. `posthog/temporal/data_imports/sources/<source>/api_inventory.md`).
- Add endpoints in phases: org-level list endpoints → project-level fan-out → child/fan-out endpoints with bounded pagination.
Top-level endpoints (org/account level)
- Declare endpoint metadata in `settings.py` (`path`, `primary_key`, `incremental_fields`, `partition_key`, `sort_mode`).
- Build through a single resource config helper; keep transport branches minimal.
- Endpoint params stay declarative (`limit`, required filters).
- Use the merge write disposition only when incremental semantics are reliable; otherwise use full replace.
Pagination tips
- Some APIs use cursor pagination in `Link` headers; check both `rel="next"` and any results flag.
- When following a full cursor URL from response headers, clear request params in the paginator's `update_request` to avoid duplicate query params.
- For parent/child fan-out, keep hard page caps per parent resource to avoid unbounded scans.
- Emit structured logs when page caps are reached (include resource name and parent identifiers).
Retry and throttling strategy
- Use `tenacity` instead of manual retry loops.
- Retry transport failures and retryable status codes (`429`, transient `5xx`).
- Prefer server-provided rate-limit reset headers on `429`; fall back to exponential backoff.
- Bound retries and make them deterministic (`stop_after_attempt`). Preserve clear terminal behavior.
- Keep timeout/retry settings near the top of the module for easy tuning.
Fan-out endpoints
Fan-out = iterate a parent resource, then query child endpoints per parent.
Prefer dependent resources for single-hop fan-out. Use rest_api_resources with a parent and child that declares type: "resolve" for the parent field. Shared infra (rest_source/__init__.py, config_setup.process_parent_data_item) paginates the parent and calls the child per parent row. Use include_from_parent so child rows carry parent fields (injected as _<parent>_<field> via make_parent_key_name).
Make fan-out declarative. Add a fan-out config object in settings.py (e.g. DependentEndpointConfig) with parent_name, resolve_param, resolve_field, include_from_parent, optional parent field renames, and optional parent endpoint params. Route single-hop fan-out through a shared helper (e.g. common/rest_source/fanout.py:build_dependent_resource).
Parent field rename mapping belongs in the helper. Callers should not branch on whether renames exist.
Per-endpoint pagination/selectors — build_dependent_resource supports endpoint overrides (parent_endpoint_extra, child_endpoint_extra for paginator / data_selector, page_size_param for non-limit size params).
Path pre-formatting: process_parent_data_item only does str.format() with the resolved param. Pre-format static placeholders with .replace() before passing to the resource config, so only the resolved placeholder remains.
Custom iterator only when fan-out is 2+ levels deep. Reuse the same pagination/retry helpers as elsewhere.
OAuth configuration
Before implementing OAuth, check if the integration already exists — search posthog/models/integration.py loosely for the service name before concluding it's new.
If new:
1. Env vars. Add to `posthog/settings/integrations.py`:

   ```python
   YOUR_SOURCE_CLIENT_ID = get_from_env("YOUR_SOURCE_CLIENT_ID", "")
   YOUR_SOURCE_CLIENT_SECRET = get_from_env("YOUR_SOURCE_CLIENT_SECRET", "")
   ```

2. Integration kind. In `posthog/models/integration.py`:
   - Add to the `IntegrationKind` enum.
   - Add to `OauthIntegration.supported_kinds`.
   - Add an `elif kind == "your-source": return OauthConfig(...)` branch in `oauth_config_for_kind()`.
3. Redirect URI: register `https://localhost:8010/integrations/your-kind/callback` in the external service.
4. List any new env vars in the final handoff so they can be set in all environments.
Non-retryable errors
Override get_non_retryable_errors() to mark errors that should permanently fail instead of retrying:
```python
def get_non_retryable_errors(self) -> dict[str, str | None]:
    return {
        "401 Client Error: Unauthorized for url: https://api.example.com": "Your API key is invalid or expired. Please generate a new key and reconnect.",
        "403 Client Error: Forbidden for url: https://api.example.com": "Your API key does not have the required permissions. Please check the key permissions and try again.",
    }
```
Common cases: 401 Unauthorized, 403 Forbidden, invalid/expired tokens, OAuth tokens needing re-auth.
validate_credentials
Called with schema_name=None at source-create (one cheap probe to confirm the token is genuine) and with schema_name=<name> from the per-schema incremental_fields action (confirm scope for that specific endpoint).
If the API distinguishes 401 (bad token) from 403 (valid token, missing scope), accept 403 at source-create — users may legitimately only grant scopes for the endpoints they want to sync. Re-raise 403 only when schema_name is set. Sync-time 403s are handled separately by get_non_retryable_errors().
Document required token scopes
If the API issues OAuth scopes or per-resource access tokens, declare every scope the source actually calls so users know what to grant — don't make them grant the full set defensively.
- OAuth sources: set `requiredScopes` on `SourceFieldOauthConfig` (space-separated string, matching the OAuth `scope` parameter format). The frontend diffs it against the integration's granted scopes and warns the user with a Reconnect action when any are missing.
- Non-OAuth sources (PAT, API key): there's no integration object to inspect, so list scopes in the `caption` instead. Captions render through `LemonMarkdown`, so backticks, bold, and links work.
Mixins
From posthog/temporal/data_imports/sources/common/mixins.py:
- `SSHTunnelMixin`: `with_ssh_tunnel()` context plus `make_ssh_tunnel_func()` for deferred tunnel opening.
- `OAuthMixin`: `get_oauth_integration()` to pull `Integration` from the DB.
- `ValidateDatabaseHostMixin`: `is_database_host_valid()` to block internal VPC IPs (unless an SSH tunnel is used).
Icons
- Prefer SVG over PNG. Keep file size reasonable.
- Place in `frontend/public/services/` and reference as `/static/services/{name}.svg` in `iconPath`.
- If the source logo isn't already in the project, pull it via Logo.dev. Ask the user for the API key; do not hardcode one. If the user hasn't provided one, surface that as a blocker rather than committing a placeholder.
Testing expectations
Add at least two test modules:
- `tests/test_<source>_source.py` (source-class level):
  - `source_type`
  - `get_source_config` fields and labels
  - `get_schemas` outputs
  - `validate_credentials` success/failure
  - `source_for_pipeline` argument plumbing
  - for resumable sources: `get_resumable_source_manager` returns a manager bound to the right data class
  - for webhook sources: `create_webhook` / `delete_webhook` / `get_external_webhook_info` behavior, `webhook_resource_map` correctness, `webhook_template` presence
- `tests/test_<source>.py` (transport level):
  - paginator behavior from response headers/body
  - resource generation for incremental vs non-incremental
  - endpoint-specific primary key mapping
  - credential validation status mapping
  - mapper/filter helpers if present
  - fan-out endpoint row format assertions (dict shape + parent identifiers)
  - for dependent-resource fan-out: mock `rest_api_resources`, pass rows with `_<parent>_<field>` keys to exercise parent-field injection and rename behavior
  - expected return schema checks for each declared endpoint in `settings.py`
  - for resumable sources: resume-from-saved-state path (manager returns state, transport uses it as starting point); state is saved after each batch
Prefer behavior tests over config-shape tests. Avoid brittle assertions on internal config dict structure unless they protect a known regression that cannot be asserted via output behavior.
Use parameterized tests for status codes and edge cases. Lean toward over-covering.
Implementation checklist
Bootstrapping:
- [ ] Enum added to products/data_warehouse/backend/types.py (ALL_CAPS, no underscores between words)
- [ ] Entry added to frontend/src/queries/schema/schema-general.ts (kebab-case) — `pnpm run schema:build` regenerates posthog/schema.py from this; don't hand-edit posthog/schema.py
- [ ] Source imported in posthog/temporal/data_imports/sources/__init__.py + __all__
- [ ] Class inherits from SimpleSource / ResumableSource / WebhookSource (or combo) — see "Picking the right base class"
Source implementation:
- [ ] Define source fields in get_source_config
- [ ] Implement validate_credentials
- [ ] Implement get_schemas
- [ ] Add endpoint settings (settings.py)
- [ ] Implement transport + paginator ({source}.py)
- [ ] Return SourceResponse with correct primary_keys, partitioning, sort_mode
- [ ] Implement get_resumable_source_manager if ResumableSource
- [ ] Implement webhook methods if WebhookSource
- [ ] Add get_non_retryable_errors for auth/permission errors
Tooling & assets:
- [ ] Icon in frontend/public/services/ (SVG preferred — ask user for Logo.dev key if needed)
- [ ] Run `pnpm run generate:source-configs`
- [ ] Swap generic Config for generated {Source}SourceConfig in source.py
- [ ] Run `pnpm run schema:build`
- [ ] Django migrations run if enum value requires it
Release status:
- [ ] unreleasedSource=True while WIP
- [ ] releaseStatus="alpha" for new sources not yet extensively tested
- [ ] releaseStatus="beta" when most rough edges have been ironed out
- [ ] Omit releaseStatus (or set to "ga") on full release
- [ ] featureFlag="dwh-{source_name}" for controlled rollout
- [ ] Flag removed / unreleasedSource removed on full release
Tests & handoff:
- [ ] Source tests (test_<source>_source.py)
- [ ] Transport tests (test_<source>.py)
- [ ] `ruff check . --fix` and `ruff format .`
- [ ] List any new env vars (OAuth client IDs/secrets, etc) in the PR / handoff
Validation and generation workflow
After changing source fields, re-run pnpm run generate:source-configs and pnpm run schema:build, then the targeted tests for the new source. Run ruff check . --fix and ruff format . on modified Python files.
Common pitfalls
- Source not visible in wizard: not registered/imported in `sources/__init__.py`, or `schema:build` not rerun.
- Generated config class still empty: forgot `generate:source-configs` after updating fields.
- Incremental sync misbehaving: wrong field name/type or wrong sort assumptions.
- Endless retries for bad credentials: missing `get_non_retryable_errors`.
- Resumable state never saved, or saved too early: forgot to call `save_state` after yielding a batch, or called it before the yield so a crash skips that batch.
- Webhook rows not landing: feature flag `warehouse-source-webhooks` disabled, or schema `is_webhook=False`, or `initial_sync_complete=False`.
- Dependent resource path `KeyError`: pre-format static path placeholders (see Fan-out).
- Silent truncation risk: page caps hit without logs/metrics.
- Drift from refactors: unused function params/helpers left behind after endpoint behavior changes.
- Type drift in endpoint config dicts: use source typing aliases (`Endpoint`, `ClientConfig`, `IncrementalConfig`) to keep static checks precise.
- Partition key instability: picked `updated_at` instead of `created_at`; partitions rewrite on every sync.
- Hardcoded Logo.dev key committed: always ask the user for the key at runtime.