feat(ingest/bigquery): add tag to BigQuery clustering columns (#8495)

Co-authored-by: Andrew Sikowitz <andrew.sikowitz@acryl.io>
This commit is contained in:
Alexander 2023-08-17 12:44:15 -04:00 committed by GitHub
parent 836e2f49ea
commit c0addf6eef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 27 additions and 10 deletions

View File

@@ -129,6 +129,7 @@ logger: logging.Logger = logging.getLogger(__name__)
# Handle table snapshots # Handle table snapshots
# See https://cloud.google.com/bigquery/docs/table-snapshots-intro. # See https://cloud.google.com/bigquery/docs/table-snapshots-intro.
SNAPSHOT_TABLE_REGEX = re.compile(r"^(.+)@(\d{13})$") SNAPSHOT_TABLE_REGEX = re.compile(r"^(.+)@(\d{13})$")
CLUSTERING_COLUMN_TAG = "CLUSTERING_COLUMN"
# We can't use close as it is not called if the ingestion is not successful # We can't use close as it is not called if the ingestion is not successful
@@ -1151,6 +1152,21 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
field.description = col.comment field.description = col.comment
schema_fields[idx] = field schema_fields[idx] = field
else: else:
tags = []
if col.is_partition_column:
tags.append(
TagAssociationClass(make_tag_urn(Constants.TAG_PARTITION_KEY))
)
if col.cluster_column_position is not None:
tags.append(
TagAssociationClass(
make_tag_urn(
f"{CLUSTERING_COLUMN_TAG}_{col.cluster_column_position}"
)
)
)
field = SchemaField( field = SchemaField(
fieldPath=col.name, fieldPath=col.name,
type=SchemaFieldDataType( type=SchemaFieldDataType(
@@ -1160,15 +1176,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
nativeDataType=col.data_type, nativeDataType=col.data_type,
description=col.comment, description=col.comment,
nullable=col.is_nullable, nullable=col.is_nullable,
globalTags=GlobalTagsClass( globalTags=GlobalTagsClass(tags=tags),
tags=[
TagAssociationClass(
make_tag_urn(Constants.TAG_PARTITION_KEY)
)
]
)
if col.is_partition_column
else GlobalTagsClass(tags=[]),
) )
schema_fields.append(field) schema_fields.append(field)
last_id = col.ordinal_position last_id = col.ordinal_position

View File

@@ -33,6 +33,7 @@ class BigqueryTableType:
class BigqueryColumn(BaseColumn): class BigqueryColumn(BaseColumn):
field_path: str field_path: str
is_partition_column: bool is_partition_column: bool
cluster_column_position: Optional[int]
RANGE_PARTITION_NAME: str = "RANGE" RANGE_PARTITION_NAME: str = "RANGE"
@@ -285,7 +286,8 @@ select
CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type, CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type,
description as comment, description as comment,
c.is_hidden as is_hidden, c.is_hidden as is_hidden,
c.is_partitioning_column as is_partitioning_column c.is_partitioning_column as is_partitioning_column,
c.clustering_ordinal_position as clustering_ordinal_position,
from from
`{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMNS c `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMNS c
join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS as cfp on cfp.table_name = c.table_name join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS as cfp on cfp.table_name = c.table_name
@@ -307,6 +309,7 @@ select * from
description as comment, description as comment,
c.is_hidden as is_hidden, c.is_hidden as is_hidden,
c.is_partitioning_column as is_partitioning_column, c.is_partitioning_column as is_partitioning_column,
c.clustering_ordinal_position as clustering_ordinal_position,
-- We count the columns to be able limit it later -- We count the columns to be able limit it later
row_number() over (partition by c.table_catalog, c.table_schema, c.table_name order by c.ordinal_position asc, c.data_type DESC) as column_num, row_number() over (partition by c.table_catalog, c.table_schema, c.table_name order by c.ordinal_position asc, c.data_type DESC) as column_num,
-- Getting the maximum shard for each table -- Getting the maximum shard for each table
@@ -333,6 +336,7 @@ select
CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type, CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type,
c.is_hidden as is_hidden, c.is_hidden as is_hidden,
c.is_partitioning_column as is_partitioning_column, c.is_partitioning_column as is_partitioning_column,
c.clustering_ordinal_position as clustering_ordinal_position,
description as comment description as comment
from from
`{table_identifier.project_id}`.`{table_identifier.dataset}`.INFORMATION_SCHEMA.COLUMNS as c `{table_identifier.project_id}`.`{table_identifier.dataset}`.INFORMATION_SCHEMA.COLUMNS as c
@@ -583,6 +587,7 @@ class BigQueryDataDictionary:
data_type=column.data_type, data_type=column.data_type,
comment=column.comment, comment=column.comment,
is_partition_column=column.is_partitioning_column == "YES", is_partition_column=column.is_partitioning_column == "YES",
cluster_column_position=column.clustering_ordinal_position,
) )
) )
@@ -621,6 +626,7 @@ class BigQueryDataDictionary:
data_type=column.data_type, data_type=column.data_type,
comment=column.comment, comment=column.comment,
is_partition_column=column.is_partitioning_column == "YES", is_partition_column=column.is_partitioning_column == "YES",
cluster_column_position=column.clustering_ordinal_position,
) )
) )
last_seen_table = column.table_name last_seen_table = column.table_name

View File

@@ -37,6 +37,7 @@ def test_generate_day_partitioned_partition_profiler_query():
ordinal_position=1, ordinal_position=1,
data_type="TIMESTAMP", data_type="TIMESTAMP",
is_partition_column=True, is_partition_column=True,
cluster_column_position=None,
comment=None, comment=None,
is_nullable=False, is_nullable=False,
) )
@@ -79,6 +80,7 @@ def test_generate_day_partitioned_partition_profiler_query_with_set_partition_ti
ordinal_position=1, ordinal_position=1,
data_type="TIMESTAMP", data_type="TIMESTAMP",
is_partition_column=True, is_partition_column=True,
cluster_column_position=None,
comment=None, comment=None,
is_nullable=False, is_nullable=False,
) )
@@ -120,6 +122,7 @@ def test_generate_hour_partitioned_partition_profiler_query():
ordinal_position=1, ordinal_position=1,
data_type="TIMESTAMP", data_type="TIMESTAMP",
is_partition_column=True, is_partition_column=True,
cluster_column_position=None,
comment=None, comment=None,
is_nullable=False, is_nullable=False,
) )