MinIO に保存された Iceberg テーブルを Trino から読み書きし、dbt で DWH と Mart を作る構成を理解します。
dbt + Iceberg + Semantic Layer を自分で組み立てる教材
この HTML は、このプロジェクトの実装をもとに、初学者が
dbt、Apache Iceberg、semantic layer、
time spine、dbt test、dbt docs
を理解しながらアーキテクチャを再現できるようにした学習用ガイドです。
この教材のゴール
catalog、schema、warehouse、source、ref などの意味を、実際の設定値に沿って学びます。
どのファイルを、どの順番で追加すればよいかを、コピペしやすいコード付きで追えるようにしています。
tests、documentation、semantic layer、time spine を一つの流れで理解します。
どんなアーキテクチャなのか
このプロジェクトは、データの保存場所、メタデータ管理、 SQL 実行エンジン、変換ロジック を分離したモダンデータスタックです。
Python
サンプルデータを生成し、Trino に SQL を投げて Iceberg テーブルを初期作成します。
Trino
クエリエンジンです。dbt と Python は Trino に接続して SQL を実行します。
Iceberg REST Catalog
テーブル定義やスナップショットなどのメタデータを管理します。
PostgreSQL
Iceberg Catalog の JDBC バックエンドとしてメタデータ保存に使います。
MinIO
Parquet ファイルや Iceberg のメタデータファイルの実体置き場です。
dbt
datalake を source として読み、dwh と mart を構築します。
MinIO は「データの保存先」、Iceberg REST + PostgreSQL は「テーブル定義の管理」、Trino は「SQL の実行役」、dbt は「変換ロジックの管理役」です。
Iceberg の強みは、データファイルの形式とメタデータの持ち方が標準化されていることです。だから Trino だけでなく、Spark のような別エンジンからも同じテーブルを扱えます。
共有カタログとは何か?
カタログとは「どんなテーブルが存在して、そのデータがどこにあるか」を記録した台帳です。
iceberg-rest がない場合、各エンジンが独自に台帳を持ちます。
Trino → 自分だけのメタデータ管理 → "usersテーブルはs3://warehouse/datalake/users/v1/にある"
Spark → 自分だけのメタデータ管理 → (Trinoが作ったusersテーブルを知らない)
dbt → Trinoを経由するので見える
iceberg-rest があると、全エンジンが同じ台帳を参照します。
Trino ─┐
Spark ─┼─→ iceberg-rest (共有カタログ) → MinIO上のデータ
dbt ─┘
何が嬉しいのか?
- Trinoで
CREATE TABLEしたテーブルを、そのままSparkでSELECTできる - dbtがTrinoで変換したテーブルを、Sparkのバッチ処理でも使える
- テーブルのスキーマ変更 (カラム追加など) が全エンジンに即時反映される
なぜ異なるエンジンでクエリ実行できるのか?
ポイントは データフォーマット (Parquet) とメタデータ仕様 (Iceberg) が標準化されている からです。
Trino が CREATE TABLE → Parquetファイルを MinIO に保存
→ "このファイルがusersテーブル" と iceberg-rest に登録
Spark が SELECT * FROM users
→ iceberg-rest に「usersテーブルのデータはどこ?」と問い合わせ
→ "s3://warehouse/datalake/users/..." と返ってくる
→ Spark が直接 MinIO から Parquet を読む
各エンジンは Parquetの読み方 と Iceberg REST APIの叩き方 さえ知っていれば、お互いのデータを読み書きできます。Icebergはその「共通言語」の役割を果たしています。
まず押さえる 5 つの用語
テーブルフォーマットです。Parquet の集まりを単なるファイル置き場ではなく、テーブルとして安全に扱えるようにします。
「どのテーブルがどこにあるか」を管理する仕組みです。このプロジェクトでは Trino の iceberg catalog を使います。
テーブルのまとまりです。ここでは datalake、dwh、mart に役割分担しています。
メトリクスやエンティティを共通定義する層です。分析者が毎回 SQL を手書きしなくてもよくなります。
日付軸の基準表です。日別・月別などの時系列集計を安定して行うために使います。
最終的なディレクトリ構成
modern-data-stack/
├── data/
│ ├── orders.parquet
│ ├── products.parquet
│ └── users.csv
├── dbt/
│ ├── dbt_project.yml
│ ├── profiles.yml
│ ├── macros/
│ │ └── generate_schema_name.sql
│ ├── models/
│ │ ├── datalake/
│ │ │ └── sources.yml
│ │ ├── dwh/
│ │ │ ├── _time_spine.sql
│ │ │ ├── dim_products.sql
│ │ │ ├── dim_users.sql
│ │ │ ├── fct_orders.sql
│ │ │ ├── schema.yml
│ │ │ └── semantic_layer.yml
│ │ └── mart/
│ │ └── daily_sales.sql
│ └── target/
├── docker/
│ ├── docker-compose.yaml
│ ├── Dockerfile.dbt
│ └── Dockerfile.python
├── scripts/
│ ├── generate_data.py
│ └── ingest_data.py
├── trino/
│ └── catalog/
│ └── iceberg.properties
└── index.html
target/ は dbt の実行後に生成される成果物です。初学者が手で作るべきなのは、それ以外の設定ファイルと SQL ファイルです。
ステップバイステップで再現する
以下は「このプロジェクトを自分でゼロから組み立てるならこの順番」という流れです。
Docker Compose で全体構成を作る
最初に、各コンポーネントがどう連携するかを docker-compose.yaml にまとめます。以下は今のプロジェクトで実際に使っている内容そのままです。
services:
postgres:
image: postgres:16
container_name: postgres
ports:
- 5432:5432
environment:
POSTGRES_USER: iceberg
POSTGRES_PASSWORD: iceberg
POSTGRES_DB: iceberg_catalog
healthcheck:
test: ["CMD-SHELL", "pg_isready -U iceberg -d iceberg_catalog"]
interval: 10s
timeout: 5s
retries: 10
minio:
image: minio/minio
container_name: minio
ports:
- 9000:9000 # MinIO の S3 API エンドポイント (アプリや Trino/Iceberg からオブジェクトを読み書きする)
- 9001:9001 # MinIO の管理コンソール UI (ブラウザでバケットやオブジェクトを操作する)
environment:
MINIO_ROOT_USER: admin
MINIO_ROOT_PASSWORD: password
command: server /data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
# MinIO Client(mc) で初期化処理を行う一時コンテナ。
# MinIO 起動待ち -> warehouse バケット作成 -> ポリシー設定を実行して終了する。
mc:
image: minio/mc
container_name: mc
depends_on:
minio:
condition: service_healthy
# MinIO が起動して S3 API が利用可能になるまで待機してから、mc コマンドでバケット作成とポリシー設定を行う。
# 今回作成しているバケットは、warehouse という名前のバケット。
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc alias set myminio http://minio:9000 admin password) do echo '...waiting for minio...' && sleep 1; done;
/usr/bin/mc mb --ignore-existing myminio/warehouse;
/usr/bin/mc anonymous set public myminio/warehouse;
exit 0;
"
iceberg-rest:
image: tabulario/iceberg-rest # Iceberg の REST Catalog サーバー。Trino/dbt はこの REST API 経由で Iceberg メタデータを操作する。
container_name: iceberg-rest
ports:
- 8181:8181
volumes:
- ../data/iceberg-rest:/tmp/iceberg-rest
environment: # Iceberg REST Catalog の保存先(S3)とメタデータ管理方式を定義
- CATALOG_WAREHOUSE=s3://warehouse/ # データファイル/メタデータのルートパス(warehouse バケット配下)
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO # S3(AWS SDK)でファイル入出力する実装クラス
- AWS_REGION=us-east-1 # Iceberg REST 内部の AWS SDK が参照するリージョン
- CATALOG_S3_ENDPOINT=http://minio:9000 # S3 互換ストレージの接続先(MinIO)
- CATALOG_S3_ACCESS__KEY__ID=admin # MinIO アクセスキー
- CATALOG_S3_SECRET__ACCESS__KEY=password # MinIO シークレットキー
- CATALOG_S3_PATH__STYLE__ACCESS=true # MinIO 向けに path-style アクセスを有効化
- CATALOG_CATALOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog # メタデータを JDBC カタログで管理する実装
- CATALOG_URI=jdbc:postgresql://postgres:5432/iceberg_catalog # PostgreSQL カタログ DB
- CATALOG_JDBC_USER=iceberg # PostgreSQL ユーザー
- CATALOG_JDBC_PASSWORD=iceberg # PostgreSQL パスワード
depends_on:
minio:
condition: service_healthy
postgres:
condition: service_healthy
trino:
image: trinodb/trino:443
container_name: trino
ports:
- 8080:8080
volumes:
- ../trino/catalog:/etc/trino/catalog
depends_on:
- iceberg-rest
python-app:
build:
context: ../docker
dockerfile: Dockerfile.python
container_name: python-app
volumes:
- ../data:/app/data
- ../scripts:/app/scripts
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
- S3_ENDPOINT=http://minio:9000
depends_on:
- minio
dbt:
build:
context: ../docker
dockerfile: Dockerfile.dbt
container_name: dbt
ports:
- 8081:8081
volumes:
- ../dbt:/usr/app/dbt
environment: # dbt-trino 接続先設定
- DBT_TRINO_HOST=trino # Trino サーバーのホスト名(Compose サービス名)
- DBT_TRINO_PORT=8080 # Trino の HTTP ポート
- DBT_TRINO_USER=admin # Trino に接続するユーザー名
- DBT_TRINO_CATALOG=iceberg # 利用する Trino カタログ名
- DBT_TRINO_SCHEMA=mart # dbt が作成/参照するデフォルトスキーマ
depends_on:
- trino
Iceberg は単体では完結しません。保存先のオブジェクトストレージ、メタデータ管理、SQL 実行エンジンが必要なので、Compose で一式をまとめます。
POSTGRES_DB=iceberg_catalog は Iceberg Catalog 用 DB 名、CATALOG_WAREHOUSE=s3://warehouse/ は MinIO 上の保存ルートです。Trino から Iceberg を見えるようにする
trino/catalog/iceberg.properties
# TrinoがIcebergテーブルに接続するための設定ファイル
# 使用するTrinoコネクタ種別。Icebergテーブルを扱うため `iceberg` を指定。
connector.name=iceberg
# Icebergカタログの実装方式。REST Catalogサーバーを使うため `rest` を指定。
iceberg.catalog.type=rest
# Iceberg REST Catalogの接続先URL。
iceberg.rest-catalog.uri=http://iceberg-rest:8181
# Icebergのデータ/メタデータを保存するルート(warehouse)。
iceberg.rest-catalog.warehouse=s3://warehouse/
# Trino の native S3 filesystem を有効化。
fs.native-s3.enabled=true
# S3互換ストレージのエンドポイント。ここではMinIOを指定。
s3.endpoint=http://minio:9000
# S3リージョン。MinIO 利用時も値が必要なため固定値を指定。
s3.region=us-east-1
# MinIOへ接続するアクセスキー。
s3.aws-access-key=admin
# MinIOへ接続するシークレットキー。
s3.aws-secret-key=password
# MinIO互換のため path-style アクセスを有効化。
s3.path-style-access=true
dbt は直接 Iceberg REST に接続するのではなく、Trino に接続します。だから Trino 側に「Iceberg をどう扱うか」を教える必要があります。
connector.name=iceberg は Iceberg コネクタ利用、iceberg.catalog.type=rest は REST Catalog 採用、s3.path-style-access=true は MinIO 互換設定です。1.
python-app が Trino に SQL (CREATE SCHEMA, CREATE TABLE, INSERT) を送信2. Trino は
iceberg.properties の設定に基づき、メタデータ操作 (テーブル定義・スキーマ情報) を iceberg-rest:8181 に委譲3. 実際のデータファイル (Parquet) の読み書きは Trino が直接 MinIO に行う (
s3.endpoint=http://minio:9000)4.
iceberg-rest は MinIO 上のどこにデータを置くかを指示するだけで、データ転送自体は Trino ↔ MinIO 間で完結
Python コンテナと dbt コンテナを用意する
異なるファイルは別々に保存してください。以下は今のプロジェクトの実内容そのままです。
docker/Dockerfile.python
# python用のDockerfile
FROM python:3.11-slim
WORKDIR /app
RUN apt-get update && apt-get install -y \
curl \
&& rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir \
pandas \
pyarrow \
boto3 \
trino
CMD ["tail", "-f", "/dev/null"]
docker/Dockerfile.dbt
# dbt用のDockerfile
FROM python:3.11-slim
WORKDIR /usr/app/dbt
RUN apt-get update && apt-get install -y \
git \
&& rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir \
dbt-trino \
dbt-metricflow[trino]
CMD ["tail", "-f", "/dev/null"]
Python 側はデータ生成と取り込みを担当し、dbt 側はモデル実行・test・docs・semantic layer を担当します。責務を分けると学習しやすくなります。
dbt-trino は Trino アダプタ、dbt-metricflow[trino] は semantic layer / metrics 利用のためのパッケージです。サンプルデータを作る
scripts/generate_data.py
# ダミーデータを生成するスクリプト
import os
import numpy as np
import pandas as pd
# ディレクトリ作成
os.makedirs("/app/data", exist_ok=True)
# 1. ユーザーデータ (CSV)
users = pd.DataFrame(
{
"user_id": range(1, 101),
"name": [f"User_{i}" for i in range(1, 101)],
"email": [f"user_{i}@example.com" for i in range(1, 101)],
"signup_date": pd.to_datetime(pd.date_range(start="2023-01-01", periods=100)),
}
)
users.to_csv("/app/data/users.csv", index=False)
# 2. 商品データ (Parquet)
products = pd.DataFrame(
{
"product_id": range(1, 21),
"product_name": [f"Product_{i}" for i in range(1, 21)],
"category": np.random.choice(["Electronics", "Clothing", "Home", "Books"], 20),
"price": np.random.uniform(10, 500, 20).round(2),
}
)
products.to_parquet("/app/data/products.parquet", index=False)
# 3. 注文データ (Parquet)
orders = pd.DataFrame(
{
"order_id": range(1001, 1101),
"user_id": np.random.randint(1, 101, 100),
"product_id": np.random.randint(1, 21, 100),
"order_date": pd.to_datetime(pd.date_range(start="2023-06-01", periods=100)),
"quantity": np.random.randint(1, 5, 100),
}
)
orders.to_parquet("/app/data/orders.parquet", index=False)
このプロジェクトでは users は CSV、products と orders は Parquet で作っています。
複数形式を混ぜることで、取り込み処理の違いを学べます。
最初から本番データを使わず、再生成できるダミーデータを用意すると学習と検証が何度でもできます。
/app/data はホストの ./data をマウントした場所です。コンテナで作ったファイルをホストでも確認できます。raw データを Iceberg テーブルとして取り込む
scripts/ingest_data.py
# /app/scripts/generate_data.pyで生成したデータを
# Icebergテーブルとして取り込むスクリプト
import time
import pandas as pd
from trino.dbapi import connect
# Trino設定
MAX_RETRIES = 30
RETRY_DELAY = 10 # seconds
conn = None
for i in range(MAX_RETRIES):
try:
conn = connect(
host="trino",
port=8080,
user="admin",
catalog="iceberg",
)
print("Successfully connected to Trino")
break
except Exception as e:
print(f"Failed to connect to Trino: {e}")
time.sleep(RETRY_DELAY)
if not conn:
raise Exception("Failed to connect to Trino after multiple retries.")
cur = conn.cursor()
def create_schema():
print("Creating schemas if not exists...")
cur.execute("CREATE SCHEMA IF NOT EXISTS iceberg.datalake")
cur.execute("CREATE SCHEMA IF NOT EXISTS iceberg.dwh")
cur.execute("CREATE SCHEMA IF NOT EXISTS iceberg.mart")
print("Schemas created.")
def ingest_csv(file_path, table_name):
print(f"Ingesting CSV: {file_path} into iceberg.datalake.{table_name}")
df = pd.read_csv(file_path, parse_dates=["signup_date"])
cols = [
"user_id BIGINT",
"name VARCHAR",
"email VARCHAR",
"signup_date TIMESTAMP(6)",
]
cur.execute(f"DROP TABLE IF EXISTS iceberg.datalake.{table_name}")
cur.execute(
f"CREATE TABLE iceberg.datalake.{table_name} ({', '.join(cols)}) WITH (format = 'PARQUET')"
)
for _, row in df.iterrows():
cur.execute(
f"INSERT INTO iceberg.datalake.{table_name} VALUES ({row['user_id']}, '{row['name']}', '{row['email']}', TIMESTAMP '{row['signup_date']}')"
)
print(f"Table iceberg.datalake.{table_name} ingested successfully.")
def ingest_parquet(file_path, table_name, date_cols=[]):
print(f"Ingesting Parquet: {file_path} into iceberg.datalake.{table_name}")
df = pd.read_parquet(file_path)
if table_name == "products":
cols = [
"product_id BIGINT",
"product_name VARCHAR",
"category VARCHAR",
"price DOUBLE",
]
elif table_name == "orders":
cols = [
"order_id BIGINT",
"user_id BIGINT",
"product_id BIGINT",
"order_date TIMESTAMP(6)",
"quantity BIGINT",
]
cur.execute(f"DROP TABLE IF EXISTS iceberg.datalake.{table_name}")
cur.execute(
f"CREATE TABLE iceberg.datalake.{table_name} ({', '.join(cols)}) WITH (format = 'PARQUET')"
)
for _, row in df.iterrows():
if table_name == "products":
cur.execute(
f"INSERT INTO iceberg.datalake.products VALUES ({row['product_id']}, '{row['product_name']}', '{row['category']}', {row['price']})"
)
elif table_name == "orders":
cur.execute(
f"INSERT INTO iceberg.datalake.orders VALUES ({row['order_id']}, {row['user_id']}, {row['product_id']}, TIMESTAMP '{row['order_date']}', {row['quantity']})"
)
print(f"Table iceberg.datalake.{table_name} ingested successfully.")
if __name__ == "__main__":
create_schema()
ingest_csv("/app/data/users.csv", "users")
ingest_parquet("/app/data/products.parquet", "products")
ingest_parquet("/app/data/orders.parquet", "orders")
print("All data ingested successfully.")
cur.close()
conn.close()
dbt は変換担当であり、raw データの初回ロード担当にしない設計もよくあります。この教材では Python が ingestion を担当します。
iceberg.datalake.users は「catalog.schema.table」です。WITH (format = 'PARQUET') は Iceberg テーブルのデータファイル形式です。datalake は取り込んだ元データ、dwh は整形済みの分析基盤、mart は用途別集計です。
iceberg.datalake.{table_name} の命名
- Trinoの階層は カタログ.スキーマ.テーブル の3層構造
- MinIOのwarehouseバケット配下のトップディレクトリは、"スキーマ"が使われる
- テーブルはそのディレクトリ配下の"テーブル"(これもMinIOではディレクトリ)として使われる。そしてその配下のディレクトリの中には、データ実態や各種メタデータが保存されている。
| 部分 | Trino上の概念 | 実体 |
| :--- | :--- | :--- |
| **カタログ** | iceberg | `trino/catalog/iceberg.properties` のファイル名 |
| **スキーマ** | datalake | MinIO の `warehouse/datalake/` フォルダ |
| **テーブル** | {table_name} | `warehouse/datalake/{table_name}/` フォルダ配下のParquetファイル群 |
dbt プロジェクトの土台を作る
name: 'mds_tutorial_by_ken' # プロジェクト名です。ログ表示、成果物メタ情報、モデル設定の名前空間に使われる
version: '1.0.0'
config-version: 2
profile: 'mds_tutorial_by_ken' # dbt プロフィール名。profiles.yml 内の接続設定を参照する際のキーに対応。 ここで指定した名前の接続設定を dbt が使う
model-paths: ['models']
analysis-paths: ['analyses']
test-paths: ['tests']
seed-paths: ['seeds']
macro-paths: ['macros']
snapshot-paths: ['snapshots']
target-path: 'target' # dbt 実行結果(コンパイル成果物、manifest など)の出力先ディレクトリ
clean-targets: # dbt clean 実行時に削除するディレクトリ一覧
- 'target'
- 'dbt_modules'
# モデルに対するデフォルト設定
models:
mds_tutorial_by_ken: # この配下設定が適用されるプロジェクト名(名前空間). nameと同じ名前にしておく.
+materialized: table # デフォルトでモデルを table として作成する設定。dbt はモデルを view として作成するのがデフォルトなので、ここで table に変更している。
datalake:
+schema: datalake # models/datalake 配下モデルのスキーマを datalake にします
dwh:
+schema: dwh # models/dwh 配下モデルのスキーマを dwh にします
mart:
+schema: mart # models/mart 配下モデルのスキーマを mart にします
ディレクトリ名とスキーマ名を対応させると、学習時に「このモデルはどこにできるのか」が追いやすくなります。
+materialized: table はモデルを table で作る指定、profile は接続情報の参照先、+schema は出力先スキーマです。dbt から Trino に接続する
mds_tutorial_by_ken: # プロフィール名。dbt_project.yml の profile キーで指定した名前と同じにしておく必要がある
target: dev # デフォルトのターゲットを dev に設定。profiles.yml 内の dev ターゲットの接続設定が使われる
config:
send_anonymous_usage_stats: false # 匿名の使用統計情報の送信を無効化
outputs:
dev: # ターゲット名。ここでは dev という名前のターゲットを定義している。
type: trino # データベースの種類。ここでは Trino を指定している。
host: trino # データベースホスト。Trino のホスト名を指定している。Dockerのネットワーク設定により、Compose サービス名である trino をホスト名として指定できる。
port: 8080 # データベースポート。Trino のデフォルトポートを指定している。
user: admin # データベースユーザー名。admin を指定している。
catalog: iceberg # Trino カタログ名。iceberg を指定している。前提として、Trino の catalog ディレクトリに iceberg.properties というファイルを作成し、Iceberg REST Catalog サーバーへの接続設定を記述している前提。
schema: mart # データベーススキーマ。mart を指定している。
threads: 1
# password: password # データベースパスワード。password を指定している。
# dbname: mds_tutorial_by_ken_dev # データベース名。mds_tutorial_by_ken_dev を指定している。
# schema: public # スキーマ名。public を指定している。
dbt は Trino に接続し、その先で Iceberg テーブルを操作します。Trino を共通入口にすることで SQL 実行先が一貫します。
type: trino はアダプタ、catalog: iceberg は Trino 上の Iceberg catalog、schema: mart はデフォルト schema です。schema 名をきれいに制御する
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- if custom_schema_name is none -%}
{{ target.schema }}
{%- else -%}
{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}
dbt の標準動作では
schema 名に target 名などが結合されることがあります。教材では dwh や mart をそのまま使った方が理解しやすいので上書きします。custom_schema_name があるときはそれをそのまま返すため、+schema: dwh なら出力先が素直に iceberg.dwh になります。dbtでマクロを使う理由
dbtは dbt_project.yml で +schema: dwh のようにモデルごとのスキーマを指定できます。 しかしデフォルトの挙動では、指定したスキーマ名はそのまま使われず、profiles.yml の schema(outputs.dev.schema)と連結されてしまいます。今の設定だと、outputs.dev.schema=martなので、dwhのスキーマは、mart_dwhとなってしまいます.
デフォルト挙動の例
| profiles.yml の schema | dbt_project.yml の +schema | 実際に作られるスキーマ |
| :--- | :--- | :--- |
| `mart` | `dwh` | `mart_dwh` |
| `mart` | `mart` | `mart_mart` |
| `mart` | 指定なし | `mart` |
なぜこのデフォルト挙動になっているのか
複数の開発者が同じ Trino/BigQuery 上で同時開発するとき、名前が衝突しないようにするためです。
Alice (target.schema=alice) → alice_dwh, alice_mart
Bob (target.schema=bob) → bob_dwh, bob_mart
本番 (target.schema=prod) → prod_dwh, prod_mart
この設計により、開発者ごとに独立したスキーマが自動的に用意されます。
マクロが必要になる場面
チュートリアルや本番環境のように「スキーマ名をそのまま使いたい」場合は、 dbt 組み込みの generate_schema_name マクロを macros/ 配下で上書きする必要があります。
まとめ
- dbt に
+schemaの設定だけ書いても連結されてしまうのは仕様 - 「
+schemaに書いた名前をそのまま使いたい」=generate_schema_nameマクロの上書きがセット - この2つがワンセットになっているのが dbt でのスキーマ管理の標準パターン
source を定義して raw テーブルを参照する
version: 2
sources:
# Trinoの階層は カタログ.スキーマ.テーブル となっており、今回は、iceberg.{MinIOのフォルダ名}.{テーブル}だから以下のような設定値になる
- name: raw_data
database: iceberg # Trino カタログ名
schema: datalake # Trino スキーマ名
tables:
- name: users
- name: products
- name: orders
モデルからは {{ source('raw_data', 'users') }} のように参照します。
raw テーブルを直接文字列で書くより、source 定義を 1 か所に集めた方が保守しやすく、dbt docs にも反映されます。
database: iceberg は Trino catalog、schema: datalake は raw 層です。DWH モデルを作る
3 つの SQL ファイルは別々です。別ファイルとして保存してください。
dbt/models/dwh/dim_users.sql
SELECT
user_id
, name
, email
, signup_date
FROM
{{ source('raw_data', 'users') }}
dbt/models/dwh/dim_products.sql
SELECT
product_id
, product_name
, category
, price
FROM
{{ source('raw_data', 'products') }}
dbt/models/dwh/fct_orders.sql
SELECT
o.order_id
, o.user_id
, o.product_id
, o.order_date
, o.quantity
, (o.quantity * p.price) AS total_price
FROM {{ source('raw_data', 'orders') }} as o
LEFT JOIN {{ ref('dim_products') }} as p
ON o.product_id = p.product_id
dim は属性を持つ次元表、fct は数値計算の中心になるファクト表です。分析しやすい形に raw データを変換します。source() は外部テーブル参照、ref() は dbt モデル間参照です。ref() を使うと依存関係が dbt に伝わります。dbt test と time spine を追加する
dbt/models/dwh/schema.yml
version: 2
models:
- name: dim_users
columns:
- name: user_id
tests: [unique, not_null]
- name: dim_products
columns:
- name: product_id
tests: [unique, not_null]
- name: fct_orders
columns:
- name: order_id
tests: [unique, not_null]
- name: user_id
tests: [not_null]
- name: product_id
tests: [not_null]
- name: _time_spine
columns:
- name: date_day
data_type: date
granularity: day
time_spine:
standard_granularity_column: date_day
dbt/models/dwh/_time_spine.sql
{{
config(
materialized='table'
)
}}
SELECT
sales_date AS date_day
, EXTRACT(YEAR FROM sales_date) AS year
, EXTRACT(QUARTER FROM sales_date) AS quarter
, EXTRACT(MONTH FROM sales_date) AS month
, EXTRACT(DAY FROM sales_date) AS day_of_month
, EXTRACT(DOW FROM sales_date) AS day_of_week
FROM {{ ref('daily_sales') }}
ORDER BY sales_date
dbt test は「この列は null であってはいけない」「主キーは重複してはいけない」を機械的に検証します。time spine は時系列分析の基準軸として semantic layer でも重要です。
tests: [unique, not_null] は組み込みテスト、granularity: day は日粒度、standard_granularity_column は基準日付列を示します。Semantic Layer と Mart を追加する
Mart の SQL と semantic layer の YAML も別ファイルです。
dbt/models/mart/daily_sales.sql
SELECT
CAST(order_date AS DATE) AS sales_date
, SUM(total_price) AS total_daily_sales
FROM {{ ref('fct_orders') }}
GROUP BY 1
ORDER BY 1
dbt/models/dwh/semantic_layer.yml
# ===================================
# このsemantic_layerの定義は、
# SQLで書くと以下のクエリと同等の意味を持つ。
# -----------------------------------
# select
# order_date,
# sum(total_price) as total_sales
# from fct_orders
# group by order_date
# ===================================
version: 2
semantic_models:
- name: fct_orders
model: ref('fct_orders')
entities:
- name: order_id
type: primary
- name: user_id
type: foreign
- name: product_id
type: foreign
dimensions:
- name: order_date
type: time
type_params: {time_granularity: day}
measures:
- name: total_sales
agg: sum
expr: total_price
agg_time_dimension: order_date
metrics:
- name: total_sales
label: Total Sales
type: simple
type_params:
measure: total_sales
Mart は用途別の集計表、semantic layer は「売上」という意味を共通定義する層です。同じ数字を別の人が別の SQL で計算してズレるのを防ぎます。
entities は主語、dimensions は切り口、measures は集計元、metrics は利用者向けの完成形です。実行方法
教材を読みながら手を動かすなら、以下の順番で実行すると全体像がつながります。
# 1. 基盤を起動
docker compose -f docker/docker-compose.yaml up -d
# 2. サンプルデータを生成
docker compose -f docker/docker-compose.yaml exec python-app python /app/scripts/generate_data.py
# 3. datalake 層に取り込み
docker compose -f docker/docker-compose.yaml exec python-app python /app/scripts/ingest_data.py
# 4. dbt で変換
docker compose -f docker/docker-compose.yaml exec dbt dbt run --project-dir /usr/app/dbt --profiles-dir /usr/app/dbt
# 5. dbt test
docker compose -f docker/docker-compose.yaml exec dbt dbt test --project-dir /usr/app/dbt --profiles-dir /usr/app/dbt
# 6. dbt docs
docker compose -f docker/docker-compose.yaml exec dbt dbt docs generate --project-dir /usr/app/dbt --profiles-dir /usr/app/dbt
docker compose -f docker/docker-compose.yaml exec dbt dbt docs serve --project-dir /usr/app/dbt --profiles-dir /usr/app/dbt --port 8081
warehouse バケットにファイルが増えること。iceberg.datalake、iceberg.dwh、iceberg.mart が見えること。dbt test が通ること。dbt docs に source, model, test の依存関係が表示されること。dbt test と documentation 機能をどう理解すればよいか
SQL が動くだけでは品質保証になりません。unique と not_null を入れると、主キーの重複や欠損をすぐ検知できます。
この教材では、dim_users.user_id、dim_products.product_id、fct_orders.order_id を主キー候補として検証しています。
source、model、test、semantic 定義のつながりを視覚化できます。初学者にとっては、SQL を読むより先に依存関係を俯瞰できるのが大きな利点です。
source から dim/fact に流れ、そこから mart や time spine に接続される構造を確認してください。
この構成で学べる設計の考え方
raw をそのまま分析せず、datalake → dwh → mart と段階を分けることで責務が明確になります。
MinIO に保存し、Trino が計算し、dbt が変換を管理します。役割を分けるのがモダンデータスタックの基本です。
semantic layer により「total_sales は total_price の合計」という定義を再利用できます。
time spine を置くと、時系列分析の粒度や欠損日の扱いを統一しやすくなります。
最後に見るべきポイント
source()、dbt モデル同士は ref()。iceberg.dwh.fct_orders のように読む。