Build and run dbt transformations for a data product
dataproduct-implement generates the ends of the pipeline: input-port sources (from access agreements) and output-port models (from the data contract). This skill fills in the middle, the staging/ and intermediate/ layers, and runs dbt against the result. The conventions below are adapted from dbt's structure and materialization best practices, and are meant to be edited by the organization adopting this plugin.
When to use this vs. other skills
- No dbt project yet → run
dataproduct-bootstrapfirst. - No output-port models or input-port sources yet → run
dataproduct-implementfirst. - Auditing or reviewing an existing dbt project → use
/reviewor/simplify, not this skill. - Authoring or refactoring the middle of the DAG, or running dbt → this skill.
Conventions (the editable part)
These rules are the contract between this skill and the rest of the plugin. Organizations forking the plugin should treat this section as the place to encode their own style.
Layer responsibilities
| Layer | Purpose | Materialization | References | Naming |
|---|---|---|---|---|
models/input_ports/<op-id>.source.yaml |
External raw data, one file per active access agreement | n/a (source) | n/a | source name = <provider-dp-id>_<provider-op-id> |
models/staging/stg_<provider-dp-id>__<table>.sql |
One staging model per source table: rename, cast, light cleanup, dedup | view |
{{ source(...) }} only |
double underscore separates source from entity |
models/intermediate/int_<purpose>.sql |
Joins, aggregations, pivots, surrogate keys, single-purpose | view (or ephemeral if used once) |
{{ ref(stg_*) }} and {{ ref(int_*) }} only |
verb-based filename describing what it does |
models/output_ports/v1/<table>.sql |
Published, contract-governed tables | table (or incremental past the threshold below) |
{{ ref(int_*) }} or {{ ref(stg_*) }} |
table name from the ODCS contract's models: key |
Deviations from upstream dbt conventions, called out so the lineage stays visible:
- We use
input_ports/andoutput_ports/v1/instead of dbt'ssources/marts, to match the data-product lifecycle terminology. - Output-port models are not prefixed with
fct_/dim_. The contract dictates the published name, and consumers see that name directly. - We keep
staging/andintermediate/flat for single-data-product repos. Use subfolders by output port (e.g.staging/<output-port-id>/) only if a data product has more than two or three output ports.
Staging model shape (CTE pattern)
Every staging model uses this three-CTE skeleton so the structure is uniform and a reader knows where to find the rename block.
-- Staging model for source <provider-dp-id>_<provider-op-id>.<table>
with
source as (
select * from {{ source('<provider-dp-id>_<provider-op-id>', '<table>') }}
),
renamed as (
select
cast(<raw_col> as <warehouse_type>) as <canonical_name>,
...
from source
),
final as (
select * from renamed
)
select * from final
Allowed transformations in renamed: column renames, casts, simple case rewrites, unit conversions (e.g. cents to dollars), trim/lower. Disallowed: joins, aggregations, any cross-source logic. Those belong in intermediate/.
Intermediate model shape
- One file per purpose. If you find yourself naming it
int_foo_and_bar, split it. - All inputs come through
ref(), neversource(). Asource()call inintermediate/means a staging model is missing. - Pick
viewunless the model is referenced once and is cheap, in which caseephemeralkeeps it out of the warehouse.
Tests
Tests are declared in the _models.yml next to the file.
| Layer | Default tests |
|---|---|
input_ports/ (sources) |
freshness: block when the upstream contract publishes an SLA; otherwise none |
staging/ |
not_null + unique on the natural key, accepted_values on enum columns |
intermediate/ |
relationships on every join key, not_null on any column the next layer depends on |
output_ports/v1/ |
Derived from the ODCS contract by dataproduct-implement |
Custom singular tests live in tests/<purpose>.sql and assert cross-model invariants the schema-tests cannot express.
Materializations
dbt's tiered rule applies: start with view, promote to table when query latency hurts, promote to incremental when the build window hurts.
- Default per layer (set in
dbt_project.yml): staging →view, intermediate →view, output_ports →table. - Switch an output port to
incrementalwhen one of these is true:- The model takes longer than 5 minutes to build full-refresh.
- The source data grows by more than ~10M rows per scheduled run.
- A CI run cannot rebuild the model within the deploy budget.
When switching to incremental, configure all three keys; defaults vary by warehouse:
{{ config(
materialized='incremental',
unique_key='<natural_key>',
on_schema_change='append_new_columns',
incremental_strategy='merge' # databricks/snowflake/bigquery; use 'delete+insert' on postgres
) }}
Inside the model body, gate new-row logic with {% if is_incremental() %} where updated_at > (select max(updated_at) from {{ this }}) {% endif %}.
Schemas
Each layer lands in its own warehouse schema so consumers see only the published output, not the scaffolding.
| Layer | Schema | Example |
|---|---|---|
input_ports/ |
n/a (sources read from upstream schemas; not materialized here) | — |
staging/ |
internal_<data-product-id> |
internal_dp_acme_customer_activity |
intermediate/ |
internal_<data-product-id> (same as staging) |
internal_dp_acme_customer_activity |
output_ports/v<N>/<table>.sql |
op_<output-port-id>_v<N> |
op_customer_activity_v1 |
Why this split:
- The output-port schema embeds the output port version so v1 and v2 of the same port can coexist while one is being rolled out. Consumers are granted access to
op_<output-port-id>_v<N>only. - Staging and intermediate share a single internal schema, scoped by data product id so two data products on the same warehouse do not collide. That schema is not granted to consumers.
To wire this up in dbt, the project needs two pieces:
Override
generate_schema_nameso+schema:is taken literally instead of being suffixed onto the target's default schema. Place this inmacros/get_custom_schema.sql:{% macro generate_schema_name(custom_schema_name, node) -%} {%- if custom_schema_name is none -%} {{ target.schema }} {%- else -%} {{ custom_schema_name | trim }} {%- endif -%} {%- endmacro %}Per-model schema for output ports (one schema per port, so a directory-level rule does not work when v
/ contains more than one port): -- top of models/output_ports/v1/<table>.sql {{ config(schema='op_<output_port_id>_v1') }}Directory-level schema for internal models in
dbt_project.yml:models: <data_product_id>: staging: +schema: internal_<data_product_id> intermediate: +schema: internal_<data_product_id>
With these in place, dbt run creates exactly op_<op-id>_v1, op_<op-id>_v2, and internal_<dp-id> in the warehouse, no extra prefixes.
Documentation
- Every model has a
description:in_models.yml. Single sentence: what it represents, not how it is built. - Every column with a non-obvious meaning has a
description:. Obvious ones (customer_id,created_at) can skip it. - Output-port descriptions come from the contract; do not edit them locally — edit the contract via
datacontract-edit.
How to run this skill
${PLUGIN_ROOT}below refers to the root of this plugin (the directory that containsskills/). On Claude Code it is set automatically as${CLAUDE_PLUGIN_ROOT}; use that. On any other agent it is unset; resolve it as../..relative to thisSKILL.mdfile's directory.
Plan announcement (before Step 0)
Before running Step 0, print this plan to the user verbatim:
Running dataproduct-dbt. I'll:
- Pre-checks: confirm this is a dbt project with
input_ports/,staging/,intermediate/,output_ports/v1/.- Identify which operation you want (build staging, add intermediate, refactor an output port, run dbt, switch to incremental).
- Apply the operation following the conventions in this skill (which you can edit to match your org's style).
- Verify with
dbt parseand, when you ask,dbt build --select <scope>.- Summarize what changed and what's open.
Then proceed.
Step 0 — Pre-checks
- Confirm
uv run --quiet dbt --versionsucceeds from the project root. If it fails, runuv syncand retry; if still missing, stop and tell the user to add the dbt adapter for their warehouse topyproject.toml's[dependency-groups].dev(e.g.dbt-snowflake,dbt-databricks) and re-runuv sync. Useuv run dbt …for every dbt CLI invocation in this skill. - Confirm
dbt_project.ymlexists at the working directory root. If not, route todataproduct-bootstrap. - Confirm the four model directories exist (
input_ports/,staging/,intermediate/,output_ports/v1/). If any are missing, route todataproduct-bootstraporentropy-data-sync(whichever the user prefers) and stop.
Step 1 — Identify the operation
Match the user's ask to one of the operations below. If two fit, ask which one. The operations are deliberately small so the skill stays focused; run it again for the next operation.
| If the user says... | Operation |
|---|---|
| "build out staging", "create staging models for input ports" | A |
| "add an intermediate model for X", "join staging models" | B |
| "refactor this output port into staging + intermediate", "this output port has direct source refs" | C |
| "run dbt", "test the models", "compile only", "build the models" | D |
| "make this output port incremental", "switch X to incremental" | E |
| "add tests / descriptions to staging or intermediate" | F |
Step 2 — Apply the operation
A. Generate staging models from input ports
For each models/input_ports/<provider-op-id>.source.yaml (skip files the user did not select if they narrowed the scope):
- Read the source file. Note
sources[0].name(the<dp-id>_<op-id>reference) and eachtables[].name(from the provider's contract). - For each table, write
models/staging/stg_<provider-dp-id>__<table>.sqlusing the staging CTE pattern above. Apply column renames only when the raw name violates the project's snake_case convention or the project's column-name policy; otherwise pass them through with explicit casts. - Append the model to
models/staging/_models.yml:models: - name: stg_<provider-dp-id>__<table> description: Cleaned, renamed, and type-cast columns from <provider-dp-id>.<table>. columns: - name: <natural_key> tests: [not_null, unique] - Do not invent business logic. If a rename or cast is non-obvious, leave a
-- TODO:comment in the model and surface it in the final report.
B. Add an intermediate model
- Ask the user what the model is for in one sentence (the verb plus the entities). Map to a filename:
int_<verb>_<entity>.sql(e.g.int_payments_pivoted_to_orders.sql). - Write
models/intermediate/<filename>.sql. Use only{{ ref(...) }}references to staging or other intermediate models. - Append a
_models.ymlentry with a description and tests on join keys (relationships,not_null). - Run
dbt parseto verify. Do not run the model in the warehouse without the user asking.
C. Refactor an output-port model into staging + intermediate
When models/output_ports/v1/<table>.sql contains {{ source(...) }} calls or pile-up joins:
- Identify each
source()call. Generate a staging model for each (operation A). - Identify each join, aggregation, pivot, or surrogate-key construction. Move it to one or more intermediate models (operation B).
- Rewrite the output-port body to be a thin projection over
ref(int_*)and apply the contract-driven casts and column order. - Run
dbt parse. If the rewrite changes column counts or order, surface a diff and ask before saving.
D. Run dbt
Pick the right command based on the user's ask. Always scope with --select unless the user explicitly asks for the full project.
| User intent | Command |
|---|---|
| Compile only (no warehouse roundtrip) | dbt parse |
| Materialize a model and its upstream | dbt run --select +<model> |
| Run tests for a model and its upstream | dbt test --select +<model> |
| Materialize + test in DAG order | dbt build --select +<model> |
| Rebuild an incremental model from scratch | dbt build --full-refresh --select <model> |
Confirm before any dbt run, dbt test, or dbt build on the whole project. Those touch the warehouse and can be expensive.
E. Switch an output port to incremental
- Ask the user for the natural key (must be unique per row) and the timestamp column the incremental filter will use.
- Add the
{{ config(...) }}block at the top of the model (see the Materializations section above). - Add the
{% if is_incremental() %} where <ts> > (select max(<ts>) from {{ this }}) {% endif %}filter on the deepest CTE that reads from the source. - Run
dbt build --full-refresh --select <model>once with the user's confirmation to seed the table. - On the next run, confirm
dbt build --select <model>only inserts new rows (checktarget/run/.../<model>.sqlfor themerge/delete+insertshape).
F. Add tests or descriptions
- Open the relevant
_models.yml. - Add
description:lines from the user's input or from the contract (for output ports, the contract is the source of truth — do not paraphrase it). - Add tests per the Tests table above. Run
dbt parseto verify YAML.
Step 3 — Verify
After any operation that wrote SQL or YAML:
- Run
dbt parseto catch syntax errors and dangling refs. - If the user authorized it, run
dbt build --select <scope>to materialize and test the affected models. - Do not run
dbt runseparately fromdbt test—dbt buildorders them correctly and stops on failures.
Step 4 — Final report
End with this two-part recap. Use the shared Status enum: created, updated, already present, deferred, skipped.
Part 1 — outcome table. One row per operation applied.
| Artifact | Status | Details |
|---|---|---|
| Operation | … | A / B / C / D / E / F (one row per operation run) |
| Staging models | … | <N> files at models/staging/stg_<...>.sql |
| Intermediate models | … | <N> files at models/intermediate/int_<...>.sql |
| Output-port refactor | … | <table>.sql rewritten to ref staging/intermediate / not applicable |
_models.yml entries |
… | counts per layer |
dbt parse |
… | "passed" / "failed: |
dbt build --select <scope> |
… | "passed" / "failed: |
Part 2 — next steps. Bullet list, include only what applies:
- For each
-- TODO:left in a generated model, list the model and the open question. - If staging or intermediate rename decisions were taken without user input, list them so the user can confirm.
- If the user asked for
dbt buildbut it was skipped (e.g. missing warehouse creds), the exact env vars they need to set. - If an output port is approaching the incremental threshold (>5 min, >10M rows/run), suggest operation E.
If there is nothing in Part 2, write a single line: No further action required.
Constraints
- No business logic in staging. If a join, aggregation, or
caseinvolving multiple columns shows up there, push it tointermediate/. - No
source()outside staging. Intermediate and output ports useref()exclusively. If you find asource()call elsewhere, surface it and propose operation C. - Contract is source of truth for output ports. Do not edit the column list, types, descriptions, or tests on output-port models from this skill — those come from the ODCS contract. Edits to the contract go through
datacontract-edit. - Idempotent. Re-running an operation when the target files already match the conventions is a no-op; the skill should report
already present. - Do not push or commit. Leave VCS state to the user.