fix(ingest/bigquery): Map BigQuery policy tags to datahub column-level tags (#10669)

This commit is contained in:
sagar-salvi-apptware 2024-06-14 16:43:12 +05:30 committed by GitHub
parent bb44c4c265
commit d69966074a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 153 additions and 19 deletions

View File

@ -28,19 +28,21 @@ There are two important concepts to understand and identify:
If you have multiple projects in your BigQuery setup, the role should be granted these permissions in each of the projects.
:::
| permission                       | Description                                                                                                 | Capability               | Default GCP role which contains this permission                                                                 |
|----------------------------------|--------------------------------------------------------------------------------------------------------------|-------------------------------------|-----------------------------------------------------------------------------------------------------------------|
| `bigquery.datasets.get`         | Retrieve metadata about a dataset.                                                                           | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.datasets.getIamPolicy` | Read a dataset's IAM permissions.                                                                           | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.tables.list`           | List BigQuery tables.                                                                                       | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.tables.get`           | Retrieve metadata for a table.                                                                               | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.routines.get`           | Get Routines. Needs to retrieve metadata for a table from system table.                                                                                       | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.routines.list`           | List Routines. Needs to retrieve metadata for a table from system table                                                                               | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `resourcemanager.projects.get`   | Retrieve project names and metadata.                                                                         | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.jobs.listAll`         | List all jobs (queries) submitted by any user. Needs for Lineage extraction.                                 | Lineage Extraction/Usage extraction | [roles/bigquery.resourceViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.resourceViewer) |
| `logging.logEntries.list`       | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) |
| `logging.privateLogEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) |
| `bigquery.tables.getData`       | Access table data to extract storage size, last updated at, data profiles etc. | Profiling                           |                                                                                                                 |
| Permission | Description | Capability | Default GCP Role Which Contains This Permission |
|----------------------------------|-----------------------------------------------------------------------------------------------------------------|-------------------------------------|---------------------------------------------------------------------------|
| `bigquery.datasets.get` | Retrieve metadata about a dataset. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.datasets.getIamPolicy` | Read a dataset's IAM permissions. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.tables.list` | List BigQuery tables. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.tables.get` | Retrieve metadata for a table. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.routines.get` | Get Routines. Needs to retrieve metadata for a table from system table. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.routines.list` | List Routines. Needs to retrieve metadata for a table from system table. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `resourcemanager.projects.get` | Retrieve project names and metadata. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.jobs.listAll` | List all jobs (queries) submitted by any user. Needs for Lineage extraction. | Lineage Extraction/Usage Extraction | [roles/bigquery.resourceViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.resourceViewer) |
| `logging.logEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage Extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) |
| `logging.privateLogEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage Extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) |
| `bigquery.tables.getData` | Access table data to extract storage size, last updated at, data profiles etc. | Profiling | |
| `datacatalog.policyTags.get` | *Optional* Get policy tags for columns with associated policy tags. This permission is required only if `extract_policy_tags_from_catalog` is enabled. | Policy Tag Extraction | [roles/datacatalog.viewer](https://cloud.google.com/data-catalog/docs/access-control#permissions-and-roles) |
#### Create a service account in the Extractor Project

View File

@ -16,6 +16,7 @@ source:
#include_tables: true
#include_views: true
#include_table_lineage: true
#extract_policy_tags_from_catalog: true
#start_time: 2021-12-15T20:08:23.091Z
#end_time: 2023-12-15T20:08:23.091Z
#profiling:

View File

@ -168,6 +168,7 @@ bigquery_common = {
# Google cloud logging library
"google-cloud-logging<=3.5.0",
"google-cloud-bigquery",
"google-cloud-datacatalog>=1.5.0",
"more-itertools>=8.12.0",
"sqlalchemy-bigquery>=1.4.1",
}

View File

@ -130,6 +130,7 @@ from datahub.utilities.hive_schema_to_avro import (
)
from datahub.utilities.mapping import Constants
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.ratelimiter import RateLimiter
from datahub.utilities.registries.domain_registry import DomainRegistry
logger: logging.Logger = logging.getLogger(__name__)
@ -236,8 +237,14 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = ""
self.bigquery_data_dictionary = BigQuerySchemaApi(
self.report.schema_api_perf, self.config.get_bigquery_client()
self.report.schema_api_perf,
self.config.get_bigquery_client(),
)
if self.config.extract_policy_tags_from_catalog:
self.bigquery_data_dictionary.datacatalog_client = (
self.config.get_policy_tag_manager_client()
)
self.sql_parser_schema_resolver = self._init_schema_resolver()
self.data_reader: Optional[BigQueryDataReader] = None
@ -742,6 +749,12 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
columns = None
rate_limiter: Optional[RateLimiter] = None
if self.config.rate_limit:
rate_limiter = RateLimiter(
max_calls=self.config.requests_per_min, period=60
)
if (
self.config.include_tables
or self.config.include_views
@ -752,6 +765,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
dataset_name=dataset_name,
column_limit=self.config.column_limit,
run_optimized_column_query=self.config.run_optimized_column_query,
extract_policy_tags_from_catalog=self.config.extract_policy_tags_from_catalog,
report=self.report,
rate_limiter=rate_limiter,
)
if self.config.include_tables:
@ -1275,6 +1291,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
)
)
if col.policy_tags:
for policy_tag in col.policy_tags:
tags.append(TagAssociationClass(make_tag_urn(policy_tag)))
field = SchemaField(
fieldPath=col.name,
type=SchemaFieldDataType(

View File

@ -3,7 +3,7 @@ import os
from datetime import timedelta
from typing import Any, Dict, List, Optional, Union
from google.cloud import bigquery
from google.cloud import bigquery, datacatalog_v1
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator
@ -70,6 +70,9 @@ class BigQueryConnectionConfig(ConfigModel):
client_options = self.extra_client_options
return bigquery.Client(self.project_on_behalf, **client_options)
def get_policy_tag_manager_client(self) -> datacatalog_v1.PolicyTagManagerClient:
return datacatalog_v1.PolicyTagManagerClient()
def make_gcp_logging_client(
self, project_id: Optional[str] = None
) -> GCPLoggingClient:
@ -226,6 +229,16 @@ class BigQueryV2Config(
description="Use the legacy sharded table urn suffix added.",
)
extract_policy_tags_from_catalog: bool = Field(
default=False,
description=(
"This flag enables the extraction of policy tags from the Google Data Catalog API. "
"When enabled, the extractor will fetch policy tags associated with BigQuery table columns. "
"For more information about policy tags and column-level security, refer to the documentation: "
"https://cloud.google.com/bigquery/docs/column-level-security-intro"
),
)
scheme: str = "bigquery"
log_page_size: PositiveInt = Field(

View File

@ -2,9 +2,9 @@ import logging
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Iterator, List, Optional
from typing import Any, Dict, Iterable, Iterator, List, Optional
from google.cloud import bigquery
from google.cloud import bigquery, datacatalog_v1
from google.cloud.bigquery.table import (
RowIterator,
TableListItem,
@ -22,6 +22,7 @@ from datahub.ingestion.source.bigquery_v2.queries import (
BigqueryTableType,
)
from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView
from datahub.utilities.ratelimiter import RateLimiter
logger: logging.Logger = logging.getLogger(__name__)
@ -31,6 +32,7 @@ class BigqueryColumn(BaseColumn):
field_path: str
is_partition_column: bool
cluster_column_position: Optional[int]
policy_tags: Optional[List[str]] = None
RANGE_PARTITION_NAME: str = "RANGE"
@ -137,10 +139,14 @@ class BigqueryProject:
class BigQuerySchemaApi:
def __init__(
self, report: BigQuerySchemaApiPerfReport, client: bigquery.Client
self,
report: BigQuerySchemaApiPerfReport,
client: bigquery.Client,
datacatalog_client: Optional[datacatalog_v1.PolicyTagManagerClient] = None,
) -> None:
self.bq_client = client
self.report = report
self.datacatalog_client = datacatalog_client
def get_query_result(self, query: str) -> RowIterator:
logger.debug(f"Query : {query}")
@ -347,12 +353,69 @@ class BigQuerySchemaApi:
rows_count=view.get("row_count"),
)
def get_policy_tags_for_column(
self,
project_id: str,
dataset_name: str,
table_name: str,
column_name: str,
report: BigQueryV2Report,
rate_limiter: Optional[RateLimiter] = None,
) -> Iterable[str]:
assert self.datacatalog_client
try:
# Get the table schema
table_ref = f"{project_id}.{dataset_name}.{table_name}"
table = self.bq_client.get_table(table_ref)
schema = table.schema
# Find the specific field in the schema
field = next((f for f in schema if f.name == column_name), None)
if not field or not field.policy_tags:
return
# Retrieve policy tag display names
for policy_tag_name in field.policy_tags.names:
try:
if rate_limiter:
with rate_limiter:
policy_tag = self.datacatalog_client.get_policy_tag(
name=policy_tag_name
)
else:
policy_tag = self.datacatalog_client.get_policy_tag(
name=policy_tag_name
)
yield policy_tag.display_name
except Exception as e:
logger.warning(
f"Unexpected error when retrieving policy tag {policy_tag_name} for column {column_name} in table {table_name}: {e}",
exc_info=True,
)
report.report_warning(
"metadata-extraction",
f"Failed to retrieve policy tag {policy_tag_name} for column {column_name} in table {table_name} due to unexpected error: {e}",
)
except Exception as e:
logger.error(
f"Unexpected error retrieving schema for table {table_name} in dataset {dataset_name}, project {project_id}: {e}",
exc_info=True,
)
report.report_warning(
"metadata-extraction",
f"Failed to retrieve schema for table {table_name} in dataset {dataset_name}, project {project_id} due to unexpected error: {e}",
)
def get_columns_for_dataset(
self,
project_id: str,
dataset_name: str,
column_limit: int,
report: BigQueryV2Report,
run_optimized_column_query: bool = False,
extract_policy_tags_from_catalog: bool = False,
rate_limiter: Optional[RateLimiter] = None,
) -> Optional[Dict[str, List[BigqueryColumn]]]:
columns: Dict[str, List[BigqueryColumn]] = defaultdict(list)
with self.report.get_columns_for_dataset:
@ -397,6 +460,18 @@ class BigQuerySchemaApi:
comment=column.comment,
is_partition_column=column.is_partitioning_column == "YES",
cluster_column_position=column.clustering_ordinal_position,
policy_tags=list(
self.get_policy_tags_for_column(
project_id,
dataset_name,
column.table_name,
column.column_name,
report,
rate_limiter,
)
)
if extract_policy_tags_from_catalog
else [],
)
)

View File

@ -249,7 +249,11 @@
"nativeDataType": "INT",
"recursive": false,
"globalTags": {
"tags": []
"tags": [
{
"tag": "urn:li:tag:Test Policy Tag"
}
]
},
"glossaryTerms": {
"terms": [
@ -428,5 +432,21 @@
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:Test Policy Tag",
"changeType": "UPSERT",
"aspectName": "tagKey",
"aspect": {
"json": {
"name": "Test Policy Tag"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -44,8 +44,10 @@ def random_email():
@patch.object(BigQuerySchemaApi, "get_columns_for_dataset")
@patch.object(BigQueryDataReader, "get_sample_data_for_table")
@patch("google.cloud.bigquery.Client")
@patch("google.cloud.datacatalog_v1.PolicyTagManagerClient")
def test_bigquery_v2_ingest(
client,
policy_tag_manager_client,
get_sample_data_for_table,
get_columns_for_dataset,
get_datasets_for_project_id,
@ -78,6 +80,7 @@ def test_bigquery_v2_ingest(
comment="comment",
is_partition_column=False,
cluster_column_position=None,
policy_tags=["Test Policy Tag"],
),
BigqueryColumn(
name="email",