bigquery view fix (#4897)

* bigquery view fix

* overide bigquery fetch_view method

Co-authored-by: Onkar Ravgan <onkarravgan@Onkars-MacBook-Pro.local>
This commit is contained in:
Onkar Ravgan 2022-05-12 16:25:11 +05:30 committed by GitHub
parent 862b08ef2c
commit c419b07ab5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 78 additions and 15 deletions

View File

@ -9,9 +9,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Optional, Tuple
import uuid
from typing import Iterable, Optional, Tuple
from google.cloud.datacatalog_v1 import PolicyTagManagerClient
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy_bigquery import _types
from sqlalchemy_bigquery._struct import STRUCT
from sqlalchemy_bigquery._types import (
@ -23,7 +25,7 @@ from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import TableData
from metadata.generated.schema.entity.data.table import Table, TableData
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
@ -35,8 +37,10 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.source.sql_source import SQLSource, _get_table_description
from metadata.utils.column_type_parser import create_sqlalchemy_type
from metadata.utils.filters import filter_by_table
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
@ -167,6 +171,76 @@ class BigquerySource(SQLSource):
),
)
def fetch_views(
self, inspector: Inspector, schema: str
) -> Iterable[OMetaDatabaseAndTable]:
"""
Get all views in the SQL schema and prepare
Database & Table OpenMetadata Entities
"""
for view_name in inspector.get_view_names(schema):
try:
schema, view_name = self.standardize_schema_table_names(
schema, view_name
)
if filter_by_table(
self.source_config.tableFilterPattern, table_name=view_name
):
self.status.filter(
f"{self.config.serviceName}.{view_name}",
"View pattern not allowed",
)
continue
try:
view_definition = inspector.get_view_definition(
f"{self.service_connection.projectId}.{schema}.{view_name}"
)
view_definition = (
"" if view_definition is None else str(view_definition)
)
except NotImplementedError:
view_definition = ""
table = Table(
id=uuid.uuid4(),
name=view_name,
tableType="View",
description=_get_table_description(schema, view_name, inspector)
or "",
# This will be generated in the backend!! #1673
columns=self._get_columns(schema, view_name, inspector),
viewDefinition=view_definition,
)
if self.source_config.generateSampleData:
table_data = self.fetch_sample_data(schema, view_name)
table.sampleData = table_data
try:
if self.source_config.enableDataProfiler:
profile = self.run_profiler(table=table, schema=schema)
table.tableProfile = [profile] if profile else None
# Catch any errors during the profile runner and continue
except Exception as err:
logger.error(err)
database = self._get_database(self.service_connection.database)
table_schema_and_db = OMetaDatabaseAndTable(
table=table,
database=database,
database_schema=self._get_schema(schema, database),
)
self.register_record(table_schema_and_db)
yield table_schema_and_db
# Catch any errors and continue the ingestion
except Exception as err: # pylint: disable=broad-except
logger.error(err)
self.status.warnings.append(f"{self.config.serviceName}.{view_name}")
continue
def parse_raw_data_type(self, raw_data_type):
return raw_data_type.replace(", ", ",").replace(" ", ":").lower()

View File

@ -374,10 +374,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
"""
for view_name in inspector.get_view_names(schema):
try:
if self.service_connection.scheme == "bigquery":
schema, view_name = self.standardize_schema_table_names(
schema, view_name
)
if filter_by_table(
self.source_config.tableFilterPattern, table_name=view_name
@ -388,14 +384,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
)
continue
try:
if self.service_connection.scheme == "bigquery":
view_definition = inspector.get_view_definition(
f"{self.service_connection.projectId}.{schema}.{view_name}"
)
else:
view_definition = inspector.get_view_definition(
view_name, schema
)
view_definition = inspector.get_view_definition(view_name, schema)
view_definition = (
"" if view_definition is None else str(view_definition)
)