diff --git a/ingestion/src/metadata/ingestion/source/bigquery.py b/ingestion/src/metadata/ingestion/source/bigquery.py
index 017890be88e..ca663b78348 100644
--- a/ingestion/src/metadata/ingestion/source/bigquery.py
+++ b/ingestion/src/metadata/ingestion/source/bigquery.py
@@ -14,7 +14,7 @@
 # limitations under the License.

 from typing import Optional, Tuple
-
+import os
 from metadata.generated.schema.entity.data.table import TableData

 # This import verifies that the dependencies are available.
@@ -41,12 +41,14 @@ class BigquerySource(SQLSource):

     @classmethod
     def create(cls, config_dict, metadata_config_dict, ctx):
-        config = BigQueryConfig.parse_obj(config_dict)
+        config: SQLConnectionConfig = BigQueryConfig.parse_obj(config_dict)
         metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
+        if 'credentials_path' in config.options:
+            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = config.options['credentials_path']
         return cls(config, metadata_config, ctx)

     def standardize_schema_table_names(
-            self, schema: str, table: str
+        self, schema: str, table: str
     ) -> Tuple[str, str]:
         segments = table.split(".")
         if len(segments) != 2:
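The create() hook above relies on Application Default Credentials: exporting GOOGLE_APPLICATION_CREDENTIALS before the SQLAlchemy engine is built lets every google-cloud client in the process authenticate implicitly. A minimal standalone sketch of that mechanism, assuming a hypothetical service-account file path and the credentials_path key under options, as the patch expects:

    import os

    from google.cloud import bigquery

    # Hypothetical source options mirroring the `options` block the patch reads.
    options = {"credentials_path": "/secrets/gcp-sa.json"}

    # Export before any client is constructed, as the patched create() does.
    if "credentials_path" in options:
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = options["credentials_path"]

    # Client() resolves credentials via Application Default Credentials,
    # which honours GOOGLE_APPLICATION_CREDENTIALS.
    client = bigquery.Client()
    print(client.project)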
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index 5feaa9198d5..e866ab7200b 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -18,6 +18,7 @@ import uuid
 from abc import abstractmethod
 from dataclasses import dataclass, field
 from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type
+from urllib.parse import quote_plus

 from pydantic import ValidationError

@@ -28,7 +29,8 @@
 from metadata.generated.schema.type.entityReference import EntityReference

 from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.table import Table, Column, ColumnConstraint, TableType, TableData, \
+from metadata.generated.schema.entity.data.table import Table, Column, ColumnConstraint, TableType, \
+    TableData, \
     TableProfile
 from sqlalchemy import create_engine
 from sqlalchemy.engine.reflection import Inspector
@@ -57,7 +59,9 @@ class SQLSourceStatus(SourceStatus):
         self.success.append(table_name)
         logger.info('Table Scanned: {}'.format(table_name))

-    def filter(self, table_name: str, err: str, dataset_name: str = None, col_type: str = None) -> None:
+    def filter(
+        self, table_name: str, err: str, dataset_name: Optional[str] = None, col_type: Optional[str] = None
+    ) -> None:
         self.filtered.append(table_name)
         logger.warning("Dropped Table {} due to {}".format(table_name, err))

@@ -83,10 +87,10 @@ class SQLConnectionConfig(ConfigModel):
     @abstractmethod
     def get_connection_url(self):
         url = f"{self.scheme}://"
-        if self.username:
-            url += f"{self.username}"
-        if self.password:
-            url += f":{self.password}"
+        if self.username is not None:
+            url += f"{quote_plus(self.username)}"
+        if self.password is not None:
+            url += f":{quote_plus(self.password)}"
         url += "@"
         url += f"{self.host_port}"
         if self.database:
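Escaping the credentials with quote_plus keeps URL-reserved characters from corrupting the SQLAlchemy DSN. A standalone illustration (host and credentials are made up):

    from urllib.parse import quote_plus

    # "p@ss:w0rd" would previously inject a bogus host separator into the URL.
    username, password = "etl_user", "p@ss:w0rd"
    url = f"postgresql://{quote_plus(username)}:{quote_plus(password)}@db.internal:5432/warehouse"
    print(url)  # postgresql://etl_user:p%40ss%3Aw0rd@db.internal:5432/warehouse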
@@ -170,8 +174,10 @@ def _get_table_description(schema: str, table: str, inspector: Inspector) -> str


 class SQLSource(Source):
-    def __init__(self, config: SQLConnectionConfig, metadata_config: MetadataServerConfig,
-                 ctx: WorkflowContext):
+    def __init__(
+        self, config: SQLConnectionConfig, metadata_config: MetadataServerConfig,
+        ctx: WorkflowContext
+    ):
         super().__init__(ctx)
         self.config = config
         self.metadata_config = metadata_config
@@ -182,8 +188,10 @@ class SQLSource(Source):
         self.engine = create_engine(self.connection_string, **self.sql_config.options)
         self.connection = self.engine.connect()
         if self.config.data_profiler_enabled:
-            self.data_profiler = DataProfiler(status=self.status,
-                                              connection_str=self.connection_string)
+            self.data_profiler = DataProfiler(
+                status=self.status,
+                connection_str=self.connection_string,
+            )

     def prepare(self):
         pass
@@ -223,26 +231,32 @@ class SQLSource(Source):
             if self.config.include_views:
                 yield from self.fetch_views(inspector, schema)

-    def fetch_tables(self,
-                     inspector: Inspector,
-                     schema: str) -> Iterable[OMetaDatabaseAndTable]:
+    def fetch_tables(
+        self,
+        inspector: Inspector,
+        schema: str
+    ) -> Iterable[OMetaDatabaseAndTable]:
         for table_name in inspector.get_table_names(schema):
             try:
                 schema, table_name = self.standardize_schema_table_names(schema, table_name)
                 if not self.sql_config.filter_pattern.included(table_name):
-                    self.status.filter('{}.{}'.format(self.config.get_service_name(), table_name),
-                                       "Table pattern not allowed")
+                    self.status.filter(
+                        '{}.{}'.format(self.config.get_service_name(), table_name),
+                        "Table pattern not allowed"
+                    )
                     continue
                 self.status.scanned('{}.{}'.format(self.config.get_service_name(), table_name))
                 description = _get_table_description(schema, table_name, inspector)
                 table_columns = self._get_columns(schema, table_name, inspector)
-                table_entity = Table(id=uuid.uuid4(),
-                                     name=table_name,
-                                     tableType='Regular',
-                                     description=description if description is not None else ' ',
-                                     columns=table_columns)
+                table_entity = Table(
+                    id=uuid.uuid4(),
+                    name=table_name,
+                    tableType='Regular',
+                    description=description if description is not None else ' ',
+                    columns=table_columns
+                )
                 if self.sql_config.generate_sample_data:
                     table_data = self.fetch_sample_data(schema, table_name)
                     table_entity.sampleData = table_data
@@ -251,41 +265,60 @@ class SQLSource(Source):
                     profile = self.run_data_profiler(table_name, schema)
                     table_entity.tableProfile = profile

-                table_and_db = OMetaDatabaseAndTable(table=table_entity, database=self._get_database(schema))
+                table_and_db = OMetaDatabaseAndTable(
+                    table=table_entity, database=self._get_database(schema)
+                )
                 yield table_and_db
             except ValidationError as err:
                 logger.error(err)
                 self.status.failures.append('{}.{}'.format(self.config.service_name, table_name))
                 continue

-    def fetch_views(self,
-                    inspector: Inspector,
-                    schema: str) -> Iterable[OMetaDatabaseAndTable]:
+    def fetch_views(
+        self,
+        inspector: Inspector,
+        schema: str
+    ) -> Iterable[OMetaDatabaseAndTable]:
         for view_name in inspector.get_view_names(schema):
             try:
+                if self.config.scheme == "bigquery":
+                    schema, view_name = self.standardize_schema_table_names(schema, view_name)
                 if not self.sql_config.filter_pattern.included(view_name):
-                    self.status.filter('{}.{}'.format(self.config.get_service_name(), view_name),
-                                       "View pattern not allowed")
+                    self.status.filter(
+                        '{}.{}'.format(self.config.get_service_name(), view_name),
+                        "View pattern not allowed"
+                    )
                     continue
                 try:
-                    view_definition = inspector.get_view_definition(view_name, schema)
+                    if self.config.scheme == "bigquery":
+                        view_definition = inspector.get_view_definition(
+                            f"{self.config.project_id}.{schema}.{view_name}"
+                        )
+                    else:
+                        view_definition = inspector.get_view_definition(
+                            view_name, schema
+                        )
                     view_definition = "" if view_definition is None else str(view_definition)
                 except NotImplementedError:
                     view_definition = ""
                 description = _get_table_description(schema, view_name, inspector)
                 table_columns = self._get_columns(schema, view_name, inspector)
-                table = Table(id=uuid.uuid4(),
-                              name=view_name,
-                              tableType='View',
-                              description=description if description is not None else ' ',
-                              columns=table_columns,
-                              viewDefinition=view_definition)
+                table = Table(
+                    id=uuid.uuid4(),
+                    name=view_name,
+                    tableType='View',
+                    description=description if description is not None else ' ',
+                    columns=table_columns,
+                    viewDefinition=view_definition
+                )
                 if self.sql_config.generate_sample_data:
                     table_data = self.fetch_sample_data(schema, view_name)
                     table.sampleData = table_data

-                table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema))
+                table_and_db = OMetaDatabaseAndTable(
+                    table=table, database=self._get_database(schema)
+                )
                 yield table_and_db
             except ValidationError as err:
                 logger.error(err)
@@ -293,13 +326,16 @@ class SQLSource(Source):
                 continue

     def _get_database(self, schema: str) -> Database:
-        return Database(name=schema,
-                        service=EntityReference(id=self.service.id, type=self.config.service_type))
+        return Database(
+            name=schema,
+            service=EntityReference(id=self.service.id, type=self.config.service_type)
+        )

     def _get_columns(self, schema: str, table: str, inspector: Inspector) -> List[Column]:
         pk_constraints = inspector.get_pk_constraint(table, schema)
         pk_columns = pk_constraints['column_constraints'] if len(
-            pk_constraints) > 0 and "column_constraints" in pk_constraints.keys() else {}
+            pk_constraints
+        ) > 0 and "column_constraints" in pk_constraints.keys() else {}
         unique_constraints = []
         try:
             unique_constraints = inspector.get_unique_constraints(table, schema)
@@ -329,11 +365,15 @@ class SQLSource(Source):
                 col_constraint = ColumnConstraint.PRIMARY_KEY
             elif column['name'] in unique_columns:
                 col_constraint = ColumnConstraint.UNIQUE
-            table_columns.append(Column(name=column['name'],
-                                        description=column.get("comment", None),
-                                        columnDataType=col_type,
-                                        columnConstraint=col_constraint,
-                                        ordinalPosition=row_order))
+            table_columns.append(
+                Column(
+                    name=column['name'],
+                    description=column.get("comment", None),
+                    columnDataType=col_type,
+                    columnConstraint=col_constraint,
+                    ordinalPosition=row_order
+                )
+            )
             row_order = row_order + 1
         return table_columns
@@ -345,14 +385,20 @@ class SQLSource(Source):
     ) -> TableProfile:
         dataset_name = f"{schema}.{table}"
         self.status.scanned(f"profile of {dataset_name}")
-        logger.info(f"Running Profiling for {dataset_name}. "
-                    f"If you haven't configured offset and limit this process can take longer")
+        logger.info(
+            f"Running Profiling for {dataset_name}. "
+            f"If you haven't configured offset and limit, this process can take longer"
+        )
+        if self.config.scheme == "bigquery":
+            table = dataset_name
         profile = self.data_profiler.run_profiler(
             dataset_name=dataset_name,
             schema=schema,
             table=table,
             limit=self.sql_config.data_profiler_limit,
-            offset=self.sql_config.data_profiler_offset)
+            offset=self.sql_config.data_profiler_offset,
+            project_id=self.config.project_id if self.config.scheme == "bigquery" else None
+        )
         logger.debug(f"Finished profiling {dataset_name}")
         return profile
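The BigQuery branches above all hinge on one naming convention: the dialect reports tables as "dataset.table", and fully qualified names need the project prepended. A standalone sketch of that convention (the project and table names are illustrative, and the helper only mirrors what standardize_schema_table_names in bigquery.py does):

    from typing import Tuple

    def split_bigquery_name(schema: str, table: str) -> Tuple[str, str]:
        # BigQuery's SQLAlchemy dialect returns "dataset.table" names; split
        # them so downstream code sees a clean (schema, table) pair.
        segments = table.split(".")
        if len(segments) != 2:
            raise ValueError(f"unexpected table name format: {table}")
        return segments[0], segments[1]

    project_id = "my-gcp-project"  # illustrative only
    schema, view_name = split_bigquery_name("raw", "raw.daily_events")
    # Fully qualified form handed to get_view_definition() and the profiler:
    print(f"{project_id}.{schema}.{view_name}")  # my-gcp-project.raw.daily_events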