From 0bb83f138b32043716d1e71ed6ff7ca7a80d60a9 Mon Sep 17 00:00:00 2001 From: Teddy Date: Wed, 14 Sep 2022 15:00:45 +0200 Subject: [PATCH] Fixed table filter pattern for bigquery service (#7416) --- .../ingestion/source/database/bigquery.py | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery.py b/ingestion/src/metadata/ingestion/source/database/bigquery.py index df3cab9e8ed..31831d539bc 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery.py @@ -20,7 +20,7 @@ from google.cloud.bigquery.client import Client from google.cloud.datacatalog_v1 import PolicyTagManagerClient from sqlalchemy import inspect from sqlalchemy.engine.reflection import Inspector -from sqlalchemy_bigquery import _types +from sqlalchemy_bigquery import BigQueryDialect, _types from sqlalchemy_bigquery._struct import STRUCT from sqlalchemy_bigquery._types import ( _get_sqla_column_type, @@ -52,6 +52,7 @@ from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type from metadata.ingestion.source.database.common_db_source import CommonDbSourceService from metadata.utils import fqn +from metadata.utils.filters import filter_by_table from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -94,6 +95,18 @@ def get_columns(bq_schema): _types.get_columns = get_columns +@staticmethod +def _build_formatted_table_id(table): + """We overide the methid as it returns both schema and table name if dataset_id is None. From our + investigation, this method seems to be used only in `_get_table_or_view_names()` of bigquery sqalchemy + https://github.com/googleapis/python-bigquery-sqlalchemy/blob/2b1f5c464ad2576e4512a0407bb044da4287c65e/sqlalchemy_bigquery/base.py + """ + return f"{table.table_id}" + + +BigQueryDialect._build_formatted_table_id = _build_formatted_table_id + + class BigquerySource(CommonDbSourceService): def __init__(self, config, metadata_config): super().__init__(config, metadata_config) @@ -114,14 +127,6 @@ class BigquerySource(CommonDbSourceService): ) return cls(config, metadata_config) - def standardize_table_name(self, schema: str, table: str) -> str: - segments = fqn.split(table) - if len(segments) != 2: - raise ValueError(f"expected table to contain schema name already {table}") - if segments[0] != schema: - raise ValueError(f"schema {schema} does not match table {table}") - return segments[1] - @staticmethod def set_project_id(): _, project_id = auth.default()