Fixes 11825 - Add dataset description on schema level - bigquery (#11878)

This commit is contained in:
Ayush Shah 2023-06-06 12:58:47 +05:30 committed by GitHub
parent 0315fc2a74
commit 236141d9df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 2 deletions

View File

@ -65,6 +65,9 @@ from metadata.generated.schema.type.tagLabel import (
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.bigquery.queries import (
BIGQUERY_SCHEMA_DESCRIPTION,
)
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.utils import fqn
@ -251,6 +254,25 @@ class BigquerySource(CommonDbSourceService):
logger.debug(traceback.format_exc())
logger.warning(f"Skipping Policy Tag: {exc}")
def get_schema_description(self, schema_name: str) -> Optional[str]:
    """
    Fetch the description configured on a BigQuery dataset (schema).

    Formats BIGQUERY_SCHEMA_DESCRIPTION with the client's project, the
    service connection's ``usageLocation`` and ``schema_name``, runs it
    through ``self.client`` and reads ``schema_description`` from the
    first row of the result.

    Args:
        schema_name: name of the dataset whose description is looked up.

    Returns:
        The ``schema_description`` of the first returned row, or an empty
        string when the query returns no rows or the lookup fails.
        Failures are logged and never raised to the caller.
    """
    try:
        query_resp = self.client.query(
            BIGQUERY_SCHEMA_DESCRIPTION.format(
                project_id=self.client.project,
                region=self.service_connection.usageLocation,
                schema_name=schema_name,
            )
        )
        # Only the first row is used, so take it directly instead of
        # materialising every row and relying on IndexError for "no rows".
        first_row = next(iter(query_resp.result()), None)
        if first_row is not None:
            return first_row.schema_description
        logger.warning(f"No dataset description found for {schema_name}")
    except Exception as err:
        # Best effort: a missing description must not break the ingestion
        logger.debug(traceback.format_exc())
        logger.error(
            f"Failed to fetch dataset description for {schema_name}: {err}"
        )
    return ""
def yield_database_schema(
self, schema_name: str
) -> Iterable[CreateDatabaseSchemaRequest]:
@ -267,8 +289,9 @@ class BigquerySource(CommonDbSourceService):
dataset_obj = self.client.get_dataset(schema_name)
if dataset_obj.labels:
database_schema_request_obj.tags = []
for label_classification, label_tag_name in dataset_obj.labels.items():
database_schema_request_obj.tags = [
database_schema_request_obj.tags.append(
TagLabel(
tagFQN=fqn.build(
self.metadata,
@ -280,7 +303,7 @@ class BigquerySource(CommonDbSourceService):
state=State.Suggested.value,
source=TagSource.Classification.value,
)
]
)
yield database_schema_request_obj
def get_tag_labels(self, table_name: str) -> Optional[List[TagLabel]]:

View File

@ -40,3 +40,13 @@ WHERE creation_time BETWEEN "{start_time}" AND "{end_time}"
# Minimal single-row query against the JOBS_BY_PROJECT view of a region;
# `{region}` is a format placeholder for the region name.
BIGQUERY_TEST_STATEMENT = textwrap.dedent(
    "SELECT query FROM `region-{region}`.INFORMATION_SCHEMA.JOBS_BY_PROJECT limit 1"
)
# Query returning the `description` option of one dataset from the regional
# SCHEMATA_OPTIONS view. Placeholders: `{project_id}`, `{region}` and
# `{schema_name}`. The project and region qualifiers are backtick-quoted
# because both can contain dashes (e.g. `my-project`, `region-us`).
BIGQUERY_SCHEMA_DESCRIPTION = textwrap.dedent(
    """
    SELECT option_value as schema_description FROM
    `{project_id}`.`region-{region}`.INFORMATION_SCHEMA.SCHEMATA_OPTIONS
    where schema_name = '{schema_name}' and option_name = 'description'
    and option_value is not null
    """
)