Stop Re-running Everything: A Local Incremental Pipeline in DuckDB

I love local-first data work… until I catch myself doing the same thing for the 12th time:

“I changed one model. Better rerun the whole pipeline.”

This post is a light walkthrough of a tiny project that fixes that habit using incremental models + cached DAG runs — all on your laptop with DuckDB. The example is a simplified, DuckDB-only version of the existing incremental_demo project.

We’ll do three runs:

  1. seed v1 → initial build
  2. run again unchanged → mostly skipped
  3. seed v2 (update + new row) → incremental merge/upsert

That’s it. No cloud. No ceremony.

The whole demo in one sentence

We seed a tiny raw.events table (from CSV), build a staging model, then build incremental facts that only process “new enough” rows based on updated_at, and apply updates based on event_id.

What’s in the mini project

There are three key pieces:

1) Two seed snapshots

v1 has 3 rows.
v2 changes event_id=2 (newer updated_at, different value) and adds event_id=4.

2) A source mapping

The project defines a source raw.events pointing at a seeded table called seed_events.
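
In model SQL, that just means {{ source('raw', 'events') }} resolves to the seeded table. Conceptually it is equivalent to selecting from it directly; a minimal sketch, assuming the seed lands in the same schema (inc_demo_schema) that the rest of this post uses:

-- What a model reading source('raw', 'events') effectively runs.
-- The schema name is an assumption based on FF_DUCKDB_SCHEMA further down.
select event_id, updated_at, value
from inc_demo_schema.seed_events;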

3) A few models (SQL + Python)

  • events_base (staging table)
  • fct_events_sql_inline (incremental SQL, config inline)
  • fct_events_sql_yaml (incremental SQL, config in project.yml)
  • fct_events_py_incremental (incremental Python model for DuckDB)

All of these exist in the exported demo.

DuckDB-only setup

The demo’s DuckDB profile is simple: it writes to a local DuckDB file.

profiles.yml (DuckDB profile)

dev_duckdb:
  engine: duckdb
  duckdb:
    path: "{{ env('FF_DUCKDB_PATH', '.local/incremental_demo.duckdb') }}"

.env.dev_duckdb (optional convenience)

FF_DUCKDB_PATH=.local/incremental_demo.duckdb
FF_DUCKDB_SCHEMA=inc_demo_schema

The models (the fun part)

Staging: events_base

This is intentionally boring: cast timestamps, keep columns tidy.

{{ config(materialized='table') }}

select
  event_id,
  cast(updated_at as timestamp) as updated_at,
  value
from {{ source('raw', 'events') }};

Incremental SQL (inline config): fct_events_sql_inline

This model declares:

  • materialized='incremental'
  • unique_key='event_id'
  • watermark column: updated_at

And on incremental runs it only selects rows newer than the max updated_at already in the target.

This assumes updated_at increases whenever a row changes. It's a demo; real pipelines may need late-arrival handling, and a lookback sketch follows the model below.

{{ config(
    materialized='incremental',
    unique_key='event_id',
    incremental={ 'updated_at_column': 'updated_at' },
) }}

with base as (
  select *
  from {{ ref('events_base.ff') }}
)
select
  event_id,
  updated_at,
  value
from base
{% if is_incremental() %}
where updated_at > (
  select coalesce(max(updated_at), timestamp '1970-01-01 00:00:00')
  from {{ this }}
)
{% endif %};
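
If upstream rows can arrive late (an updated_at older than the current watermark), filtering strictly past max(updated_at) will miss them. A common mitigation is to reprocess a small overlap window; here is a minimal sketch of the where clause, with a three-day interval as an arbitrary example that is not part of the demo:

{% if is_incremental() %}
-- Reprocess a small lookback window instead of only strictly-newer rows.
-- Re-running the overlap is safe because unique_key='event_id' upserts it.
where updated_at > (
  select coalesce(max(updated_at), timestamp '1970-01-01 00:00:00')
         - interval '3 days'
  from {{ this }}
)
{% endif %}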

Incremental SQL (YAML-config style): fct_events_sql_yaml

Same idea, but the SQL file stays “clean”:

{{ config(materialized='incremental') }}

with base as (
  select *
  from {{ ref('events_base.ff') }}
)
select
  event_id,
  updated_at,
  value
from base;

…and the incremental knobs live in project.yml:

models:
  incremental:
    fct_events_sql_yaml.ff:
      unique_key: "event_id"
      incremental:
        enabled: true
        updated_at_column: "updated_at"

Pick whichever style better suits you.

Incremental Python (DuckDB): fct_events_py_incremental

This one just adds a value_x10 column in pandas and returns the delta frame. The incremental behavior (merge/upsert) is configured for this model in project.yml.

from fastflowtransform import engine_model
import pandas as pd

@engine_model(
    only="duckdb",
    name="fct_events_py_incremental",
    deps=["events_base.ff"],
)
def build(events_df: pd.DataFrame) -> pd.DataFrame:
    df = events_df.copy()
    df["value_x10"] = df["value"] * 10
    return df[["event_id", "updated_at", "value", "value_x10"]]

The three-run walkthrough

We’ll follow the demo’s exact “story arc”: first build, no-op build, then seed changes triggering incremental updates.

Step 0: pick a local seeds folder

The Makefile uses a local seeds dir and swaps seed_events.csv between v1 and v2.

mkdir -p .local/seeds

A tiny dataset that still proves “incremental”

This demo uses two versions of the same seed file. v2 updates one existing row and adds one new row — so you can watch an incremental model do both an upsert and an insert.

seeds/seed_events_v1.csv

event_id,updated_at,value
1,2024-01-01 00:00:00,10
2,2024-01-02 00:00:00,20
3,2024-01-03 00:00:00,30

seeds/seed_events_v2.csv

event_id,updated_at,value
1,2024-01-01 00:00:00,10
2,2024-01-05 00:00:00,999
3,2024-01-03 00:00:00,30
4,2024-01-06 00:00:00,40

After you switch from v1 → v2 and run again, you should end up with:

  • event_id=2 updated (newer updated_at, value=999)
  • event_id=4 inserted (brand new row)

1) First run (seed v1 → initial build)

Copy v1 into place:

cp seeds/seed_events_v1.csv .local/seeds/seed_events.csv

Seed + run:

FFT_SEEDS_DIR=.local/seeds fft seed . --env dev_duckdb
fft run . --env dev_duckdb --cache=rw

What you should expect:

  • events_base becomes a normal table
  • incremental models create their target tables (the first run is effectively a full build); see the quick check below
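
If you want to double-check the initial build, a quick count against the DuckDB file should show all three v1 rows. A small sketch, using the schema name from the .env above and run via the duckdb CLI as in the sanity-check section further down:

-- After the first run with seed v1, the fact table should hold 3 rows.
select count(*) as row_count
from inc_demo_schema.fct_events_sql_inline;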

2) No-op run (same seed v1; should be mostly skipped)

Run again without changing anything:

fft run . --env dev_duckdb --cache=rw

The demo literally calls this the “no-op run… should be mostly skipped,” which is the best feeling in local data dev.
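
One way to see why there is nothing to do: with an unchanged seed, the watermark filter from the inline model selects zero rows. You can check that by hand (same schema assumption as above):

-- Rows that would currently qualify for an incremental load.
-- With seed v1 unchanged, this should return 0.
select count(*) as new_rows
from inc_demo_schema.events_base
where updated_at > (
  select max(updated_at)
  from inc_demo_schema.fct_events_sql_inline
);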

3) Change the seed (v2 snapshot) and run incremental

Now swap to v2:

cp seeds/seed_events_v2.csv .local/seeds/seed_events.csv
FFT_SEEDS_DIR=.local/seeds fft seed . --env dev_duckdb
fft run . --env dev_duckdb --cache=rw

This is the punchline:

  • event_id=2 comes in with a newer updated_at and value=999
  • event_id=4 shows up for the first time

So your incremental facts should update the row for event_id=2 and insert event_id=4, based on unique_key=event_id.
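
The framework handles the mechanics, but the effect is equivalent to a delete-then-insert (or merge) keyed on event_id. Spelled out by hand against this demo's data, it looks roughly like this sketch, where 2024-01-03 is the watermark left over from the v1 build:

-- Conceptual upsert: remove target rows that have a newer version in the
-- delta, then insert the delta (event_id 2 updated, event_id 4 new).
delete from inc_demo_schema.fct_events_sql_inline
where event_id in (
  select event_id
  from inc_demo_schema.events_base
  where updated_at > timestamp '2024-01-03 00:00:00'
);

insert into inc_demo_schema.fct_events_sql_inline
select event_id, updated_at, value
from inc_demo_schema.events_base
where updated_at > timestamp '2024-01-03 00:00:00';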

Sanity check in DuckDB

Query the incremental SQL table:

duckdb .local/incremental_demo.duckdb \
  "select * from inc_demo_schema.fct_events_sql_inline order by event_id;"

After v2, you should see:

  • event_id=2 with updated_at = 2024-01-05 and value = 999
  • a new row for event_id=4 with updated_at = 2024-01-06 and value = 40

If you query the Python table:

duckdb .local/incremental_demo.duckdb \
  "select * from inc_demo_schema.fct_events_py_incremental order by event_id;"

You should also see value_x10 (e.g., 9990 for the updated row).

Make the DAG visible

You can see the DAG in the docs:

fft docs serve --env dev_duckdb --open

DAG from local docs server

Optional tiny “quality check”

The demo includes simple not-null tests for the incremental outputs.

fft test . --env dev_duckdb --select tag:incremental
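
Conceptually, a not-null test is just a query that must come back empty. A sketch of the idea (not necessarily the exact SQL the framework generates):

-- Not-null check on the incremental output: the test passes
-- when this query returns zero rows.
select *
from inc_demo_schema.fct_events_sql_inline
where event_id is null
   or updated_at is null;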

What you just bought yourself

With this setup, your local dev loop becomes:

  • Run once to build everything
  • Run again and skip most work
  • Change input data and update only what’s necessary
  • Update existing rows safely (via unique_key) instead of “append and pray”

And all of it works with a single local DuckDB file, which makes experimenting feel cheap again.
