Snowflakeストリーム入門|テーブルの変更を自動キャプチャするCDC機能

Snowflakeストリーム入門|テーブルの変更を自動キャプチャするCDC機能 Snowflake

はじめに:ストリームってなに?

こんにちは!今回は Snowflake の ストリーム (Stream) という機能を、初心者向けにやさしく解説していきます。

「昨日のデータと今日のデータの差分だけ抽出したい」「変更があった行だけを別のテーブルに反映したい」——こんな場面、よくありますよね。これを毎回 SQL で頑張ろうとすると、タイムスタンプ列を比較したり、フラグを立てたりと、けっこう面倒です。

そこで登場するのが ストリーム。テーブルに対する INSERT / UPDATE / DELETE の変更を 自動的に追跡 (CDC: Change Data Capture) してくれる、Snowflake の便利機能です。この記事を読み終わるころには、「あ、これならETLパイプラインがめっちゃ楽に作れそう!」と思えるはずですよ。

ストリームの基本概念

ストリームは、ざっくり言うと「テーブルの変更履歴を覗くためのウィンドウ」です。実際にデータをコピーするわけではなく、変更箇所を指し示すオフセット (しおり) を内部に持っています。

ストリームを SELECT すると、前回読み取った位置から現在までの変更行だけが見えます。そして、その変更行を別テーブルに INSERT するなど DML で「消費」すると、しおりが自動的に進んで、次回はまた新しい変更だけが見える、という仕組みです。

Snowflakeストリームがテーブルの変更履歴をオフセットで追跡しCDCを実現する仕組みを示す概念図

ストリームが追加してくれる3つのメタ列

ストリームを SELECT すると、元テーブルの列に加えて以下の メタデータ列 が自動的に付きます。

  • METADATA$ACTION: INSERT または DELETE
  • METADATA$ISUPDATE: UPDATE の場合は TRUE (UPDATEは DELETE+INSERT として記録)
  • METADATA$ROW_ID: 行を一意に識別するID

具体的な使い方

1. ストリームを作成する

シンプルな構文はこちらです。CDC に対応した 通常のテーブルに対してストリームを張ります。

-- ソーステーブル
CREATE OR REPLACE TABLE orders (
  order_id   INT,
  customer   STRING,
  amount     NUMBER
);

-- ストリームの作成
CREATE OR REPLACE STREAM orders_stream
ON TABLE orders;

2. 変更を発生させて確認

-- データを追加
INSERT INTO orders VALUES (1, 'Alice', 1000), (2, 'Bob', 2000);

-- ストリームを覗く
SELECT *, METADATA$ACTION, METADATA$ISUPDATE
FROM orders_stream;

結果として、追加された2行が METADATA$ACTION = 'INSERT' として表示されます。

3. 変更を消費する (オフセットを進める)

ストリームは、それを参照する DML 文 (INSERT / MERGE など) が成功するとオフセットが進みます。よくあるパターンは MERGE で別テーブルに反映する書き方です。

MERGE INTO orders_history AS h
USING orders_stream AS s
ON h.order_id = s.order_id
WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN DELETE
WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' THEN UPDATE SET h.amount = s.amount
WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN INSERT VALUES (s.order_id, s.customer, s.amount);
Snowflakeストリームの差分をMERGE文で別テーブルに反映するCDCパイプラインの処理フロー図

よくあるユースケース

① タスクと組み合わせて自動ETL

ストリーム単体では「呼ばれた時に差分を返す」だけなので、定期的に処理させるにはSnowflakeタスクと組み合わせるのが鉄板です。タスクで5分おきに MERGE を実行すれば、ニアリアルタイムなデータ反映パイプラインがノーコードで完成します。

② Snowpipe と連携してCDC

Snowpipe でロードされた生データに対してストリームを張れば、新しく取り込まれた行だけを後続の集計テーブルへ流せます。

③ 動的テーブルとの違い

近年は動的テーブルでも自動更新パイプラインが組めますが、ストリームは「差分そのものを自分で操作したい」「カスタムロジックを挟みたい」というケースで強みを発揮します。

注意点

  • 保持期間: ストリームの差分は、ソーステーブルの DATA_RETENTION_TIME_IN_DAYS に依存します。長く放置すると「stale (古くなった)」状態になり、読めなくなるので注意。
  • 消費は1回: 同じ変更を複数回読みたい場合は、ストリームを複数作成しましょう。
  • 種類: 標準ストリーム / 追加専用 (APPEND_ONLY) / 挿入専用 (INSERT_ONLY、外部テーブル用) の3タイプがあります。

まとめ

Snowflake のストリームは、テーブルの変更を「差分として自動でキャプチャ」してくれる、CDC パイプラインの強力な武器です。タスクや Snowpipe と組み合わせれば、コードを書かずに増分処理パイプラインが作れちゃいます。まずは小さなテーブルで CREATE STREAM → INSERT → SELECT の流れを試してみてくださいね!

参考リンク

関連記事