Apache Spark 4.1 深掘り — 宣言型パイプライン・Real-Time Mode・PySpark 強化の全貌

Apache Spark 4.1.0 は 2025 年 12 月 16 日にリリースされ、2026 年 1 月には安定版の 4.1.1 が公開されました。Spark 4.1 の最大の特徴は「宣言型」への大きな一歩です。ETL パイプラインの定義方法・ストリーミング処理のレイテンシ・PySpark の実行効率という三つの軸で実務的な改善が加えられています。

Spark Declarative Pipelines(SDP)

Spark 4.1 の目玉機能として、Spark Declarative Pipelines(SDP) が新たに追加されました。SDP では、開発者は「何を作りたいか(データセットとクエリ)」だけを宣言すれば良く、実行グラフの構築・依存関係の解決・並列化・チェックポイント・リトライは Spark が自動的に管理します。

SDP が扱う主なオブジェクトは以下の 2 種類です。

  • Streaming Table — ストリーミングクエリによって継続的に管理されるテーブル
  • Materialized View — 特定クエリの結果として定義されるテーブル(バッチ的更新)

従来の命令的スタイルとの比較

従来の Structured Streaming では、パイプラインのステップを順序どおりに書き、依存関係を手動で管理する必要がありました。

# 従来の命令的スタイル
stream = (
    spark.readStream.format("kafka")
    .option("subscribe", "sensor-events")
    .load()
)

query = (
    stream.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/sensor")
    .start("/tables/sensor_raw")
)

query.awaitTermination()

SDP はこの煩雑さを排除し、ETL 開発を「変換のロジックを書く作業」に集中させます。

# SDP スタイル(宣言型)
import pyspark.sql.functions as F
from pyspark.sql.streaming.declarative import streaming_table

@streaming_table
def sensor_raw(spark):
    return (
        spark.readStream.format("kafka")
        .option("subscribe", "sensor-events")
        .load()
    )

@streaming_table
def sensor_cleaned(spark):
    return (
        sensor_raw(spark)
        .filter(F.col("value").isNotNull())
        .withColumn("ts", F.from_unixtime("timestamp"))
    )

依存関係の解決・チェックポイントの管理は Spark が担います。

Structured Streaming の Real-Time Mode(RTM)

Spark 4.1 では、Structured Streaming において Real-Time Mode(RTM) が正式にサポートされました。RTM はエンドツーエンドのレイテンシを大幅に削減することを目的としており、ステートレスなワークロードでは p99 レイテンシが一桁ミリ秒を達成できるとされています。

重要な点は、既存の Structured Streaming クエリをコード変更なしに RTM で実行できることです。移行コストをほとんどかけずにレイテンシを改善できます。

# 既存のクエリに trigger オプションを追加するだけ
query = (
    stream.writeStream
    .format("delta")
    .trigger(continuous="1 second")  # RTM を有効化
    .outputMode("append")
    .start("/tables/output")
)

Micro-Batch との比較

モードレイテンシ向いているユースケース
Micro-Batch(デフォルト)数秒〜数分高スループット・集計処理
Real-Time Mode(RTM)一桁ミリ秒低レイテンシ・ステートレス処理

PySpark の Arrow ネイティブ UDF

PySpark においては、Arrow ネイティブ UDF・UDTF デコレータが追加されました。従来の Pandas UDF では PyArrow ↔ Pandas の変換オーバーヘッドが発生していましたが、新しいデコレータを使うことで PyArrow 形式のまま処理を完結させられます。

from pyspark.sql.functions import udf
import pyarrow as pa

# 従来の Pandas UDF(Pandas 経由でオーバーヘッドあり)
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("double")
def score_pandas(values: pd.Series) -> pd.Series:
    return values * 2.0

# Arrow ネイティブ UDF(変換オーバーヘッドなし)
@udf(returnType="double", useArrow=True)
def score_arrow(values: pa.Array) -> pa.Array:
    return pa.compute.multiply(values, 2.0)

加えて、Python Data Source のフィルタプッシュダウンもサポートされました。カスタム Python データソースを使用する際に、フィルタ条件をデータ取得側に押し込むことでデータ転送量を削減できます。

SQL 機能の強化

SQL Scripting の GA

Spark 4.0 で導入された SQL スクリプティングが正式版となり、デフォルト有効化されました。エラーハンドリングの改善も含まれています。

BEGIN
  DECLARE total_rows BIGINT;
  SET total_rows = (SELECT COUNT(*) FROM sensor_raw);

  IF total_rows > 0 THEN
    INSERT INTO sensor_summary
    SELECT device_id, AVG(value) FROM sensor_raw GROUP BY device_id;
  END IF;
EXCEPTION WHEN OTHERS THEN
  INSERT INTO error_log VALUES (current_timestamp(), 'ETL failed');
END;

VARIANT 型の GA

半構造化データをネイティブに扱える VARIANT 型が正式版になりました。シュレッディングにより読み取りが高速化されています。

-- JSON データを VARIANT 型で格納
CREATE TABLE events (
  id BIGINT,
  payload VARIANT
);

-- VARIANT フィールドへのアクセス
SELECT payload:device_id::STRING, payload:metrics:temperature::DOUBLE
FROM events
WHERE payload:event_type = 'sensor_reading';

再帰 CTE のサポート

階層データの問い合わせが SQL 標準の再帰 CTE で記述可能になりました。

-- 組織階層の全展開
WITH RECURSIVE org_tree AS (
  SELECT id, name, parent_id, 0 AS level
  FROM organizations
  WHERE parent_id IS NULL

  UNION ALL

  SELECT o.id, o.name, o.parent_id, t.level + 1
  FROM organizations o
  JOIN org_tree t ON o.parent_id = t.id
)
SELECT * FROM org_tree ORDER BY level, name;

近似スケッチ関数

  • KLL スケッチ — 分位数推定(大規模データの p50/p95/p99 を高速計算)
  • Theta スケッチ — カーディナリティ推定(APPROX_COUNT_DISTINCT の高精度版)
-- KLL スケッチで p95 レイテンシを推定
SELECT
  service_name,
  approx_percentile(response_ms, 0.95) AS p95_latency
FROM request_logs
GROUP BY service_name;

大規模ワークロード向けの安定性改善

  • zstd 圧縮 Protobuf プラン — 論理プランのシリアライズに zstd 圧縮を適用し、大規模クエリでのオーバーヘッドを削減
  • チャンク化 Arrow 結果ストリーミング — Arrow 形式の結果を分割送信することで、巨大なローカルリレーションへの対応を強化

Spark 4.2 プレビューの動向

2026 年 3 月には Spark 4.2.0 のプレビュー版が公開されました。正式版ではないため API・機能は変更の可能性がありますが、SDP の機能強化や RTM のさらなるレイテンシ改善が予定されています。

まとめ

Apache Spark 4.1 は、ETL をより宣言的に記述できる SDP、低レイテンシ処理を実現する RTM、Python 開発の効率を高める Arrow ネイティブ UDF と、実務上の課題に直結する機能強化が揃ったリリースです。

  • SDP — パイプラインの宣言的定義。依存関係・チェックポイントを Spark に委譲できる
  • Real-Time Mode — 既存クエリを変更なしに一桁ミリ秒レイテンシへ移行可能
  • Arrow ネイティブ UDF — Pandas 変換オーバーヘッドを排除した高速 Python UDF
  • SQL Scripting・VARIANT・再帰 CTE — 複雑な ETL ロジックを SQL で完結させやすくなった

既存の Structured Streaming クエリを変更なしに RTM へ移行できる後方互換性の配慮も評価できるリリースです。