diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py index 84d76026d70..ca001de8410 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py @@ -65,6 +65,9 @@ from metadata.generated.schema.type.tagLabel import ( from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.source.connections import get_connection +from metadata.ingestion.source.database.bigquery.queries import ( + BIGQUERY_SCHEMA_DESCRIPTION, +) from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type from metadata.ingestion.source.database.common_db_source import CommonDbSourceService from metadata.utils import fqn @@ -251,6 +254,25 @@ class BigquerySource(CommonDbSourceService): logger.debug(traceback.format_exc()) logger.warning(f"Skipping Policy Tag: {exc}") + def get_schema_description(self, schema_name: str) -> Optional[str]: + try: + query_resp = self.client.query( + BIGQUERY_SCHEMA_DESCRIPTION.format( + project_id=self.client.project, + region=self.service_connection.usageLocation, + schema_name=schema_name, + ) + ) + + query_result = [result.schema_description for result in query_resp.result()] + return query_result[0] + except IndexError: + logger.warning(f"No dataset description found for {schema_name}") + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(f"Failed to fetch {err}") + return "" + def yield_database_schema( self, schema_name: str ) -> Iterable[CreateDatabaseSchemaRequest]: @@ -267,8 +289,9 @@ class BigquerySource(CommonDbSourceService): dataset_obj = self.client.get_dataset(schema_name) if dataset_obj.labels: + database_schema_request_obj.tags = [] for label_classification, label_tag_name in dataset_obj.labels.items(): - database_schema_request_obj.tags = [ + database_schema_request_obj.tags.append( TagLabel( tagFQN=fqn.build( self.metadata, @@ -280,7 +303,7 @@ class BigquerySource(CommonDbSourceService): state=State.Suggested.value, source=TagSource.Classification.value, ) - ] + ) yield database_schema_request_obj def get_tag_labels(self, table_name: str) -> Optional[List[TagLabel]]: diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py b/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py index fb9aa150586..4562d3f5756 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py @@ -40,3 +40,13 @@ WHERE creation_time BETWEEN "{start_time}" AND "{end_time}" BIGQUERY_TEST_STATEMENT = textwrap.dedent( """SELECT query FROM `region-{region}`.INFORMATION_SCHEMA.JOBS_BY_PROJECT limit 1""" ) + + +BIGQUERY_SCHEMA_DESCRIPTION = textwrap.dedent( + """ + SELECT option_value as schema_description FROM + {project_id}.region-{region}.INFORMATION_SCHEMA.SCHEMATA_OPTIONS + where schema_name = '{schema_name}' and option_name = 'description' + and option_value is not null + """ +)