Snowpark入門|PythonでSnowflakeのデータ処理を書こう

Snowpark入門|PythonでSnowflakeのデータ処理を書こう Snowflake

SnowparkってなんだろうPythonでSnowflakeを動かすって本当?

「SnowflakeのデータをPythonで加工したい!でもいちいちpandasに読み込み直すのは面倒…」
そんな悩みをまるっと解決してくれるのが、今回紹介する Snowpark です。

Snowpark を使うと、Pythonのコードを書くだけで、その処理がSnowflake側で実行されます。データをローカルに引っ張ってこなくていいので、巨大なテーブルでもスケールするのが嬉しいポイント。この記事を読み終わるころには、「あ、Snowpark ってこんな感じか!」とイメージが掴めるようになりますよ。

Snowpark入門|PythonでSnowflakeのデータ処理を書こう

Snowparkの基本概念

そもそもSnowparkとは?

Snowpark は、Python・Java・Scalaから Snowflake のデータを操作できる開発フレームワークです。中でも一番人気なのが Python版の「Snowpark for Python」。pandas や PySpark を触ったことがある人なら、見た瞬間「あ、これ知ってる雰囲気!」となるはずです。

SQLを書くのと何が違うの?

Snowflake は通常 SQL でデータ処理を書きますが、複雑なロジック(機械学習の前処理、ループ処理、文字列操作のかたまりなど)になると SQL だけでは書きづらいことも。Snowpark なら Python の表現力を活かしつつ、処理は Snowflake のウェアハウス上で実行されるため、データ転送のオーバーヘッドがありません。

キーになる2つの概念

  • Session: Snowflake への接続を表すオブジェクト。すべてのスタート地点。
  • DataFrame: 表形式のデータを操作するためのオブジェクト。filter()group_by() をチェーンして書ける。

実際に書いてみよう!最初のSnowparkコード

まずはインストール。Python 3.8以上の環境で次のコマンドを実行します。

pip install snowflake-snowpark-python

続いて、Snowflakeに接続してテーブルを操作する基本パターンです。

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, avg

connection_parameters = {
    "account": "xy12345.ap-northeast-1.aws",
    "user": "MY_USER",
    "password": "********",
    "role": "SYSADMIN",
    "warehouse": "COMPUTE_WH",
    "database": "SALES_DB",
    "schema": "PUBLIC",
}

# 1. セッションを作る
session = Session.builder.configs(connection_parameters).create()

# 2. テーブルをDataFrameとして読み込む
df = session.table("ORDERS")

# 3. PythonっぽくフィルタやGROUP BYを書く
result = (
    df.filter(col("STATUS") == "PAID")
      .group_by("CUSTOMER_ID")
      .agg(avg("AMOUNT").alias("AVG_AMOUNT"))
)

# 4. 実行して結果を取得
result.show()

注目すべきは、このPythonコードは裏で自動的にSQLに変換され、Snowflakeのウェアハウスで実行される点です。result.show() を呼んだ瞬間に、初めてクエリが投げられます(これを「遅延評価」と呼びます)。

実際に書いてみよう!最初のSnowparkコードの解説図

ユースケースと注意点

こんなときに便利!

  • 機械学習の前処理: 大量データのクレンジングや特徴量作成をPythonで書ける
  • Python UDF / ストアドプロシージャ: Pythonロジックを Snowflake に登録して再利用できる(Snowflake UDF入門もあわせてどうぞ)
  • ETLパイプライン: タスク×ストリームのELTパイプラインと組み合わせて自動化

はじめに知っておきたい注意点

  • 処理は Snowflakeのウェアハウスで動くので、ウェアハウスを起動した分だけクレジットを消費します。
  • to_pandas() でローカルに変換すると一気にデータが手元に流れてきます。大きなテーブルでは要注意!
  • 遅延評価のため、show()collect()write系を呼ぶまで実際のクエリは走りません。

まとめ

Snowpark は「Pythonの書きやすさ × Snowflakeのスケーラビリティ」を両取りできる仕組みです。SQLでは書きにくい複雑なロジックを Python で表現でき、しかもデータを動かさずにサーバー側で処理してくれる――まさに現代のデータエンジニア・データサイエンティストの強い味方ですね。

まずは session.table() でテーブルを読み込んで show() するところから、気軽に始めてみましょう!

参考リンク

関連記事