dbt + DuckDB — ローカルで本格的な分析基盤を作る

DuckDB と dbt を組み合わせると、BigQuery や Snowflake なしでも本格的なデータ変換パイプラインをローカルで動かせます。この記事ではセットアップからモデル作成・運用のポイントまで実際のコマンドとクエリで解説します。

DuckDB とは

DuckDB は組み込み型の分析用 SQL データベースです。SQLite のように単一ファイルで動作しますが、列指向ストレージSIMD 最適化により大量データの集計クエリを高速に処理できます。

特徴詳細
組み込み型サーバー不要。Python・CLI・各言語から直接使用
列指向ストレージ分析クエリに最適化された高速処理
CSV / Parquet 直読みファイルを DB にインポートせず直接クエリ可能
Arrow 統合Pandas・Polars との高速なデータ受け渡し
SQL 互換PostgreSQL 方言に準拠、Window 関数・CTE 完全対応

DuckDB を選ぶ場面

データ量が数 GB 以下で、ローカル・CI 環境で完結したい
  → DuckDB + dbt-duckdb

数 TB 規模・複数チームでのコラボレーション
  → BigQuery / Snowflake

既存の Postgres 環境をそのまま使いたい
  → dbt-postgres

セットアップ

必要なパッケージのインストール

pip install dbt-duckdb duckdb

dbt-duckdb は dbt-core と DuckDB アダプターを含みます。個別インストールは不要です。

dbt プロジェクトの初期化

dbt init my_analytics
cd my_analytics

profiles.yml の設定

~/.dbt/profiles.yml を編集します。

my_analytics:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: "dev.duckdb"
      threads: 4

    # インメモリ(一時的な使用向け)
    memory:
      type: duckdb
      path: ":memory:"
      threads: 4

    prod:
      type: duckdb
      path: "/data/analytics/prod.duckdb"
      threads: 8

CSV / Parquet を直接読み込む

DuckDB の強力な機能の一つが、ファイルを DB にインポートせずそのまま SQL でクエリできる点です。

-- CSV を直接クエリ
SELECT
    date,
    product_id,
    SUM(sales_amount) AS total_sales
FROM read_csv('data/sales_*.csv',
    header = true,
    dateformat = '%Y-%m-%d'
)
GROUP BY date, product_id;

-- Parquet を直接クエリ
SELECT *
FROM read_parquet('data/events/*.parquet')
WHERE event_date >= '2025-01-01'
LIMIT 100;

dbt の source として外部ファイルを参照

# models/sources.yml
version: 2

sources:
  - name: raw_files
    tables:
      - name: sales
        meta:
          external_location: >
            read_csv(
              'data/sales_*.csv',
              header = true,
              dateformat = '%Y-%m-%d'
            )
      - name: events
        meta:
          external_location: "read_parquet('data/events/*.parquet')"

dbt モデルの作成

ステージングモデル(クレンジング)

-- models/staging/stg_sales.sql
WITH source AS (
    SELECT * FROM {{ source('raw_files', 'sales') }}
),

cleaned AS (
    SELECT
        CAST(date AS DATE)           AS sale_date,
        TRIM(product_id)             AS product_id,
        TRIM(UPPER(region))          AS region,
        CAST(sales_amount AS DOUBLE) AS sales_amount,
        CAST(quantity AS INTEGER)    AS quantity
    FROM source
    WHERE sales_amount > 0
      AND product_id IS NOT NULL
)

SELECT * FROM cleaned

中間モデル(集計)

-- models/intermediate/int_daily_sales.sql
WITH daily AS (
    SELECT
        sale_date,
        product_id,
        region,
        SUM(sales_amount) AS daily_sales,
        SUM(quantity)     AS daily_quantity
    FROM {{ ref('stg_sales') }}
    GROUP BY sale_date, product_id, region
),

with_rank AS (
    SELECT
        *,
        RANK() OVER (
            PARTITION BY sale_date
            ORDER BY daily_sales DESC
        ) AS sales_rank
    FROM daily
)

SELECT * FROM with_rank

マートモデル(最終成果物)

-- models/mart/mart_product_performance.sql
{{
    config(
        materialized = 'table',
        tags = ['mart', 'daily']
    )
}}

SELECT
    product_id,
    region,
    SUM(daily_sales)        AS total_sales,
    AVG(daily_sales)        AS avg_daily_sales,
    SUM(daily_quantity)     AS total_quantity,
    MIN(sale_date)          AS first_sale_date,
    MAX(sale_date)          AS last_sale_date,
    COUNT(DISTINCT sale_date) AS active_days,
    SUM(daily_sales) / NULLIF(COUNT(DISTINCT sale_date), 0) AS sales_per_active_day
FROM {{ ref('int_daily_sales') }}
GROUP BY product_id, region
ORDER BY total_sales DESC

DuckDB 固有の便利な SQL 機能

-- LIST_AGG で配列集計
SELECT
    product_id,
    LIST(DISTINCT region ORDER BY region) AS regions
FROM stg_sales
GROUP BY product_id;

-- ASOF JOIN(時系列データの結合)
SELECT s.*, p.price
FROM sales s
ASOF JOIN prices p
    ON s.product_id = p.product_id
    AND s.sale_date >= p.effective_date;

dbt の実行

# 全モデルを実行
dbt run

# 特定モデルだけ実行
dbt run --select mart_product_performance

# 上流依存を含めて実行(+ は上流、後ろの + は下流)
dbt run --select +mart_product_performance

# テスト実行
dbt test

# ドキュメント生成・表示
dbt docs generate
dbt docs serve  # ブラウザで lineage graph を確認

BigQuery / Snowflake との使い分け

観点DuckDB + dbtBigQuery / Snowflake
コスト無料クエリ量・保存量に応じた従量課金
データ量〜数十 GB数 TB〜
チーム共有ファイル共有が必要クラウドで自然に共有
セットアップpip install のみプロジェクト・権限設定が必要
CI 速度高速(インメモリ)ネットワーク遅延あり
本番利用個人・小チーム向け企業の本番データ基盤

推奨アーキテクチャ:

開発・プロトタイプ: DuckDB(ローカル)
    ↓ 検証完了後
本番データ基盤: BigQuery / Snowflake

CI テスト: DuckDB(インメモリ)で高速実行
本番パイプライン: BigQuery / Snowflake で実行

ハマりやすいポイント

同時書き込みロック

DuckDB のファイルは同時に 1 プロセスしか書き込めません。CI/CD で並列に dbt run を実行するとロックエラーになります。

# NG: 並列実行
dbt run &
dbt run &   # エラー: Database is locked

# OK: 別ファイルを使う
dbt run --target ci_1  # ci_1.duckdb
dbt run --target ci_2  # ci_2.duckdb

外部ファイルのパスは絶対パスが安全

external_location に相対パスを書くと、dbt の実行ディレクトリによって変わります。

# 環境変数を使うと安全
external_location: "{{ env_var('DATA_DIR') }}/sales.csv"

DuckDB バージョンと dbt-duckdb のバージョン不一致

dbt-duckdb に同梱の DuckDB を使うのが推奨です。個別にインストールする場合はバージョンを合わせます。

pip install dbt-duckdb==1.9.1 duckdb==1.1.3

まとめ

  • DuckDB は組み込み型の高速分析 DB。CSV / Parquet を直接クエリでき、サーバー不要
  • dbt + DuckDB の組み合わせでローカル完結の本格 ELT パイプラインが構築できる
  • CI では :memory: モードで高速にテストし、本番はクラウド DW に切り替えるパターンが有効
  • 同時書き込み制限・パスの扱いに注意すれば、個人〜小チームの分析基盤として十分実用的