dbt Core 1.11 と Apache Spark 4.1 の注目アップデート — 宣言的パイプラインと UDF 管理

データエンジニアリングのエコシステムで大きなアップデートが相次ぎました。dbt Labs は dbt Core 1.11 の GA を発表し、Apache Spark は 4.1 系で宣言的パイプライン(SDP)とリアルタイムストリーミングモード(RTM)を正式リリースしました。

dbt Core 1.11:UDF のファーストクラスサポート

dbt Core 1.11 では ユーザー定義関数(UDF)の直接管理 が追加されました。

UDF の定義と管理

# dbt プロジェクト内で UDF を定義
# models/udf/my_udf.sql
{% set udf_def %}
CREATE OR REPLACE FUNCTION my_database.my_schema.normalize_text(input STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
AS $$
import re
def normalize_text(input):
    return re.sub(r'\s+', ' ', input.strip().lower())
$$
{% endset %}
{{ udf_def }}
  • Python UDF・デフォルト引数・豊富な設定オプションをサポート
  • dbt プロジェクト内で UDF を定義・登録でき、ウェアハウス全体で同じ変換ロジックを再利用可能

Deferral の UDF 対応

--defer --state オプションと組み合わせることで、UDF を事前ビルドせずに依存モデルを実行できます。

# 変更されたモデルのみ実行(UDF は前回ビルドを再利用)
dbt run --defer --state .dbt-states/production

JSON スキーマ検証がデフォルト有効化

YAML 設定の誤りや古い構成を開発の早い段階で検出できるようになりました。

# NG: 古い構成(警告が出る)
models:
  - name: my_model
    config:
      materialized: table
    columns:
      - name: id
        tests:          # 古いキー名
          - unique
          - not_null

# OK: 現在の推奨構成
models:
  - name: my_model
    config:
      materialized: table
    columns:
      - name: id
        data_tests:     # 新しいキー名
          - unique
          - not_null

dbt Core 1.10:YAML アンカーと Fusion エンジン

1.11 に先行した 1.10 では以下が導入されました:

YAML アンカーで設定を再利用

# 共通設定をアンカーで定義
anchors:
  - &common_config
    materialized: table
    schema: mart
    tags: ['production']

models:
  - name: mart_orders
    config:
      <<: *common_config  # アンカーを展開

  - name: mart_customers
    config:
      <<: *common_config  # 同じ設定を再利用

dbt Fusion エンジン:Apache Spark 3.0 のサポート開始

dbt Fusion エンジン CLI が Apache Spark 3.0 のサポートを beta 提供開始しました。Fusion はより高速なコンパイル・実行を実現するエンジンで、Spark ユーザーへの展開が進んでいます。

Apache Spark 4.1:宣言的パイプライン(SDP)

Apache Spark 4.1.1 の目玉機能は Spark Declarative Pipelines(SDP) です。

SDP とは

バッチ・ストリーミング両対応の宣言的フレームワークで、以下を Spark が自動管理します:

  • 実行グラフの構築
  • 依存関係の順序解決
  • 並列実行・チェックポイント・リトライ
from pyspark.sql import SparkSession
from pyspark.declarative import pipeline, table, flow

spark = SparkSession.builder.appName("MyPipeline").getOrCreate()

@table(
    comment="生データの読み込み",
    table_properties={"quality": "bronze"}
)
def raw_events():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("subscribe", "events")
        .load()
    )

@table(
    comment="クレンジング済みデータ",
    table_properties={"quality": "silver"}
)
def cleaned_events():
    return (
        raw_events()
        .filter("value IS NOT NULL")
        .select("timestamp", "user_id", "event_type")
    )

@flow
def my_pipeline():
    return cleaned_events()

対応データソース

カテゴリサービス
クラウドストレージAmazon S3、Azure ADLS Gen2、Google Cloud Storage
メッセージバスApache Kafka、Amazon Kinesis、Google Pub/Sub、Azure EventHub

Apache Spark 4.1:Structured Streaming リアルタイムモード(RTM)

Spark 4.1 では Structured Streaming Real-Time Mode(RTM) が初の正式リリースとなりました。

RTM の特徴

  • ステートレスなタスクではレイテンシがシングルデジット(1桁)ミリ秒まで低下
  • Spark SQL エンジン上で動作し、連続処理をインクリメンタルかつ継続的に実行
# RTM での Structured Streaming 設定
query = (
    cleaned_events
    .writeStream
    .trigger(continuous="1 second")  # RTM を有効化
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", "/checkpoints/events")
    .start("/tables/silver_events")
)

レイテンシ比較

モードレイテンシ向いているユースケース
バッチ処理分〜時間日次集計、ETL
マイクロバッチ(デフォルト)秒単位準リアルタイムダッシュボード
RTM(Spark 4.1 新機能)1桁ミリ秒リアルタイムアラート、IoT

その他の Spark 4.1 新機能

Arrow ネイティブの UDF / UDTF デコレータ

Pandas 変換オーバーヘッドなしに PyArrow で効率的に実行できます。

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import pyarrow as pa

@udf(returnType=StringType(), useArrow=True)
def normalize_text(text: pa.Array) -> pa.Array:
    return pa.compute.utf8_lower(text)

VARIANT 型 GA

-- セミ構造化データを VARIANT 型で格納
CREATE TABLE events_raw (
    id BIGINT,
    payload VARIANT
);

-- shredding でネストフィールドを高速アクセス
SELECT payload:user_id::STRING AS user_id,
       payload:event_type::STRING AS event_type
FROM events_raw;

SQL Scripting GA(デフォルト有効化)

-- 条件分岐・ループを含む SQL スクリプト
BEGIN
  DECLARE total_count INT;
  SET total_count = (SELECT COUNT(*) FROM orders WHERE status = 'pending');

  IF total_count > 1000 THEN
    INSERT INTO alerts VALUES ('高い未処理件数: ' || total_count, CURRENT_TIMESTAMP());
  END IF;
END

バージョンロードマップ

バージョンリリース状態
Spark 4.0.22026-02-05安定版(バグ修正)
Spark 4.1.12026-01-09安定版
Spark 4.2.0-preview12026-01-11プレビュー(API 変更あり)

まとめ

  • dbt Core 1.11 は UDF を dbt プロジェクトのファーストクラス市民として組み込み、YAML 品質チェックの標準化によりチーム開発の信頼性を高めた
  • Apache Spark 4.1 は SDP と RTM という 2 つの大きな機能で、パイプラインのコード量削減とリアルタイム処理のレイテンシ改善を同時に実現
  • dbt Fusion エンジンが Spark をサポートし始めたことで、両エコシステムの統合がさらに進む可能性がある
  • RTM の 1 桁ミリ秒レイテンシは IoT・リアルタイムアラートなど低レイテンシ要件のユースケースで大きな武器になる