✏️ 編集
BigQueryでは、パーティション変更を伴う場合、 REPLACE TABLE 文は使えない。
dbtのfull-refreshは CREATE OR REPLACE TABLE 文でテーブルを置き換える。
したがって、incremental更新のdbtモデルで、パーティション変更を伴う変更を加えた場合、full-refresh実行時にエラーが発生する…ように思われるが、実はエラーにはならない。
なぜなら、dbtの内部処理で、 REPLACE 句が使えない場合はテーブル削除を行ってからテーブル作成を行うようになっているから。
参考: コードからdbtを理解する
…ということに先日つまづいたので、その調査メモを残す。
REPLACE TABLE 文の挙動
例えば、下記クエリで一度テーブルを作成したとする。
1
2
3
4
5
6
7
8
9
10
11
12
| CREATE OR REPLACE TABLE `YOUR_GC_PROJECT_ID.YOUR_DATASET.for_replace_test_table`
PARTITION BY dt
CLUSTER BY dt
-- CLUSTER BY user_id -- Cannot replace a table with a different partitioning spec. Instead, DROP the table, and then recreate it. New partitioning spec is interval(type:day,field:dt) clustering(user_id) and existing spec is interval(type:day,field:dt) clustering(dt)
AS
SELECT DATE('2026-01-01') AS dt, 'user1' AS user_id, 'hogehoge' AS text
UNION ALL
SELECT DATE('2026-01-02') AS dt, 'user2' AS user_id, 'hogehoge' AS text
UNION ALL
SELECT DATE('2026-01-03') AS dt, 'user3' AS user_id, 'hogehoge' AS text
|
その後、次のようにクラスタリング対象のカラムを変更してテーブルを置き換えようとすると、エラーが発生する。
[!NOTE]
クラスタリング対象のカラム変更もパーティション変更を伴うんだ…。
1
2
3
4
5
6
7
8
9
10
11
12
| CREATE OR REPLACE TABLE `YOUR_GC_PROJECT_ID.YOUR_DATASET.for_replace_test_table`
PARTITION BY dt
- CLUSTER BY dt
+ CLUSTER BY user_id
AS
SELECT DATE('2026-01-01') AS dt, 'user1' AS user_id, 'hogehoge' AS text
UNION ALL
SELECT DATE('2026-01-02') AS dt, 'user2' AS user_id, 'hogehoge' AS text
UNION ALL
SELECT DATE('2026-01-03') AS dt, 'user3' AS user_id, 'hogehoge' AS text
|
発生したエラー:
1
| Cannot replace a table with a different partitioning spec. Instead, DROP the table, and then recreate it. New partitioning spec is interval(type:day,field:dt) clustering(user_id) and existing spec is interval(type:day,field:dt) clustering(dt)
|
dbtのincrementalモデルの挙動
初期状態
seeds/raw_data.csv :
1
2
3
4
5
6
7
8
9
10
11
| dt,user_id,text
2026-01-01,user1,あけましておめでとうございます!今年もよろしくお願いします。
2026-01-01,user2,新年の抱負は何ですか?
2026-01-02,user1,今年はもっと運動を頑張りたいと思います。
2026-01-02,user2,私は新しい趣味を始めたいと思っています。
2026-01-03,user1,どんな趣味を始める予定ですか?
2026-01-04,user2,料理を始めたいと思っています。新しいレシピを試すのが楽しみです。
2026-01-05,user1,それは素晴らしいですね!私も料理が好きです。
2026-01-06,user2,どんな料理が得意ですか?
2026-01-07,user1,私は和食が得意です。特に寿司が好きです。
2026-01-07,user1,寿司を作るのは楽しいですよね!あなたはどんな料理が得意ですか?
|
models/incremental_model.sql :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| {{ config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={
"field": "dt",
"data_type": "date",
"granularity": "day"
},
cluster_by=['dt']
) }}
WITH
max_date AS (
SELECT
MAX(dt) AS max_dt
FROM
{{ ref('raw_data') }}
),
raw_data AS (
SELECT
dt,
user_id
FROM
{{ ref('raw_data') }}
{% if is_incremental() %}
WHERE
dt >= (SELECT max_dt FROM max_date) - 2
{% endif %}
),
final AS (
SELECT
dt,
user_id,
COUNT(*) AS cnt
FROM
raw_data
GROUP BY ALL
)
SELECT * FROM final
|
dbt seed コマンドで、シードデータをロードしておく。
初回実行
実行コマンド:
1
| $ dbt run -s incremental_model
|
実行されたクエリ:
普通に CREATE OR REPLACE TABLE 文でテーブルが作成されている。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
create or replace table `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model`
partition by dt
cluster by dt
OPTIONS()
as (
WITH
max_date AS (
SELECT
MAX(dt) AS max_dt
FROM
`YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`raw_data`
),
raw_data AS (
SELECT
dt,
user_id
FROM
`YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`raw_data`
),
final AS (
SELECT
dt,
user_id,
COUNT(*) AS cnt
FROM
raw_data
GROUP BY ALL
)
SELECT * FROM final
);
|
2回目実行
incremental更新をさせたいので、シードデータに行を追加しておく。
seeds/raw_data.csv に追加したレコード:
1
2
3
4
5
6
7
8
9
10
11
12
13
| dt,user_id,text
2026-01-01,user1,あけましておめでとうございます!今年もよろしくお願いします。
2026-01-01,user2,新年の抱負は何ですか?
2026-01-02,user1,今年はもっと運動を頑張りたいと思います。
2026-01-02,user2,私は新しい趣味を始めたいと思っています。
2026-01-03,user1,どんな趣味を始める予定ですか?
2026-01-04,user2,料理を始めたいと思っています。新しいレシピを試すのが楽しみです。
2026-01-05,user1,それは素晴らしいですね!私も料理が好きです。
2026-01-06,user2,どんな料理が得意ですか?
2026-01-07,user1,私は和食が得意です。特に寿司が好きです。
2026-01-07,user1,寿司を作るのは楽しいですよね!あなたはどんな料理が得意ですか?
+ 2026-01-08,user2,寿司は美味しいですよね!私も大好きです。
+ 2026-01-08,user1,今度一緒に寿司を食べに行きましょうか?
|
反映コマンド:
[!NOTE]
余談だが、dbt seed コマンドも初回実行と2回目実行で内部処理が少し違う模様。
参考: コードリーディングで理解する dbt seed の仕組み
実行コマンド:
1
| $ dbt run -s incremental_model
|
実行されたクエリ:
tmpテーブルを作った後、元のテーブルに対して、 MERGE 文で更新処理を行っている。
[!NOTE]
この辺りの挙動については、次の記事でわかりやすく解説されている。
参考: BigQuery で使える dbt incremental strategy 完全ガイド
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
-- generated script to merge partitions into `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model`
declare dbt_partitions_for_replacement array<date>;
-- 1. create a temp table with model data
create or replace table `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model__dbt_tmp`
partition by dt
cluster by dt
OPTIONS(
expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
)
as (
WITH
max_date AS (
SELECT
MAX(dt) AS max_dt
FROM
`YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`raw_data`
),
raw_data AS (
SELECT
dt,
user_id
FROM
`YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`raw_data`
WHERE
dt >= (SELECT max_dt FROM max_date) - 2
),
final AS (
SELECT
dt,
user_id,
COUNT(*) AS cnt
FROM
raw_data
GROUP BY ALL
)
SELECT * FROM final
);
-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
select as struct
-- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
array_agg(distinct date(dt) IGNORE NULLS)
from `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model__dbt_tmp`
);
-- 3. run the merge statement
merge into `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model` as DBT_INTERNAL_DEST
using (
select
* from `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model__dbt_tmp`
) as DBT_INTERNAL_SOURCE
on FALSE
when not matched by source
and date(DBT_INTERNAL_DEST.dt) in unnest(dbt_partitions_for_replacement)
then delete
when not matched then insert
(`dt`, `user_id`, `cnt`)
values
(`dt`, `user_id`, `cnt`)
;
-- 4. clean up the temp table
drop table if exists `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model__dbt_tmp`
|
full-refresh実行
実行コマンド:
1
| $ dbt run --full-refresh -s incremental_model
|
実行されたクエリ:
初回実行と全く同じクエリが実行されている。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
create or replace table `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model`
partition by dt
cluster by dt
OPTIONS()
as (
WITH
max_date AS (
SELECT
MAX(dt) AS max_dt
FROM
`YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`raw_data`
),
raw_data AS (
SELECT
dt,
user_id
FROM
`YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`raw_data`
),
final AS (
SELECT
dt,
user_id,
COUNT(*) AS cnt
FROM
raw_data
GROUP BY ALL
)
SELECT * FROM final
);
|
クラスタを変更して普通にrun
models/incremental_model.sql :
クラスタを変更している。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
| {{ config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={
"field": "dt",
"data_type": "date",
"granularity": "day"
},
- cluster_by=['dt']
+ cluster_by=['user_id']
) }}
WITH
max_date AS (
SELECT
MAX(dt) AS max_dt
FROM
{{ ref('raw_data') }}
),
raw_data AS (
SELECT
dt,
user_id
FROM
{{ ref('raw_data') }}
{% if is_incremental() %}
WHERE
dt >= (SELECT max_dt FROM max_date) - 2
{% endif %}
),
final AS (
SELECT
dt,
user_id,
COUNT(*) AS cnt
FROM
raw_data
GROUP BY ALL
)
SELECT * FROM final
|
実行コマンド:
1
| $ dbt run -s incremental_model
|
実行されたクエリ:
普通にincremental更新した時と同じクエリが実行されている。クラスタ変更も反映されておらず、元のクラスタのまま。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
-- generated script to merge partitions into `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model`
declare dbt_partitions_for_replacement array<date>;
-- 1. create a temp table with model data
create or replace table `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model__dbt_tmp`
partition by dt
cluster by user_id
OPTIONS(
expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
)
as (
WITH
max_date AS (
SELECT
MAX(dt) AS max_dt
FROM
`YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`raw_data`
),
raw_data AS (
SELECT
dt,
user_id
FROM
`YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`raw_data`
WHERE
dt >= (SELECT max_dt FROM max_date) - 2
),
final AS (
SELECT
dt,
user_id,
COUNT(*) AS cnt
FROM
raw_data
GROUP BY ALL
)
SELECT * FROM final
);
-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
select as struct
-- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
array_agg(distinct date(dt) IGNORE NULLS)
from `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model__dbt_tmp`
);
-- 3. run the merge statement
merge into `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model` as DBT_INTERNAL_DEST
using (
select
* from `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model__dbt_tmp`
) as DBT_INTERNAL_SOURCE
on FALSE
when not matched by source
and date(DBT_INTERNAL_DEST.dt) in unnest(dbt_partitions_for_replacement)
then delete
when not matched then insert
(`dt`, `user_id`, `cnt`)
values
(`dt`, `user_id`, `cnt`)
;
-- 4. clean up the temp table
drop table if exists `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model__dbt_tmp`
|
変更されたクラスタでfull-refresh
実行コマンド:
1
| $ dbt run --full-refresh -s incremental_model
|
実行されたクエリ:
初回実行、full-refresh実行の時と同じクエリが実行されている。クラスタ変更は反映されている。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
create or replace table `YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`incremental_model`
partition by dt
cluster by user_id
OPTIONS()
as (
WITH
max_date AS (
SELECT
MAX(dt) AS max_dt
FROM
`YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`raw_data`
),
raw_data AS (
SELECT
dt,
user_id
FROM
`YOUR_GC_PROJECT_ID`.`YOUR_DATASET`.`raw_data`
),
final AS (
SELECT
dt,
user_id,
COUNT(*) AS cnt
FROM
raw_data
GROUP BY ALL
)
SELECT * FROM final
);
|
INFORMATION_SCHEMA.JOBS_BY_PROJECTや監査ログで、テーブルを削除してそうなログを探してみたが、見つからなかった…(探し方が悪いのだろうか…)。
コードリーディング
冒頭の記事でもコードリーディングの結果が貼られていたが、改めて自分でも読んでみる。
次の箇所で、 adapter.is_replaceable() で REPLACE 句が使えるかどうかを判定し、使えない場合は adapter.drop_relation() でテーブル削除を行っている。
1
2
3
4
5
6
7
8
9
10
11
12
| {% elif full_refresh_mode %}
{#-- If the partition/cluster config has changed, then we must drop and recreate --#}
{% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %}
{% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
{{ adapter.drop_relation(existing_relation) }}
{% endif %}
{%- call statement('main', language=language) -%}
{{ bq_create_table_as(partition_by, False, target_relation, compiled_code, language) }}
{%- endcall -%}
{% else %}
|
引用: https://github.com/dbt-labs/dbt-adapters/blob/main/dbt-bigquery/src/dbt/include/bigquery/macros/materializations/incremental.sql#L116-L126
adapter.is_replaceable() の実装は次のようになっている。テーブルが存在しない場合や、テーブルのパーティション・クラスタリングの設定が変更されていない場合は True を返すようになっている。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| def is_replaceable(
self, relation, conf_partition: Optional[PartitionConfig], conf_cluster
) -> bool:
"""
Check if a given partition and clustering column spec for a table
can replace an existing relation in the database. BigQuery does not
allow tables to be replaced with another table that has a different
partitioning spec. This method returns True if the given config spec is
identical to that of the existing table.
"""
if not relation:
return True
try:
table = self.connections.get_bq_table(
database=relation.database, schema=relation.schema, identifier=relation.identifier
)
except google.cloud.exceptions.NotFound:
return True
return all(
(
self._partitions_match(table, conf_partition),
self._clusters_match(table, conf_cluster),
)
)
|
引用: https://github.com/dbt-labs/dbt-adapters/blob/cb1b4a0b0758fd307dc21583bb3acfc78397a077/dbt-bigquery/src/dbt/adapters/bigquery/impl.py#L679-L704
adapter.drop_relation() の実装は次のようになっている。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| def drop_relation(self, relation: BigQueryRelation) -> None:
is_cached = self._schema_is_cached(relation.database, relation.schema) # type:ignore
if is_cached:
self.cache_dropped(relation)
conn = self.connections.get_thread_connection()
table_ref = self.get_table_ref_from_relation(relation)
# mimic "drop if exists" functionality that's ubiquitous in most sql implementations
conn.handle.delete_table(table_ref, not_found_ok=True)
|
引用: https://github.com/dbt-labs/dbt-adapters/blob/cb1b4a0b0758fd307dc21583bb3acfc78397a077/dbt-bigquery/src/dbt/adapters/bigquery/impl.py#L246-L256
conn.handle.delete_table() の部分をもう少し追ってみる。
BigQueryConnectionManager が open() された時に handle へBigQueryクライアントをセットしている。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| @classmethod
def open(cls, connection):
if connection.state == ConnectionState.OPEN:
logger.debug("Connection is already open, skipping open.")
return connection
try:
connection.handle = create_bigquery_client(connection.credentials)
connection.state = ConnectionState.OPEN
return connection
except Exception as e:
logger.debug(f"""Got an error when attempting to create a bigquery " "client: '{e}'""")
connection.handle = None
connection.state = ConnectionState.FAIL
raise FailedToConnectError(str(e))
|
引用: https://github.com/dbt-labs/dbt-adapters/blob/cb1b4a0b0758fd307dc21583bb3acfc78397a077/dbt-bigquery/src/dbt/adapters/bigquery/connections.py#L189-L204
したがって、 conn.handle.delete_table() は、BigQueryクライアントの delete_table() メソッドを呼び出していることになる。
参考: Package Methods (3.41.0) | Python client libraries | Google Cloud Documentation
(であれば、監査ログにテーブル削除のログが残るはずなんだけどなぁ…)