Add External Table type support bq (#12889)

This commit is contained in:
Ayush Shah 2023-08-18 13:45:14 +05:30 committed by GitHub
parent 99b57db6db
commit f9e3e1801f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 5 deletions

View File

@ -56,9 +56,13 @@ from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.source.database.bigquery.queries import (
BIGQUERY_SCHEMA_DESCRIPTION,
BIGQUERY_TABLE_AND_TYPE,
)
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.common_db_source import (
CommonDbSourceService,
TableNameAndType,
)
from metadata.utils import fqn
from metadata.utils.credentials import GOOGLE_CREDENTIALS
from metadata.utils.filters import filter_by_database
@ -70,6 +74,11 @@ from metadata.utils.tag_utils import (
get_tag_labels,
)
_bigquery_table_types = {
"BASE TABLE": TableType.Regular,
"EXTERNAL": TableType.External,
}
class BQJSON(String):
"""The SQL JSON type."""
@ -198,6 +207,29 @@ class BigquerySource(CommonDbSourceService):
_, project_ids = auth.default()
return project_ids
def query_table_names_and_types(
self, schema_name: str
) -> Iterable[TableNameAndType]:
"""
Connect to the source database to get the table
name and type. By default, use the inspector method
to get the names and pass the Regular type.
This is useful for sources where we need fine-grained
logic on how to handle table types, e.g., external, foreign,...
"""
return [
TableNameAndType(
name=table_name,
type_=_bigquery_table_types.get(table_type, TableType.Regular),
)
for table_name, table_type in self.engine.execute(
BIGQUERY_TABLE_AND_TYPE.format(schema_name)
)
or []
]
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]:
"""
Build tag context
@ -322,10 +354,7 @@ class BigquerySource(CommonDbSourceService):
def get_database_names(self) -> Iterable[str]:
if isinstance(
self.service_connection.credentials.gcpConfig, GcpCredentialsPath
):
self.set_inspector(database_name=self.project_ids)
yield self.project_ids
elif isinstance(
) or isinstance(
self.service_connection.credentials.gcpConfig.projectId, SingleProjectId
):
self.set_inspector(database_name=self.project_ids)

View File

@ -50,3 +50,9 @@ BIGQUERY_SCHEMA_DESCRIPTION = textwrap.dedent(
and option_value is not null
"""
)
BIGQUERY_TABLE_AND_TYPE = textwrap.dedent(
"""
select table_name, table_type from {}.INFORMATION_SCHEMA.TABLES where table_type != 'VIEW'
"""
)