# Reviewer notes (placed before the first "diff --git" header, outside every hunk):
# - bigquery.py: adds a BigquerySource.fetch_views generator that standardizes the
#   schema/view names, requests the view definition by "<projectId>.<schema>.<view>",
#   and yields OMetaDatabaseAndTable records; also adds the imports that method uses.
# - sql_source.py: removes the `scheme == "bigquery"` branches from the SQLSource view
#   loop, which now always calls inspector.get_view_definition(view_name, schema).
# NOTE(review): line breaks and leading indentation were reconstructed from Python
# syntax and the hunk line counts; whitespace inside lines may differ from the
# repository - confirm against the target files before applying.
diff --git a/ingestion/src/metadata/ingestion/source/bigquery.py b/ingestion/src/metadata/ingestion/source/bigquery.py
index a6a44a4167f..e57b59d0eb8 100644
--- a/ingestion/src/metadata/ingestion/source/bigquery.py
+++ b/ingestion/src/metadata/ingestion/source/bigquery.py
@@ -9,9 +9,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import os
-from typing import Optional, Tuple
+import uuid
+from typing import Iterable, Optional, Tuple
 
 from google.cloud.datacatalog_v1 import PolicyTagManagerClient
+from sqlalchemy.engine.reflection import Inspector
 from sqlalchemy_bigquery import _types
 from sqlalchemy_bigquery._struct import STRUCT
 from sqlalchemy_bigquery._types import (
@@ -23,7 +25,7 @@ from metadata.generated.schema.api.tags.createTagCategory import (
     CreateTagCategoryRequest,
 )
 from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.table import TableData
+from metadata.generated.schema.entity.data.table import Table, TableData
 from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
     BigQueryConnection,
 )
@@ -35,8 +37,10 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 )
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.source import InvalidSourceException
-from metadata.ingestion.source.sql_source import SQLSource
+from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
+from metadata.ingestion.source.sql_source import SQLSource, _get_table_description
 from metadata.utils.column_type_parser import create_sqlalchemy_type
+from metadata.utils.filters import filter_by_table
 from metadata.utils.helpers import get_start_and_end
 from metadata.utils.logger import ingestion_logger
 
@@ -167,6 +171,76 @@ class BigquerySource(SQLSource):
             ),
         )
 
+    def fetch_views(
+        self, inspector: Inspector, schema: str
+    ) -> Iterable[OMetaDatabaseAndTable]:
+        """
+        Get all views in the SQL schema and prepare
+        Database & Table OpenMetadata Entities
+        """
+        for view_name in inspector.get_view_names(schema):
+            try:
+                schema, view_name = self.standardize_schema_table_names(
+                    schema, view_name
+                )
+
+                if filter_by_table(
+                    self.source_config.tableFilterPattern, table_name=view_name
+                ):
+                    self.status.filter(
+                        f"{self.config.serviceName}.{view_name}",
+                        "View pattern not allowed",
+                    )
+                    continue
+                try:
+                    view_definition = inspector.get_view_definition(
+                        f"{self.service_connection.projectId}.{schema}.{view_name}"
+                    )
+
+                    view_definition = (
+                        "" if view_definition is None else str(view_definition)
+                    )
+                except NotImplementedError:
+                    view_definition = ""
+
+                table = Table(
+                    id=uuid.uuid4(),
+                    name=view_name,
+                    tableType="View",
+                    description=_get_table_description(schema, view_name, inspector)
+                    or "",
+                    # This will be generated in the backend!! #1673
+                    columns=self._get_columns(schema, view_name, inspector),
+                    viewDefinition=view_definition,
+                )
+                if self.source_config.generateSampleData:
+                    table_data = self.fetch_sample_data(schema, view_name)
+                    table.sampleData = table_data
+
+                try:
+                    if self.source_config.enableDataProfiler:
+                        profile = self.run_profiler(table=table, schema=schema)
+                        table.tableProfile = [profile] if profile else None
+                # Catch any errors during the profile runner and continue
+                except Exception as err:
+                    logger.error(err)
+
+                database = self._get_database(self.service_connection.database)
+                table_schema_and_db = OMetaDatabaseAndTable(
+                    table=table,
+                    database=database,
+                    database_schema=self._get_schema(schema, database),
+                )
+
+                self.register_record(table_schema_and_db)
+
+                yield table_schema_and_db
+            # Catch any errors and continue the ingestion
+            except Exception as err:  # pylint: disable=broad-except
+                logger.error(err)
+                self.status.warnings.append(f"{self.config.serviceName}.{view_name}")
+                continue
+
     def parse_raw_data_type(self, raw_data_type):
         return raw_data_type.replace(", ", ",").replace(" ", ":").lower()
 
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index 2bc11f931b1..15296ec6c60 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -374,10 +374,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         """
         for view_name in inspector.get_view_names(schema):
             try:
-                if self.service_connection.scheme == "bigquery":
-                    schema, view_name = self.standardize_schema_table_names(
-                        schema, view_name
-                    )
 
                 if filter_by_table(
                     self.source_config.tableFilterPattern, table_name=view_name
@@ -388,14 +384,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     )
                     continue
                 try:
-                    if self.service_connection.scheme == "bigquery":
-                        view_definition = inspector.get_view_definition(
-                            f"{self.service_connection.projectId}.{schema}.{view_name}"
-                        )
-                    else:
-                        view_definition = inspector.get_view_definition(
-                            view_name, schema
-                        )
+                    view_definition = inspector.get_view_definition(view_name, schema)
                     view_definition = (
                         "" if view_definition is None else str(view_definition)
                     )