Hands-on Tutorial

dbt + Iceberg + Semantic Layer を自分で組み立てる教材

この HTML は、このプロジェクトの実装をもとに、初学者が dbtApache Icebergsemantic layertime spinedbt testdbt docs を理解しながらアーキテクチャを再現できるようにした学習用ガイドです。

この教材のゴール

1. 何を作っているか分かる

MinIO に保存された Iceberg テーブルを Trino から読み書きし、dbt で DWH と Mart を作る構成を理解します。

2. なぜその設定か分かる

catalogschemawarehousesourceref などの意味を、実際の設定値に沿って学びます。

3. 自分で再現できる

どのファイルを、どの順番で追加すればよいかを、コピペしやすいコード付きで追えるようにしています。

4. dbt の応用まで触れる

testsdocumentationsemantic layertime spine を一つの流れで理解します。

どんなアーキテクチャなのか

このプロジェクトは、データの保存場所メタデータ管理SQL 実行エンジン変換ロジック を分離したモダンデータスタックです。

Python

サンプルデータを生成し、Trino に SQL を投げて Iceberg テーブルを初期作成します。

Trino

クエリエンジンです。dbt と Python は Trino に接続して SQL を実行します。

Iceberg REST Catalog

テーブル定義やスナップショットなどのメタデータを管理します。

PostgreSQL

Iceberg Catalog の JDBC バックエンドとしてメタデータ保存に使います。

MinIO

Parquet ファイルや Iceberg のメタデータファイルの実体置き場です。

dbt

datalake を source として読み、dwhmart を構築します。

データの流れ: Python が raw データを作る → Trino 経由で Iceberg テーブル化 → dbt が source を参照して DWH/Mart を作る → test / docs / semantic layer で活用しやすくする
重要な整理
MinIO は「データの保存先」、Iceberg REST + PostgreSQL は「テーブル定義の管理」、Trino は「SQL の実行役」、dbt は「変換ロジックの管理役」です。
オープンテーブルフォーマット(Iceberg)の嬉しさ
Iceberg の強みは、データファイルの形式とメタデータの持ち方が標準化されていることです。だから Trino だけでなく、Spark のような別エンジンからも同じテーブルを扱えます。

共有カタログとは何か?

カタログとは「どんなテーブルが存在して、そのデータがどこにあるか」を記録した台帳です。

iceberg-rest がない場合、各エンジンが独自に台帳を持ちます。

Trino  → 自分だけのメタデータ管理 → "usersテーブルはs3://warehouse/datalake/users/v1/にある"
Spark  → 自分だけのメタデータ管理 → (Trinoが作ったusersテーブルを知らない)
dbt    → Trinoを経由するので見える

iceberg-rest があると、全エンジンが同じ台帳を参照します。

Trino  ─┐
Spark  ─┼─→ iceberg-rest (共有カタログ) → MinIO上のデータ
dbt    ─┘

何が嬉しいのか?

なぜ異なるエンジンでクエリ実行できるのか?

ポイントは データフォーマット (Parquet) とメタデータ仕様 (Iceberg) が標準化されている からです。

Trino が CREATE TABLE → Parquetファイルを MinIO に保存
                      → "このファイルがusersテーブル" と iceberg-rest に登録
Spark が SELECT * FROM users
  → iceberg-rest に「usersテーブルのデータはどこ?」と問い合わせ
  → "s3://warehouse/datalake/users/..." と返ってくる
  → Spark が直接 MinIO から Parquet を読む

各エンジンは Parquetの読み方 と Iceberg REST APIの叩き方 さえ知っていれば、お互いのデータを読み書きできます。Icebergはその「共通言語」の役割を果たしています。

まず押さえる 5 つの用語

Iceberg

テーブルフォーマットです。Parquet の集まりを単なるファイル置き場ではなく、テーブルとして安全に扱えるようにします。

Catalog

「どのテーブルがどこにあるか」を管理する仕組みです。このプロジェクトでは Trino の iceberg catalog を使います。

Schema

テーブルのまとまりです。ここでは datalakedwhmart に役割分担しています。

Semantic Layer

メトリクスやエンティティを共通定義する層です。分析者が毎回 SQL を手書きしなくてもよくなります。

Time Spine

日付軸の基準表です。日別・月別などの時系列集計を安定して行うために使います。

最終的なディレクトリ構成

modern-data-stack/
├── data/
│   ├── orders.parquet
│   ├── products.parquet
│   └── users.csv
├── dbt/
│   ├── dbt_project.yml
│   ├── profiles.yml
│   ├── macros/
│   │   └── generate_schema_name.sql
│   ├── models/
│   │   ├── datalake/
│   │   │   └── sources.yml
│   │   ├── dwh/
│   │   │   ├── _time_spine.sql
│   │   │   ├── dim_products.sql
│   │   │   ├── dim_users.sql
│   │   │   ├── fct_orders.sql
│   │   │   ├── schema.yml
│   │   │   └── semantic_layer.yml
│   │   └── mart/
│   │       └── daily_sales.sql
│   └── target/
├── docker/
│   ├── docker-compose.yaml
│   ├── Dockerfile.dbt
│   └── Dockerfile.python
├── scripts/
│   ├── generate_data.py
│   └── ingest_data.py
├── trino/
│   └── catalog/
│       └── iceberg.properties
└── index.html

target/ は dbt の実行後に生成される成果物です。初学者が手で作るべきなのは、それ以外の設定ファイルと SQL ファイルです。

ステップバイステップで再現する

以下は「このプロジェクトを自分でゼロから組み立てるならこの順番」という流れです。

Step 1

Docker Compose で全体構成を作る

追加ファイル: docker/docker-compose.yaml 目的: 基盤の起動

最初に、各コンポーネントがどう連携するかを docker-compose.yaml にまとめます。以下は今のプロジェクトで実際に使っている内容そのままです。

services:
  postgres:
    image: postgres:16
    container_name: postgres
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: iceberg
      POSTGRES_PASSWORD: iceberg
      POSTGRES_DB: iceberg_catalog
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U iceberg -d iceberg_catalog"]
      interval: 10s
      timeout: 5s
      retries: 10

  minio:
    image: minio/minio
    container_name: minio
    ports:
      - 9000:9000 # MinIO の S3 API エンドポイント (アプリや Trino/Iceberg からオブジェクトを読み書きする)
      - 9001:9001 # MinIO の管理コンソール UI (ブラウザでバケットやオブジェクトを操作する)
    environment:
      MINIO_ROOT_USER: admin
      MINIO_ROOT_PASSWORD: password
    command: server /data --console-address ":9001"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3

  # MinIO Client(mc) で初期化処理を行う一時コンテナ。
  # MinIO 起動待ち -> warehouse バケット作成 -> ポリシー設定を実行して終了する。
  mc:
    image: minio/mc
    container_name: mc
    depends_on:
      minio:
        condition: service_healthy
    # MinIO が起動して S3 API が利用可能になるまで待機してから、mc コマンドでバケット作成とポリシー設定を行う。
    # 今回作成しているバケットは、warehouse という名前のバケット。
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc alias set myminio http://minio:9000 admin password) do echo '...waiting for minio...' && sleep 1; done;
      /usr/bin/mc mb --ignore-existing myminio/warehouse;
      /usr/bin/mc anonymous set public myminio/warehouse;
      exit 0;
      "

  iceberg-rest:
    image: tabulario/iceberg-rest # Iceberg の REST Catalog サーバー。Trino/dbt はこの REST API 経由で Iceberg メタデータを操作する。
    container_name: iceberg-rest
    ports:
      - 8181:8181
    volumes:
      - ../data/iceberg-rest:/tmp/iceberg-rest
    environment: # Iceberg REST Catalog の保存先(S3)とメタデータ管理方式を定義
      - CATALOG_WAREHOUSE=s3://warehouse/ # データファイル/メタデータのルートパス(warehouse バケット配下)
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO # S3(AWS SDK)でファイル入出力する実装クラス
      - AWS_REGION=us-east-1 # Iceberg REST 内部の AWS SDK が参照するリージョン
      - CATALOG_S3_ENDPOINT=http://minio:9000 # S3 互換ストレージの接続先(MinIO)
      - CATALOG_S3_ACCESS__KEY__ID=admin # MinIO アクセスキー
      - CATALOG_S3_SECRET__ACCESS__KEY=password # MinIO シークレットキー
      - CATALOG_S3_PATH__STYLE__ACCESS=true # MinIO 向けに path-style アクセスを有効化
      - CATALOG_CATALOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog # メタデータを JDBC カタログで管理する実装
      - CATALOG_URI=jdbc:postgresql://postgres:5432/iceberg_catalog # PostgreSQL カタログ DB
      - CATALOG_JDBC_USER=iceberg # PostgreSQL ユーザー
      - CATALOG_JDBC_PASSWORD=iceberg # PostgreSQL パスワード
    depends_on:
      minio:
        condition: service_healthy
      postgres:
        condition: service_healthy

  trino:
    image: trinodb/trino:443
    container_name: trino
    ports:
      - 8080:8080
    volumes:
      - ../trino/catalog:/etc/trino/catalog
    depends_on:
      - iceberg-rest

  python-app:
    build:
      context: ../docker
      dockerfile: Dockerfile.python
    container_name: python-app
    volumes:
      - ../data:/app/data
      - ../scripts:/app/scripts
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - S3_ENDPOINT=http://minio:9000
    depends_on:
      - minio

  dbt:
    build:
      context: ../docker
      dockerfile: Dockerfile.dbt
    container_name: dbt
    ports:
      - 8081:8081
    volumes:
      - ../dbt:/usr/app/dbt
    environment: # dbt-trino 接続先設定
      - DBT_TRINO_HOST=trino # Trino サーバーのホスト名(Compose サービス名)
      - DBT_TRINO_PORT=8080 # Trino の HTTP ポート
      - DBT_TRINO_USER=admin # Trino に接続するユーザー名
      - DBT_TRINO_CATALOG=iceberg # 利用する Trino カタログ名
      - DBT_TRINO_SCHEMA=mart # dbt が作成/参照するデフォルトスキーマ
    depends_on:
      - trino
なぜこの設定か
Iceberg は単体では完結しません。保存先のオブジェクトストレージ、メタデータ管理、SQL 実行エンジンが必要なので、Compose で一式をまとめます。
設定値の意味
POSTGRES_DB=iceberg_catalog は Iceberg Catalog 用 DB 名、CATALOG_WAREHOUSE=s3://warehouse/ は MinIO 上の保存ルートです。
Step 2

Trino から Iceberg を見えるようにする

追加ファイル: trino/catalog/iceberg.properties 目的: Trino と Iceberg の接続

trino/catalog/iceberg.properties

# TrinoがIcebergテーブルに接続するための設定ファイル
# 使用するTrinoコネクタ種別。Icebergテーブルを扱うため `iceberg` を指定。
connector.name=iceberg

# Icebergカタログの実装方式。REST Catalogサーバーを使うため `rest` を指定。
iceberg.catalog.type=rest

# Iceberg REST Catalogの接続先URL。
iceberg.rest-catalog.uri=http://iceberg-rest:8181

# Icebergのデータ/メタデータを保存するルート(warehouse)。
iceberg.rest-catalog.warehouse=s3://warehouse/

# Trino の native S3 filesystem を有効化。
fs.native-s3.enabled=true

# S3互換ストレージのエンドポイント。ここではMinIOを指定。
s3.endpoint=http://minio:9000

# S3リージョン。MinIO 利用時も値が必要なため固定値を指定。
s3.region=us-east-1

# MinIOへ接続するアクセスキー。
s3.aws-access-key=admin

# MinIOへ接続するシークレットキー。
s3.aws-secret-key=password

# MinIO互換のため path-style アクセスを有効化。
s3.path-style-access=true
なぜこの設定か
dbt は直接 Iceberg REST に接続するのではなく、Trino に接続します。だから Trino 側に「Iceberg をどう扱うか」を教える必要があります。
設定値の意味
connector.name=iceberg は Iceberg コネクタ利用、iceberg.catalog.type=rest は REST Catalog 採用、s3.path-style-access=true は MinIO 互換設定です。
データのインジェスチョン周りの理解
1. python-app が Trino に SQL (CREATE SCHEMA, CREATE TABLE, INSERT) を送信
2. Trino は iceberg.properties の設定に基づき、メタデータ操作 (テーブル定義・スキーマ情報) を iceberg-rest:8181 に委譲
3. 実際のデータファイル (Parquet) の読み書きは Trino が直接 MinIO に行う (s3.endpoint=http://minio:9000)
4. iceberg-rest は MinIO 上のどこにデータを置くかを指示するだけで、データ転送自体は Trino ↔ MinIO 間で完結
Step 3

Python コンテナと dbt コンテナを用意する

追加ファイル: docker/Dockerfile.python 追加ファイル: docker/Dockerfile.dbt

異なるファイルは別々に保存してください。以下は今のプロジェクトの実内容そのままです。

docker/Dockerfile.python

# python用のDockerfile
FROM python:3.11-slim

WORKDIR /app

RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

RUN pip install --no-cache-dir \
    pandas \
    pyarrow \
    boto3 \
    trino

CMD ["tail", "-f", "/dev/null"]

docker/Dockerfile.dbt

# dbt用のDockerfile
FROM python:3.11-slim

WORKDIR /usr/app/dbt

RUN apt-get update && apt-get install -y \
    git \
    && rm -rf /var/lib/apt/lists/*

RUN pip install --no-cache-dir \
    dbt-trino \
    dbt-metricflow[trino]

CMD ["tail", "-f", "/dev/null"]
なぜこの設定か
Python 側はデータ生成と取り込みを担当し、dbt 側はモデル実行・test・docs・semantic layer を担当します。責務を分けると学習しやすくなります。
設定値の意味
dbt-trino は Trino アダプタ、dbt-metricflow[trino] は semantic layer / metrics 利用のためのパッケージです。
Step 4

サンプルデータを作る

追加ファイル: scripts/generate_data.py 目的: raw データ作成

scripts/generate_data.py

# ダミーデータを生成するスクリプト
import os

import numpy as np
import pandas as pd

# ディレクトリ作成
os.makedirs("/app/data", exist_ok=True)

# 1. ユーザーデータ (CSV)
users = pd.DataFrame(
    {
        "user_id": range(1, 101),
        "name": [f"User_{i}" for i in range(1, 101)],
        "email": [f"user_{i}@example.com" for i in range(1, 101)],
        "signup_date": pd.to_datetime(pd.date_range(start="2023-01-01", periods=100)),
    }
)
users.to_csv("/app/data/users.csv", index=False)

# 2. 商品データ (Parquet)
products = pd.DataFrame(
    {
        "product_id": range(1, 21),
        "product_name": [f"Product_{i}" for i in range(1, 21)],
        "category": np.random.choice(["Electronics", "Clothing", "Home", "Books"], 20),
        "price": np.random.uniform(10, 500, 20).round(2),
    }
)
products.to_parquet("/app/data/products.parquet", index=False)

# 3. 注文データ (Parquet)
orders = pd.DataFrame(
    {
        "order_id": range(1001, 1101),
        "user_id": np.random.randint(1, 101, 100),
        "product_id": np.random.randint(1, 21, 100),
        "order_date": pd.to_datetime(pd.date_range(start="2023-06-01", periods=100)),
        "quantity": np.random.randint(1, 5, 100),
    }
)
orders.to_parquet("/app/data/orders.parquet", index=False)

このプロジェクトでは users は CSV、productsorders は Parquet で作っています。 複数形式を混ぜることで、取り込み処理の違いを学べます。

なぜこの設定か
最初から本番データを使わず、再生成できるダミーデータを用意すると学習と検証が何度でもできます。
設定値の意味
/app/data はホストの ./data をマウントした場所です。コンテナで作ったファイルをホストでも確認できます。
Step 5

raw データを Iceberg テーブルとして取り込む

追加ファイル: scripts/ingest_data.py 目的: datalake 層の初期化

scripts/ingest_data.py

# /app/scripts/generate_data.pyで生成したデータを
# Icebergテーブルとして取り込むスクリプト
import time

import pandas as pd
from trino.dbapi import connect

# Trino設定
MAX_RETRIES = 30
RETRY_DELAY = 10  # seconds

conn = None
for i in range(MAX_RETRIES):
    try:
        conn = connect(
            host="trino",
            port=8080,
            user="admin",
            catalog="iceberg",
        )
        print("Successfully connected to Trino")
        break
    except Exception as e:
        print(f"Failed to connect to Trino: {e}")
        time.sleep(RETRY_DELAY)

if not conn:
    raise Exception("Failed to connect to Trino after multiple retries.")


cur = conn.cursor()


def create_schema():
    print("Creating schemas if not exists...")
    cur.execute("CREATE SCHEMA IF NOT EXISTS iceberg.datalake")
    cur.execute("CREATE SCHEMA IF NOT EXISTS iceberg.dwh")
    cur.execute("CREATE SCHEMA IF NOT EXISTS iceberg.mart")
    print("Schemas created.")


def ingest_csv(file_path, table_name):
    print(f"Ingesting CSV: {file_path} into iceberg.datalake.{table_name}")
    df = pd.read_csv(file_path, parse_dates=["signup_date"])

    cols = [
        "user_id BIGINT",
        "name VARCHAR",
        "email VARCHAR",
        "signup_date TIMESTAMP(6)",
    ]

    cur.execute(f"DROP TABLE IF EXISTS iceberg.datalake.{table_name}")
    cur.execute(
        f"CREATE TABLE iceberg.datalake.{table_name} ({', '.join(cols)}) WITH (format = 'PARQUET')"
    )

    for _, row in df.iterrows():
        cur.execute(
            f"INSERT INTO iceberg.datalake.{table_name} VALUES ({row['user_id']}, '{row['name']}', '{row['email']}', TIMESTAMP '{row['signup_date']}')"
        )
    print(f"Table iceberg.datalake.{table_name} ingested successfully.")


def ingest_parquet(file_path, table_name, date_cols=[]):
    print(f"Ingesting Parquet: {file_path} into iceberg.datalake.{table_name}")
    df = pd.read_parquet(file_path)

    if table_name == "products":
        cols = [
            "product_id BIGINT",
            "product_name VARCHAR",
            "category VARCHAR",
            "price DOUBLE",
        ]
    elif table_name == "orders":
        cols = [
            "order_id BIGINT",
            "user_id BIGINT",
            "product_id BIGINT",
            "order_date TIMESTAMP(6)",
            "quantity BIGINT",
        ]

    cur.execute(f"DROP TABLE IF EXISTS iceberg.datalake.{table_name}")
    cur.execute(
        f"CREATE TABLE iceberg.datalake.{table_name} ({', '.join(cols)}) WITH (format = 'PARQUET')"
    )

    for _, row in df.iterrows():
        if table_name == "products":
            cur.execute(
                f"INSERT INTO iceberg.datalake.products VALUES ({row['product_id']}, '{row['product_name']}', '{row['category']}', {row['price']})"
            )
        elif table_name == "orders":
            cur.execute(
                f"INSERT INTO iceberg.datalake.orders VALUES ({row['order_id']}, {row['user_id']}, {row['product_id']}, TIMESTAMP '{row['order_date']}', {row['quantity']})"
            )
    print(f"Table iceberg.datalake.{table_name} ingested successfully.")


if __name__ == "__main__":
    create_schema()
    ingest_csv("/app/data/users.csv", "users")
    ingest_parquet("/app/data/products.parquet", "products")
    ingest_parquet("/app/data/orders.parquet", "orders")
    print("All data ingested successfully.")
    cur.close()
    conn.close()
なぜこの設定か
dbt は変換担当であり、raw データの初回ロード担当にしない設計もよくあります。この教材では Python が ingestion を担当します。
設定値の意味
iceberg.datalake.users は「catalog.schema.table」です。WITH (format = 'PARQUET') は Iceberg テーブルのデータファイル形式です。
層の役割
datalake は取り込んだ元データ、dwh は整形済みの分析基盤、mart は用途別集計です。

iceberg.datalake.{table_name} の命名

  • Trinoの階層は カタログ.スキーマ.テーブル の3層構造
  • MinIOのwarehouseバケット配下のトップディレクトリは、"スキーマ"が使われる
  • テーブルはそのディレクトリ配下の"テーブル"(これもMinIOではディレクトリ)として使われる。そしてその配下のディレクトリの中には、データ実態や各種メタデータが保存されている。
| 部分 | Trino上の概念 | 実体 |
| :--- | :--- | :--- |
| **カタログ** | iceberg | `trino/catalog/iceberg.properties` のファイル名 |
| **スキーマ** | datalake | MinIO の `warehouse/datalake/` フォルダ |
| **テーブル** | {table_name} | `warehouse/datalake/{table_name}/` フォルダ配下のParquetファイル群 |
Step 6

dbt プロジェクトの土台を作る

追加ファイル: dbt/dbt_project.yml 目的: dbt の全体設定
name: 'mds_tutorial_by_ken' # プロジェクト名です。ログ表示、成果物メタ情報、モデル設定の名前空間に使われる
version: '1.0.0'
config-version: 2

profile: 'mds_tutorial_by_ken' # dbt プロフィール名。profiles.yml 内の接続設定を参照する際のキーに対応。 ここで指定した名前の接続設定を dbt が使う

model-paths: ['models']
analysis-paths: ['analyses']
test-paths: ['tests']
seed-paths: ['seeds']
macro-paths: ['macros']
snapshot-paths: ['snapshots']
target-path: 'target' # dbt 実行結果(コンパイル成果物、manifest など)の出力先ディレクトリ

clean-targets: # dbt clean 実行時に削除するディレクトリ一覧
  - 'target'
  - 'dbt_modules'

# モデルに対するデフォルト設定
models:
  mds_tutorial_by_ken: # この配下設定が適用されるプロジェクト名(名前空間). nameと同じ名前にしておく.
    +materialized: table # デフォルトでモデルを table として作成する設定。dbt はモデルを view として作成するのがデフォルトなので、ここで table に変更している。
    datalake:
      +schema: datalake # models/datalake 配下モデルのスキーマを datalake にします
    dwh:
      +schema: dwh # models/dwh 配下モデルのスキーマを dwh にします
    mart:
      +schema: mart # models/mart 配下モデルのスキーマを mart にします
なぜこの設定か
ディレクトリ名とスキーマ名を対応させると、学習時に「このモデルはどこにできるのか」が追いやすくなります。
設定値の意味
+materialized: table はモデルを table で作る指定、profile は接続情報の参照先、+schema は出力先スキーマです。
Step 7

dbt から Trino に接続する

追加ファイル: dbt/profiles.yml 目的: 接続定義
mds_tutorial_by_ken: # プロフィール名。dbt_project.yml の profile キーで指定した名前と同じにしておく必要がある
  target: dev # デフォルトのターゲットを dev に設定。profiles.yml 内の dev ターゲットの接続設定が使われる
  config:
    send_anonymous_usage_stats: false # 匿名の使用統計情報の送信を無効化
  outputs:
    dev: # ターゲット名。ここでは dev という名前のターゲットを定義している。
      type: trino # データベースの種類。ここでは Trino を指定している。
      host: trino # データベースホスト。Trino のホスト名を指定している。Dockerのネットワーク設定により、Compose サービス名である trino をホスト名として指定できる。
      port: 8080 # データベースポート。Trino のデフォルトポートを指定している。
      user: admin # データベースユーザー名。admin を指定している。
      catalog: iceberg # Trino カタログ名。iceberg を指定している。前提として、Trino の catalog ディレクトリに iceberg.properties というファイルを作成し、Iceberg REST Catalog サーバーへの接続設定を記述している前提。
      schema: mart # データベーススキーマ。mart を指定している。
      threads: 1
      # password: password # データベースパスワード。password を指定している。
      # dbname: mds_tutorial_by_ken_dev # データベース名。mds_tutorial_by_ken_dev を指定している。
      # schema: public # スキーマ名。public を指定している。
なぜこの設定か
dbt は Trino に接続し、その先で Iceberg テーブルを操作します。Trino を共通入口にすることで SQL 実行先が一貫します。
設定値の意味
type: trino はアダプタ、catalog: iceberg は Trino 上の Iceberg catalog、schema: mart はデフォルト schema です。
Step 8

schema 名をきれいに制御する

追加ファイル: dbt/macros/generate_schema_name.sql 目的: dbt のデフォルト schema 命名を上書き
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- if custom_schema_name is none -%}
        {{ target.schema }}
    {%- else -%}
        {{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}
なぜこの設定か
dbt の標準動作では schema 名に target 名などが結合されることがあります。教材では dwhmart をそのまま使った方が理解しやすいので上書きします。
設定値の意味
custom_schema_name があるときはそれをそのまま返すため、+schema: dwh なら出力先が素直に iceberg.dwh になります。

dbtでマクロを使う理由

dbtは dbt_project.yml+schema: dwh のようにモデルごとのスキーマを指定できます。 しかしデフォルトの挙動では、指定したスキーマ名はそのまま使われず、profiles.ymlschemaoutputs.dev.schema)と連結されてしまいます。今の設定だと、outputs.dev.schema=martなので、dwhのスキーマは、mart_dwhとなってしまいます.

デフォルト挙動の例

| profiles.yml の schema | dbt_project.yml の +schema | 実際に作られるスキーマ |
| :--- | :--- | :--- |
| `mart` | `dwh` | `mart_dwh` |
| `mart` | `mart` | `mart_mart` |
| `mart` | 指定なし | `mart` |

なぜこのデフォルト挙動になっているのか

複数の開発者が同じ Trino/BigQuery 上で同時開発するとき、名前が衝突しないようにするためです。

Alice (target.schema=alice) → alice_dwh, alice_mart
Bob   (target.schema=bob)   → bob_dwh,  bob_mart
本番  (target.schema=prod)  → prod_dwh, prod_mart

この設計により、開発者ごとに独立したスキーマが自動的に用意されます。

マクロが必要になる場面

チュートリアルや本番環境のように「スキーマ名をそのまま使いたい」場合は、 dbt 組み込みの generate_schema_name マクロを macros/ 配下で上書きする必要があります。

まとめ

  • dbt に +schema の設定だけ書いても連結されてしまうのは仕様
  • +schema に書いた名前をそのまま使いたい」= generate_schema_name マクロの上書きがセット
  • この2つがワンセットになっているのが dbt でのスキーマ管理の標準パターン
Step 9

source を定義して raw テーブルを参照する

追加ファイル: dbt/models/datalake/sources.yml 目的: raw データの入口を固定化
version: 2
sources:
  # Trinoの階層は カタログ.スキーマ.テーブル となっており、今回は、iceberg.{MinIOのフォルダ名}.{テーブル}だから以下のような設定値になる
  - name: raw_data
    database: iceberg # Trino カタログ名
    schema: datalake # Trino スキーマ名
    tables:
      - name: users
      - name: products
      - name: orders

モデルからは {{ source('raw_data', 'users') }} のように参照します。

なぜこの設定か
raw テーブルを直接文字列で書くより、source 定義を 1 か所に集めた方が保守しやすく、dbt docs にも反映されます。
設定値の意味
database: iceberg は Trino catalog、schema: datalake は raw 層です。
Step 10

DWH モデルを作る

追加ファイル: dbt/models/dwh/dim_users.sql 追加ファイル: dbt/models/dwh/dim_products.sql 追加ファイル: dbt/models/dwh/fct_orders.sql

3 つの SQL ファイルは別々です。別ファイルとして保存してください。

dbt/models/dwh/dim_users.sql

SELECT 
    user_id
    , name
    , email
    , signup_date
FROM 
    {{ source('raw_data', 'users') }}

dbt/models/dwh/dim_products.sql

SELECT 
    product_id
    , product_name
    , category
    , price 
FROM 
    {{ source('raw_data', 'products') }}

dbt/models/dwh/fct_orders.sql

SELECT 
    o.order_id
    , o.user_id
    , o.product_id
    , o.order_date
    , o.quantity
    , (o.quantity * p.price) AS total_price
FROM {{ source('raw_data', 'orders') }} as o
LEFT JOIN {{ ref('dim_products') }} as p
    ON o.product_id = p.product_id
なぜこの設定か
dim は属性を持つ次元表、fct は数値計算の中心になるファクト表です。分析しやすい形に raw データを変換します。
設定値の意味
source() は外部テーブル参照、ref() は dbt モデル間参照です。ref() を使うと依存関係が dbt に伝わります。
Step 11

dbt test と time spine を追加する

追加ファイル: dbt/models/dwh/schema.yml 追加ファイル: dbt/models/dwh/_time_spine.sql

dbt/models/dwh/schema.yml

version: 2

models:
  - name: dim_users
    columns:
      - name: user_id
        tests: [unique, not_null]
  - name: dim_products
    columns:
      - name: product_id
        tests: [unique, not_null]
  - name: fct_orders
    columns:
      - name: order_id
        tests: [unique, not_null]
      - name: user_id
        tests: [not_null]
      - name: product_id
        tests: [not_null]
  - name: _time_spine
    columns:
      - name: date_day
        data_type: date
        granularity: day
    time_spine:
      standard_granularity_column: date_day

dbt/models/dwh/_time_spine.sql

{{
  config(
    materialized='table'
  )
}}

SELECT
  sales_date AS date_day
  , EXTRACT(YEAR FROM sales_date) AS year
  , EXTRACT(QUARTER FROM sales_date) AS quarter
  , EXTRACT(MONTH FROM sales_date) AS month
  , EXTRACT(DAY FROM sales_date) AS day_of_month
  , EXTRACT(DOW FROM sales_date) AS day_of_week
FROM {{ ref('daily_sales') }}
ORDER BY sales_date
なぜこの設定か
dbt test は「この列は null であってはいけない」「主キーは重複してはいけない」を機械的に検証します。time spine は時系列分析の基準軸として semantic layer でも重要です。
設定値の意味
tests: [unique, not_null] は組み込みテスト、granularity: day は日粒度、standard_granularity_column は基準日付列を示します。
Step 12

Semantic Layer と Mart を追加する

追加ファイル: dbt/models/dwh/semantic_layer.yml 追加ファイル: dbt/models/mart/daily_sales.sql

Mart の SQL と semantic layer の YAML も別ファイルです。

dbt/models/mart/daily_sales.sql

SELECT 
    CAST(order_date AS DATE) AS sales_date
    , SUM(total_price) AS total_daily_sales
FROM {{ ref('fct_orders') }}
GROUP BY 1
ORDER BY 1

dbt/models/dwh/semantic_layer.yml

# ===================================
# このsemantic_layerの定義は、
# SQLで書くと以下のクエリと同等の意味を持つ。
# -----------------------------------
# select
#   order_date,
#   sum(total_price) as total_sales
# from fct_orders
# group by order_date
# ===================================

version: 2

semantic_models:
  - name: fct_orders
    model: ref('fct_orders')
    entities:
      - name: order_id
        type: primary
      - name: user_id
        type: foreign
      - name: product_id
        type: foreign
    dimensions:
      - name: order_date
        type: time
        type_params: {time_granularity: day}
    measures:
      - name: total_sales
        agg: sum
        expr: total_price
        agg_time_dimension: order_date

metrics:
  - name: total_sales
    label: Total Sales
    type: simple
    type_params:
      measure: total_sales
なぜこの設定か
Mart は用途別の集計表、semantic layer は「売上」という意味を共通定義する層です。同じ数字を別の人が別の SQL で計算してズレるのを防ぎます。
設定値の意味
entities は主語、dimensions は切り口、measures は集計元、metrics は利用者向けの完成形です。

実行方法

教材を読みながら手を動かすなら、以下の順番で実行すると全体像がつながります。

# 1. 基盤を起動
docker compose -f docker/docker-compose.yaml up -d

# 2. サンプルデータを生成
docker compose -f docker/docker-compose.yaml exec python-app python /app/scripts/generate_data.py

# 3. datalake 層に取り込み
docker compose -f docker/docker-compose.yaml exec python-app python /app/scripts/ingest_data.py

# 4. dbt で変換
docker compose -f docker/docker-compose.yaml exec dbt dbt run --project-dir /usr/app/dbt --profiles-dir /usr/app/dbt

# 5. dbt test
docker compose -f docker/docker-compose.yaml exec dbt dbt test --project-dir /usr/app/dbt --profiles-dir /usr/app/dbt

# 6. dbt docs
docker compose -f docker/docker-compose.yaml exec dbt dbt docs generate --project-dir /usr/app/dbt --profiles-dir /usr/app/dbt
docker compose -f docker/docker-compose.yaml exec dbt dbt docs serve --project-dir /usr/app/dbt --profiles-dir /usr/app/dbt --port 8081
確認 1 MinIO の warehouse バケットにファイルが増えること。
確認 2 Trino 経由で iceberg.datalakeiceberg.dwhiceberg.mart が見えること。
確認 3 dbt test が通ること。
確認 4 dbt docs に source, model, test の依存関係が表示されること。

dbt test と documentation 機能をどう理解すればよいか

dbt test

SQL が動くだけでは品質保証になりません。uniquenot_null を入れると、主キーの重複や欠損をすぐ検知できます。

この教材では、dim_users.user_iddim_products.product_idfct_orders.order_id を主キー候補として検証しています。

dbt docs

source、model、test、semantic 定義のつながりを視覚化できます。初学者にとっては、SQL を読むより先に依存関係を俯瞰できるのが大きな利点です。

source から dim/fact に流れ、そこから mart や time spine に接続される構造を確認してください。

この構成で学べる設計の考え方

レイヤ分離

raw をそのまま分析せず、datalake → dwh → mart と段階を分けることで責務が明確になります。

ストレージと計算の分離

MinIO に保存し、Trino が計算し、dbt が変換を管理します。役割を分けるのがモダンデータスタックの基本です。

意味の共通化

semantic layer により「total_sales は total_price の合計」という定義を再利用できます。

時間軸の標準化

time spine を置くと、時系列分析の粒度や欠損日の扱いを統一しやすくなります。

最後に見るべきポイント

source と ref の違い raw テーブルは source()、dbt モデル同士は ref()
catalog / schema / table の3階層 このプロジェクトでは iceberg.dwh.fct_orders のように読む。
Iceberg の本体 テーブルの実体は MinIO にあり、Trino や dbt はそこに直接住んでいるわけではない。
semantic layer の価値 売上定義を SQL から切り出し、再利用できるようにすること。