Fixed table filter pattern for bigquery service (#7416)

This commit is contained in:
Teddy 2022-09-14 15:00:45 +02:00 committed by GitHub
parent 1d0c6360bf
commit 0bb83f138b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -20,7 +20,7 @@ from google.cloud.bigquery.client import Client
from google.cloud.datacatalog_v1 import PolicyTagManagerClient
from sqlalchemy import inspect
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy_bigquery import _types
from sqlalchemy_bigquery import BigQueryDialect, _types
from sqlalchemy_bigquery._struct import STRUCT
from sqlalchemy_bigquery._types import (
_get_sqla_column_type,
@ -52,6 +52,7 @@ from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.utils import fqn
from metadata.utils.filters import filter_by_table
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -94,6 +95,18 @@ def get_columns(bq_schema):
_types.get_columns = get_columns
@staticmethod
def _build_formatted_table_id(table):
"""We overide the methid as it returns both schema and table name if dataset_id is None. From our
investigation, this method seems to be used only in `_get_table_or_view_names()` of bigquery sqalchemy
https://github.com/googleapis/python-bigquery-sqlalchemy/blob/2b1f5c464ad2576e4512a0407bb044da4287c65e/sqlalchemy_bigquery/base.py
"""
return f"{table.table_id}"
BigQueryDialect._build_formatted_table_id = _build_formatted_table_id
class BigquerySource(CommonDbSourceService):
def __init__(self, config, metadata_config):
super().__init__(config, metadata_config)
@ -114,14 +127,6 @@ class BigquerySource(CommonDbSourceService):
)
return cls(config, metadata_config)
def standardize_table_name(self, schema: str, table: str) -> str:
segments = fqn.split(table)
if len(segments) != 2:
raise ValueError(f"expected table to contain schema name already {table}")
if segments[0] != schema:
raise ValueError(f"schema {schema} does not match table {table}")
return segments[1]
@staticmethod
def set_project_id():
_, project_id = auth.default()