Snowflake × AWS でリアルタイムデータ基盤を作る — センシングデータ収集から可視化まで
Snowflake は優れたデータウェアハウスですが、リアルタイム性については正直に言うと苦手な部分があります。この記事では Snowflake のリアルタイム性を冷静に評価したうえで、AWS サービスで補完するアーキテクチャパターンと、センシングデータの収集・可視化の全体像を解説します。
Snowflake のリアルタイム性 — 正直な評価
Snowflake はバッチ処理・分析クエリに最適化されたウェアハウスであり、リアルタイム性については以下の制約があります。
| 項目 | 実態 |
|---|---|
| 最小ロード間隔 | Snowpipe で数十秒〜数分のラグが発生 |
| クエリレイテンシ | コールドスタート時は数秒〜数十秒かかる |
| 行ごとの UPSERT | マイクロバッチでも INSERT が前提で高頻度更新は重い |
| ストリーミング | Kafka Connector でも秒単位のラグ |
結論: Snowflake は「準リアルタイム(数分以内)」には対応できますが、「数百ミリ秒以内に表示」が必要なユースケースには向きません。
センシングデータ(IoT センサー・機器ログ等)のように 高頻度・低レイテンシ が求められる場合は、AWS のリアルタイム向けサービスと組み合わせるのが現実的な設計です。
AWS でリアルタイム性を解決する 3 パターン
パターン 1: Kinesis Data Streams + Lambda(イベント駆動)
最もシンプルな構成。センサーデータをストリームに流し、Lambda で即時処理します。
センサー → Kinesis Data Streams → Lambda → DynamoDB / RDS
↓(バッチ)
S3 → Snowflake(分析用)
向いているユースケース: アラート検知・異常値の即時通知・シンプルな集計
import json
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('sensor_realtime')
def lambda_handler(event, context):
for record in event['Records']:
payload = json.loads(record['kinesis']['data'])
table.put_item(Item={
'device_id': payload['device_id'],
'timestamp': payload['timestamp'],
'value': payload['value'],
})
パターン 2: Amazon Timestream(時系列データ専用)
IoT・センシングデータに特化した時系列データベース。ミリ秒単位の書き込みと高速クエリに対応。
センサー → IoT Core → Timestream(リアルタイム参照)
↓(エクスポート)
S3 → Snowflake(長期分析)
向いているユースケース: 機器の稼働監視・温湿度ログ・時系列グラフの表示
import boto3
from datetime import datetime
client = boto3.client('timestream-write', region_name='ap-northeast-1')
def write_sensor_data(device_id, metric_name, value):
current_time = str(int(datetime.now().timestamp() * 1000))
client.write_records(
DatabaseName='sensor_db',
TableName='sensor_metrics',
Records=[{
'Dimensions': [{'Name': 'device_id', 'Value': device_id}],
'MeasureName': metric_name,
'MeasureValue': str(value),
'MeasureValueType': 'DOUBLE',
'Time': current_time,
'TimeUnit': 'MILLISECONDS',
}]
)
パターン 3: RDS / Aurora(リレーショナルが必要な場合)
最新値や集計済みサマリーをリレーショナル DB に持ち、Snowflake は履歴分析専用にする構成。
センサー → Kinesis → Lambda → RDS Aurora(最新値・直近データ)
↓(CDC または定期エクスポート)
Snowflake(過去データの分析・ML)
向いているユースケース: 既存の RDBMS 資産がある場合・JOIN が必要な複雑なクエリ
センシングデータの収集アーキテクチャ
エッジから Snowflake までの全体像です。
[ センサー / デバイス ]
↓ MQTT / HTTP
[ AWS IoT Core ]
↓ ルールエンジン
┌────┴────────────┐
↓ ↓
[ Kinesis ] [ S3(生データ保存)]
↓ ↓
[ Lambda ] [ Glue ETL ]
↓ ↓
[ Timestream ] [ Snowflake ]
(リアルタイム) (バッチ分析)
IoT Core のルール設定例
IoT Core のルールで Kinesis と S3 に同時に振り分けることで、リアルタイム用とバッチ用を分離します。
-- IoT Core ルールクエリ(SQL ライク)
SELECT
topic(2) AS device_id,
timestamp() AS ts,
temperature,
humidity
FROM 'sensors/+/data'
WHERE temperature IS NOT NULL
| アクション | 送信先 | 用途 |
|---|---|---|
| Kinesis Data Streams | sensor-stream | リアルタイム処理 |
| S3 | s3://sensor-raw/ | バッチ・Snowflake 取り込み |
Snowflake への取り込み(Snowpipe)
S3 に保存されたデータは Snowpipe で自動的に Snowflake に取り込みます。
CREATE STAGE sensor_stage
URL = 's3://sensor-raw/'
CREDENTIALS = (AWS_ROLE = 'arn:aws:iam::123456789:role/snowflake-role');
CREATE PIPE sensor_pipe AUTO_INGEST = TRUE AS
COPY INTO sensor_raw
FROM @sensor_stage
FILE_FORMAT = (TYPE = 'JSON');
S3 バケットに SQS 通知を設定すると、ファイル到着から数十秒以内に自動取り込みが動きます。
表示系の選択肢
Grafana(リアルタイム監視向け)
Timestream や Kinesis のデータを秒単位で更新するダッシュボード。Timestream プラグインを使うと設定が簡単です。
Amazon QuickSight(ビジネス分析向け)
Snowflake を直接データソースとして接続し、経営層・非エンジニア向けのレポートを作る場合に適しています。
カスタム UI(Next.js + API Gateway)
独自のダッシュボードが必要な場合は API Gateway → Lambda → Timestream の構成でリアルタイムデータを取得します。
// pages/api/sensor/[deviceId].ts
import { TimestreamQueryClient, QueryCommand } from '@aws-sdk/client-timestream-query';
const client = new TimestreamQueryClient({ region: 'ap-northeast-1' });
export default async function handler(req, res) {
const { deviceId } = req.query;
const command = new QueryCommand({
QueryString: `
SELECT time, measure_value::double AS value
FROM "sensor_db"."sensor_metrics"
WHERE device_id = '${deviceId}'
AND measure_name = 'temperature'
AND time > ago(1h)
ORDER BY time DESC
LIMIT 100
`,
});
const result = await client.send(command);
const rows = result.Rows?.map(row => ({
time: row.Data?.[0].ScalarValue,
value: parseFloat(row.Data?.[1].ScalarValue ?? '0'),
}));
res.json(rows);
}
役割分担まとめ
| レイヤー | サービス | 役割 |
|---|---|---|
| 収集 | AWS IoT Core | デバイスからのデータ受信(MQTT/HTTP) |
| ストリーム | Kinesis Data Streams | リアルタイムデータの転送バッファ |
| リアルタイム処理 | Lambda | 異常検知・アラート・即時書き込み |
| リアルタイムストア | Timestream / DynamoDB | 直近データの高速参照 |
| 生データ保管 | S3 | バッチ処理・再処理用の永続化 |
| バッチ ETL | Glue | S3 → Snowflake の変換・ロード |
| 分析ウェアハウス | Snowflake | 長期履歴・複雑分析・BI 連携 |
| リアルタイム可視化 | Grafana | 秒単位の監視ダッシュボード |
まとめ
「Snowflake がリアルタイムに弱い」のは欠点ではなく、専門特化の結果です。AWS のリアルタイム向けサービスと役割分担することで、それぞれの強みを最大限に活かしたデータ基盤が構築できます。
- 1 秒以内のリアルタイム表示 → Timestream / DynamoDB
- 高頻度の行更新(UPSERT) → RDS / DynamoDB
- アラート・イベント駆動処理 → Lambda
- 長期履歴・複雑分析・BI 連携 → Snowflake