Apache Spark 4.1 深掘り — 宣言型パイプライン・Real-Time Mode・PySpark 強化の全貌
Apache Spark 4.1.0 は 2025 年 12 月 16 日にリリースされ、2026 年 1 月には安定版の 4.1.1 が公開されました。Spark 4.1 の最大の特徴は「宣言型」への大きな一歩です。ETL パイプラインの定義方法・ストリーミング処理のレイテンシ・PySpark の実行効率という三つの軸で実務的な改善が加えられています。
Spark Declarative Pipelines(SDP)
Spark 4.1 の目玉機能として、Spark Declarative Pipelines(SDP) が新たに追加されました。SDP では、開発者は「何を作りたいか(データセットとクエリ)」だけを宣言すれば良く、実行グラフの構築・依存関係の解決・並列化・チェックポイント・リトライは Spark が自動的に管理します。
SDP が扱う主なオブジェクトは以下の 2 種類です。
- Streaming Table — ストリーミングクエリによって継続的に管理されるテーブル
- Materialized View — 特定クエリの結果として定義されるテーブル(バッチ的更新)
従来の命令的スタイルとの比較
従来の Structured Streaming では、パイプラインのステップを順序どおりに書き、依存関係を手動で管理する必要がありました。
# 従来の命令的スタイル
stream = (
spark.readStream.format("kafka")
.option("subscribe", "sensor-events")
.load()
)
query = (
stream.writeStream.format("delta")
.outputMode("append")
.option("checkpointLocation", "/checkpoints/sensor")
.start("/tables/sensor_raw")
)
query.awaitTermination()
SDP はこの煩雑さを排除し、ETL 開発を「変換のロジックを書く作業」に集中させます。
# SDP スタイル(宣言型)
import pyspark.sql.functions as F
from pyspark.sql.streaming.declarative import streaming_table
@streaming_table
def sensor_raw(spark):
return (
spark.readStream.format("kafka")
.option("subscribe", "sensor-events")
.load()
)
@streaming_table
def sensor_cleaned(spark):
return (
sensor_raw(spark)
.filter(F.col("value").isNotNull())
.withColumn("ts", F.from_unixtime("timestamp"))
)
依存関係の解決・チェックポイントの管理は Spark が担います。
Structured Streaming の Real-Time Mode(RTM)
Spark 4.1 では、Structured Streaming において Real-Time Mode(RTM) が正式にサポートされました。RTM はエンドツーエンドのレイテンシを大幅に削減することを目的としており、ステートレスなワークロードでは p99 レイテンシが一桁ミリ秒を達成できるとされています。
重要な点は、既存の Structured Streaming クエリをコード変更なしに RTM で実行できることです。移行コストをほとんどかけずにレイテンシを改善できます。
# 既存のクエリに trigger オプションを追加するだけ
query = (
stream.writeStream
.format("delta")
.trigger(continuous="1 second") # RTM を有効化
.outputMode("append")
.start("/tables/output")
)
Micro-Batch との比較
| モード | レイテンシ | 向いているユースケース |
|---|---|---|
| Micro-Batch(デフォルト) | 数秒〜数分 | 高スループット・集計処理 |
| Real-Time Mode(RTM) | 一桁ミリ秒 | 低レイテンシ・ステートレス処理 |
PySpark の Arrow ネイティブ UDF
PySpark においては、Arrow ネイティブ UDF・UDTF デコレータが追加されました。従来の Pandas UDF では PyArrow ↔ Pandas の変換オーバーヘッドが発生していましたが、新しいデコレータを使うことで PyArrow 形式のまま処理を完結させられます。
from pyspark.sql.functions import udf
import pyarrow as pa
# 従来の Pandas UDF(Pandas 経由でオーバーヘッドあり)
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf("double")
def score_pandas(values: pd.Series) -> pd.Series:
return values * 2.0
# Arrow ネイティブ UDF(変換オーバーヘッドなし)
@udf(returnType="double", useArrow=True)
def score_arrow(values: pa.Array) -> pa.Array:
return pa.compute.multiply(values, 2.0)
加えて、Python Data Source のフィルタプッシュダウンもサポートされました。カスタム Python データソースを使用する際に、フィルタ条件をデータ取得側に押し込むことでデータ転送量を削減できます。
SQL 機能の強化
SQL Scripting の GA
Spark 4.0 で導入された SQL スクリプティングが正式版となり、デフォルト有効化されました。エラーハンドリングの改善も含まれています。
BEGIN
DECLARE total_rows BIGINT;
SET total_rows = (SELECT COUNT(*) FROM sensor_raw);
IF total_rows > 0 THEN
INSERT INTO sensor_summary
SELECT device_id, AVG(value) FROM sensor_raw GROUP BY device_id;
END IF;
EXCEPTION WHEN OTHERS THEN
INSERT INTO error_log VALUES (current_timestamp(), 'ETL failed');
END;
VARIANT 型の GA
半構造化データをネイティブに扱える VARIANT 型が正式版になりました。シュレッディングにより読み取りが高速化されています。
-- JSON データを VARIANT 型で格納
CREATE TABLE events (
id BIGINT,
payload VARIANT
);
-- VARIANT フィールドへのアクセス
SELECT payload:device_id::STRING, payload:metrics:temperature::DOUBLE
FROM events
WHERE payload:event_type = 'sensor_reading';
再帰 CTE のサポート
階層データの問い合わせが SQL 標準の再帰 CTE で記述可能になりました。
-- 組織階層の全展開
WITH RECURSIVE org_tree AS (
SELECT id, name, parent_id, 0 AS level
FROM organizations
WHERE parent_id IS NULL
UNION ALL
SELECT o.id, o.name, o.parent_id, t.level + 1
FROM organizations o
JOIN org_tree t ON o.parent_id = t.id
)
SELECT * FROM org_tree ORDER BY level, name;
近似スケッチ関数
- KLL スケッチ — 分位数推定(大規模データの p50/p95/p99 を高速計算)
- Theta スケッチ — カーディナリティ推定(APPROX_COUNT_DISTINCT の高精度版)
-- KLL スケッチで p95 レイテンシを推定
SELECT
service_name,
approx_percentile(response_ms, 0.95) AS p95_latency
FROM request_logs
GROUP BY service_name;
大規模ワークロード向けの安定性改善
- zstd 圧縮 Protobuf プラン — 論理プランのシリアライズに zstd 圧縮を適用し、大規模クエリでのオーバーヘッドを削減
- チャンク化 Arrow 結果ストリーミング — Arrow 形式の結果を分割送信することで、巨大なローカルリレーションへの対応を強化
Spark 4.2 プレビューの動向
2026 年 3 月には Spark 4.2.0 のプレビュー版が公開されました。正式版ではないため API・機能は変更の可能性がありますが、SDP の機能強化や RTM のさらなるレイテンシ改善が予定されています。
まとめ
Apache Spark 4.1 は、ETL をより宣言的に記述できる SDP、低レイテンシ処理を実現する RTM、Python 開発の効率を高める Arrow ネイティブ UDF と、実務上の課題に直結する機能強化が揃ったリリースです。
- SDP — パイプラインの宣言的定義。依存関係・チェックポイントを Spark に委譲できる
- Real-Time Mode — 既存クエリを変更なしに一桁ミリ秒レイテンシへ移行可能
- Arrow ネイティブ UDF — Pandas 変換オーバーヘッドを排除した高速 Python UDF
- SQL Scripting・VARIANT・再帰 CTE — 複雑な ETL ロジックを SQL で完結させやすくなった
既存の Structured Streaming クエリを変更なしに RTM へ移行できる後方互換性の配慮も評価できるリリースです。