Snowflakeのタスク×ストリームで作る!自動ELTパイプライン入門

Snowflakeのタスク×ストリームで作る!自動ELTパイプライン入門 Snowflake

はじめに:なぜ「タスク × ストリーム」なの?

こんにちは!データ基盤を作っていると、こんなことを考えませんか?
「新しく入ってきたデータだけを、自動で加工して別テーブルに入れたい」

これ、実はSnowflakeならタスク(Task)ストリーム(Stream)を組み合わせるだけで、外部のワークフローツールを使わずに作れてしまいます。今回は、この最強コンビで作る「継続的ELTパイプライン」の仕組みと作り方を、初心者目線でやさしく解説していきますね。

※ELTとは「Extract(抽出)→ Load(ロード)→ Transform(変換)」の順でデータを処理するモダンな手法のことです。

Snowflakeのタスクとストリームを組み合わせて作る継続的ELTパイプラインの全体構成を示す概念図

役割分担を整理しよう

まずは2つの登場人物の役割を確認しましょう。

  • ストリーム:テーブルの変更(INSERT / UPDATE / DELETE)をこっそり記録してくれる「変更ログ」。CDC(Change Data Capture)機能。
  • タスク:SQLを定期的に実行してくれる「Snowflake内蔵のスケジューラ」。CRONや「N分ごと」で動く。

このコンビは、ストリームが「何が変わったか」を覚えておき、タスクが「定期的にそれを処理する」という分業体制で動きます。両者の基本は Snowflakeストリーム入門|テーブルの変更を自動キャプチャするCDC機能Snowflakeタスク入門|定期実行ジョブを5分で作る方法 でじっくり解説しているので、未読の方はぜひ先に覗いてみてください。

具体例で動かしてみよう

「生データを格納する raw_orders テーブル」と「集計済みの orders_summary テーブル」があると想像してください。新しい注文が入ったときだけ集計テーブルを更新したい、というシナリオです。

ステップ1:ストリームを作る

CREATE OR REPLACE STREAM orders_stream
ON TABLE raw_orders;

これで raw_orders への変更が orders_stream に自動で蓄積されます。

ステップ2:差分を取り込むタスクを作る

CREATE OR REPLACE TASK orders_elt_task
  WAREHOUSE = compute_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
INSERT INTO orders_summary (order_date, total_amount)
SELECT order_date, SUM(amount)
FROM orders_stream
WHERE METADATA$ACTION = 'INSERT'
GROUP BY order_date;

ここで一番大事なのが WHEN SYSTEM$STREAM_HAS_DATA(...) の部分。これは「ストリームに未処理の差分があるときだけ実行する」という条件式で、変更がないときはタスクがスキップされてクレジット(課金)を消費しません。これだけで賢くコスト節約できるんです。

ステップ3:タスクを起動

ALTER TASK orders_elt_task RESUME;

タスクは作っただけでは止まっています。RESUME で初めて動き出すので忘れずに!

orders_elt_taskがストリームの未処理差分を検知して自動実行されるSnowflake ELTパイプラインの動作フロー図

覚えておきたいポイント

  • ストリームは「読んだら進む」:タスク内の INSERT ... SELECT FROM stream が成功すると、ストリームのオフセットが進み、次回は新しい差分だけを処理します。
  • トランザクションで動く:タスク内のSQLは1トランザクションとして扱われるため、途中で失敗してもストリームは進みません(再実行で復旧可能)。
  • タスクツリー:AFTER 句で別タスクの後に連結すれば、「クレンジング → 集計 → マート作成」のような多段ELTも組めます。
  • もっとシンプルにしたいなら:変換が単純なら Snowflake動的テーブル という選択肢もあります。タスク+ストリームは「処理ロジックを自分で書きたい」ケースで真価を発揮します。

よくあるユースケース

この構成は Snowpipe でロードした生データを、定期的に整形して分析用テーブルに流し込む「準リアルタイムELT」の定番パターンです。「Snowpipeで取り込み → ストリームで差分検知 → タスクで集計」というバトンリレーで、エンドツーエンドのデータパイプラインがSnowflake内だけで完結しちゃいます。

まとめ

タスク×ストリームの組み合わせは、Snowflakeで継続的ELTを作るときの王道パターンです。ストリームで差分を捕まえ、タスクで定期処理し、WHEN 句でムダな実行を防ぐ。この3つを押さえれば、もうあなたも自動データパイプラインの設計者ですよ!

参考リンク

関連記事