From f9e3e1801faaf68696903edd08fa3732915e3a0c Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Fri, 18 Aug 2023 13:45:14 +0530 Subject: [PATCH] Add External Table type support bq (#12889) --- .../source/database/bigquery/metadata.py | 39 ++++++++++++++++--- .../source/database/bigquery/queries.py | 6 +++ 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py index 18978f56640..3d49ad5fada 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py @@ -56,9 +56,13 @@ from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.source.database.bigquery.queries import ( BIGQUERY_SCHEMA_DESCRIPTION, + BIGQUERY_TABLE_AND_TYPE, ) from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type -from metadata.ingestion.source.database.common_db_source import CommonDbSourceService +from metadata.ingestion.source.database.common_db_source import ( + CommonDbSourceService, + TableNameAndType, +) from metadata.utils import fqn from metadata.utils.credentials import GOOGLE_CREDENTIALS from metadata.utils.filters import filter_by_database @@ -70,6 +74,11 @@ from metadata.utils.tag_utils import ( get_tag_labels, ) +_bigquery_table_types = { + "BASE TABLE": TableType.Regular, + "EXTERNAL": TableType.External, +} + class BQJSON(String): """The SQL JSON type.""" @@ -198,6 +207,29 @@ class BigquerySource(CommonDbSourceService): _, project_ids = auth.default() return project_ids + def query_table_names_and_types( + self, schema_name: str + ) -> Iterable[TableNameAndType]: + """ + Connect to the source database to get the table + name and type. By default, use the inspector method + to get the names and pass the Regular type. + + This is useful for sources where we need fine-grained + logic on how to handle table types, e.g., external, foreign,... + """ + + return [ + TableNameAndType( + name=table_name, + type_=_bigquery_table_types.get(table_type, TableType.Regular), + ) + for table_name, table_type in self.engine.execute( + BIGQUERY_TABLE_AND_TYPE.format(schema_name) + ) + or [] + ] + def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]: """ Build tag context @@ -322,10 +354,7 @@ class BigquerySource(CommonDbSourceService): def get_database_names(self) -> Iterable[str]: if isinstance( self.service_connection.credentials.gcpConfig, GcpCredentialsPath - ): - self.set_inspector(database_name=self.project_ids) - yield self.project_ids - elif isinstance( + ) or isinstance( self.service_connection.credentials.gcpConfig.projectId, SingleProjectId ): self.set_inspector(database_name=self.project_ids) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py b/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py index 4562d3f5756..6b33dc0e146 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py @@ -50,3 +50,9 @@ BIGQUERY_SCHEMA_DESCRIPTION = textwrap.dedent( and option_value is not null """ ) + +BIGQUERY_TABLE_AND_TYPE = textwrap.dedent( + """ + select table_name, table_type from {}.INFORMATION_SCHEMA.TABLES where table_type != 'VIEW' + """ +)