def get_pk_constraint(
    self, connection, table_name, schema=None, **kw
):  # pylint: disable=unused-argument
    """
    Reflect the primary key columns of a BigQuery table.

    Written with a ``self`` parameter so it can be bound as a dialect method
    in place of the default ``get_pk_constraint`` implementation.

    Args:
        connection: SQLAlchemy connection; the query is run through
            ``connection.engine`` and the project id is read from
            ``connection.engine.url.host``.
        table_name: name of the table to inspect.
        schema: dataset name, formatted into the INFORMATION_SCHEMA path.

    Returns:
        ``{"constrained_columns": [...]}`` with one entry per row returned by
        ``BIGQUERY_TABLE_CONSTRAINTS``. The list is empty when the query
        fails; the failure is logged and never raised to the caller.
    """
    try:
        table_constraints = connection.engine.execute(
            BIGQUERY_TABLE_CONSTRAINTS.format(
                project_id=connection.engine.url.host,
                schema_name=schema,
                table_name=table_name,
            )
        )
        # Always a list, so the success and failure paths share one return shape.
        return {
            "constrained_columns": [
                table_constraint.column_name
                for table_constraint in table_constraints
            ]
        }
    except Exception as exc:
        # Best effort: a missing constraint must not abort table ingestion.
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Error while fetching primary key constraint error for table [{schema}.{table_name}]: {exc}"
        )
        return {"constrained_columns": []}


def get_foreign_keys(
    self, connection, table_name, schema=None, **kw
):  # pylint: disable=unused-argument
    """
    Reflect the foreign key constraints of a BigQuery table.

    Written with a ``self`` parameter so it can be bound as a dialect method
    in place of the default ``get_foreign_keys`` implementation.

    Args:
        connection: SQLAlchemy connection; the query is run through
            ``connection.engine`` and the project id is read from
            ``connection.engine.url.host``.
        table_name: name of the table to inspect.
        schema: dataset name, formatted into the INFORMATION_SCHEMA path.

    Returns:
        A list with one dict per constraint name, holding ``name``,
        ``referred_schema``, ``referred_table``, ``constrained_columns`` and
        ``referred_columns``. Rows sharing a constraint name (a multi-column
        foreign key) are merged into a single entry whose column lists keep
        the row order. The list is empty when the query fails; the failure is
        logged and never raised to the caller.
    """
    try:
        table_constraints = connection.engine.execute(
            BIGQUERY_FOREIGN_CONSTRAINTS.format(
                project_id=connection.engine.url.host,
                schema_name=schema,
                table_name=table_name,
            )
        )
        # The query yields one row per column, so group rows by constraint
        # name. Dicts preserve insertion order, which keeps the constraints in
        # the order the query returned them.
        foreign_keys = {}
        for table_constraint in table_constraints:
            foreign_key = foreign_keys.setdefault(
                table_constraint.name,
                {
                    "name": table_constraint.name,
                    "referred_schema": table_constraint.referred_schema,
                    "referred_table": table_constraint.referred_table,
                    "constrained_columns": [],
                    "referred_columns": [],
                },
            )
            foreign_key["constrained_columns"].append(
                table_constraint.constrained_columns
            )
            foreign_key["referred_columns"].append(table_constraint.referred_columns)
        return list(foreign_keys.values())
    except Exception as exc:
        # Best effort: a missing constraint must not abort table ingestion.
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Error while fetching foreign key constraint error for table [{schema}.{table_name}]: {exc}"
        )
        return []
# Primary key lookup for a single table.
# Placeholders ({project_id}, {schema_name}, {table_name}) are filled in with
# str.format by the caller. The LIKE pattern keeps only constraints whose name
# ends with the literal suffix "pk$" ("%" is the only wildcard in the pattern).
BIGQUERY_TABLE_CONSTRAINTS = textwrap.dedent(
    """
    SELECT *
    FROM `{project_id}`.{schema_name}.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE
    WHERE table_name = '{table_name}' AND constraint_name LIKE '%pk$';
    """
)

# Foreign key lookup for a single table: joins CONSTRAINT_COLUMN_USAGE (c) to
# TABLE_CONSTRAINTS (r) on the constraint name and keeps the FOREIGN KEY
# constraints declared on {table_name}. One row is returned per column.
# NOTE(review): `referred_columns` and `constrained_columns` are both aliases
# of c.column_name, so the two values are always identical in a row, and
# `referred_schema` is taken from r (the constraint side) - confirm this is
# intended for foreign keys whose column names or datasets differ.
BIGQUERY_FOREIGN_CONSTRAINTS = textwrap.dedent(
    """
    SELECT
      c.table_name AS referred_table,
      r.table_schema as referred_schema,
      r.constraint_name as name,
      c.column_name as referred_columns,
      c.column_name as constrained_columns
    FROM `{project_id}`.{schema_name}.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE c
    JOIN `{project_id}`.{schema_name}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS r ON c.constraint_name = r.constraint_name
    WHERE r.constraint_type = 'FOREIGN KEY' AND r.table_name='{table_name}';
    """
)
# --- Shared literals used by the fixtures below ------------------------------
_FIXTURE_ID = "c3eb265f-5445-4ad3-ba5e-797d3a3071bb"
_SCHEMA_FQN = "bigquery_source_test.random-project-id.sample_schema"
_CONSOLE_URL = "https://console.cloud.google.com/bigquery?project=random-project-id"


def _table_source_url(table_name):
    """Return the BigQuery console URL fixture for a table of ``sample_schema``."""
    return SourceUrl(
        __root__=f"{_CONSOLE_URL}&ws=!1m5!1m4!4m3!1srandom-project-id!2ssample_schema!3s{table_name}"
    )


def _entity_column(name, data_type, data_type_display, constraint):
    """Build a ``Column`` fixture; only the four arguments vary between columns."""
    return Column(
        name=name,
        displayName=None,
        dataType=data_type,
        arrayDataType=None,
        dataLength=1,
        precision=None,
        scale=None,
        dataTypeDisplay=data_type_display,
        description=None,
        fullyQualifiedName=None,
        tags=None,
        constraint=constraint,
        ordinalPosition=None,
        jsonSchema=None,
        children=None,
        profile=None,
        customMetrics=None,
    )


def _customers_columns():
    """Fresh ``Column`` fixtures for the ``customers`` table."""
    return [
        _entity_column("customer_id", "INT", "INTEGER", "PRIMARY_KEY"),
        _entity_column("first_name", "STRING", "VARCHAR", "NULL"),
        _entity_column("last_name", "STRING", "VARCHAR", "NULL"),
    ]


def _orders_columns():
    """Fresh ``Column`` fixtures for the ``orders`` table."""
    return [
        _entity_column("order_id", "INT", "INTEGER", "NULL"),
        _entity_column("customer_id", "INT", "INTEGER", "NULL"),
        _entity_column("status", "STRING", "VARCHAR", "NULL"),
    ]


def _reflected_column(name, sa_type, system_data_type):
    """Build one column dict in the shape the mocked inspector returns."""
    return {
        "name": name,
        "type": sa_type,
        "nullable": True,
        "comment": None,
        "default": None,
        "precision": None,
        "scale": None,
        "max_length": None,
        "system_data_type": system_data_type,
        "is_complex": False,
        "policy_tags": None,
    }


def _orders_foreign_key():
    """Fresh ``TableConstraint`` fixture: orders.customer_id -> customers.customer_id."""
    return TableConstraint(
        constraintType="FOREIGN_KEY",
        columns=["customer_id"],
        referredColumns=[
            FullyQualifiedEntityName(__root__=f"{_SCHEMA_FQN}.customers.customer_id")
        ],
    )


def _create_table_request(table_name, columns, table_constraints):
    """Build the ``CreateTableRequest`` expected for a table of ``sample_schema``."""
    return CreateTableRequest(
        name=EntityName(__root__=table_name),
        displayName=None,
        description=None,
        tableType="Regular",
        columns=columns,
        tableConstraints=table_constraints,
        tablePartition=None,
        tableProfilerConfig=None,
        owner=None,
        databaseSchema=FullyQualifiedEntityName(__root__=_SCHEMA_FQN),
        tags=[],
        viewDefinition=None,
        retentionPeriod=None,
        extension=None,
        sourceUrl=_table_source_url(table_name),
        domain=None,
        dataProducts=None,
        fileFormat=None,
        lifeCycle=None,
        sourceHash=None,
    )


# --- Entities placed in the source context ------------------------------------
MOCK_DATABASE_SERVICE = DatabaseService(
    id=_FIXTURE_ID,
    name="bigquery_source_test",
    connection=DatabaseConnection(),
    serviceType=DatabaseServiceType.Hive,
)

MOCK_DATABASE_SCHEMA = DatabaseSchema(
    id=_FIXTURE_ID,
    name="sample_schema",
    fullyQualifiedName=_SCHEMA_FQN,
    service=EntityReference(id=_FIXTURE_ID, type="database"),
    database=EntityReference(
        id="a58b1856-729c-493b-bc87-6d2269b43ec0",
        type="database",
    ),
)

MOCK_TABLE = Table(
    id=_FIXTURE_ID,
    name=EntityName(__root__="customers"),
    displayName=None,
    description=None,
    tableType="Regular",
    columns=_customers_columns(),
    tableConstraints=[],
    tablePartition=None,
    tableProfilerConfig=None,
    owner=None,
    databaseSchema=EntityReference(id=_FIXTURE_ID, type="databaseSchema"),
    tags=[],
    viewDefinition=None,
    retentionPeriod=None,
    extension=None,
    sourceUrl=_table_source_url("customers"),
    domain=None,
    dataProducts=None,
    fileFormat=None,
    lifeCycle=None,
    sourceHash=None,
)

# --- Expected requests for database and schema --------------------------------
EXPECTED_DATABASE = [
    CreateDatabaseRequest(
        name=EntityName(__root__="random-project-id"),
        displayName=None,
        description=None,
        tags=[],
        owner=None,
        service=FullyQualifiedEntityName(__root__="bigquery_source_test"),
        dataProducts=None,
        default=False,
        retentionPeriod=None,
        extension=None,
        sourceUrl=SourceUrl(__root__=_CONSOLE_URL),
        domain=None,
        lifeCycle=None,
        sourceHash=None,
    )
]

# The misspelled name is kept on purpose: other code refers to it.
EXPTECTED_DATABASE_SCHEMA = [
    CreateDatabaseSchemaRequest(
        name=EntityName(__root__="sample_schema"),
        displayName=None,
        description="",
        owner=None,
        database=FullyQualifiedEntityName(
            __root__="bigquery_source_test.random-project-id"
        ),
        dataProducts=None,
        tags=None,
        retentionPeriod=None,
        extension=None,
        sourceUrl=SourceUrl(
            __root__=f"{_CONSOLE_URL}&ws=!1m4!1m3!3m2!1srandom-project-id!2ssample_schema"
        ),
        domain=None,
        lifeCycle=None,
        sourceHash=None,
    )
]

# --- Mocked inspector output, indexed like MOCK_TABLE_NAMES -------------------
MOCK_TABLE_NAMES = [("customers", "Regular"), ("orders", "Regular")]

MOCK_COLUMN_DATA = [
    [
        _reflected_column("customer_id", Integer(), "INTEGER"),
        _reflected_column("first_name", String(), "VARCHAR"),
        _reflected_column("last_name", String(), "VARCHAR"),
    ],
    [
        _reflected_column("order_id", Integer(), "INTEGER"),
        _reflected_column("customer_id", Integer(), "INTEGER"),
        _reflected_column("status", String(), "VARCHAR"),
    ],
]

MOCK_PK_CONSTRAINT: Dict[str, Dict] = {
    "customers": {"constrained_columns": ("customer_id",)},
    "orders": {"constrained_columns": ()},
}

MOCK_FK_CONSTRAINT = {
    "customers": [],
    "orders": [
        {
            "name": "orders.fk$1",
            "referred_schema": "demo_dbt_jaffle",
            "referred_table": "customers",
            "constrained_columns": ["customer_id"],
            "referred_columns": ["customer_id"],
        }
    ],
}

# --- Expected table requests, indexed like MOCK_TABLE_NAMES -------------------
EXPECTED_TABLE = [
    [_create_table_request("customers", _customers_columns(), [])],
    [_create_table_request("orders", _orders_columns(), [_orders_foreign_key()])],
]


MOCK_TABLE_CONSTRAINT = [
    [],
    [_orders_foreign_key()],
]
# Test methods of ``BigqueryUnitTest``: each takes ``self`` and relies on the
# ``self.bq_source`` instance prepared by the test case constructor.
@patch(
    "metadata.ingestion.source.database.database_service.DatabaseServiceSource.get_database_tag_labels"
)
def test_yield_database(self, get_database_tag_labels):
    """
    yield_database must emit exactly the request stored in EXPECTED_DATABASE
    when no tag labels are returned for the database.
    """
    get_database_tag_labels.return_value = []
    self.assertEqual(
        EXPECTED_DATABASE,
        [either.right for either in self.bq_source.yield_database(MOCK_DB_NAME)],
    )


def test_yield_database_schema(self):
    """
    yield_database_schema must emit exactly the request stored in
    EXPTECTED_DATABASE_SCHEMA for the mocked schema name.
    """
    self.assertEqual(
        EXPTECTED_DATABASE_SCHEMA,
        [
            either.right
            for either in self.bq_source.yield_database_schema(
                schema_name=MOCK_DATABASE_SCHEMA.name.__root__
            )
        ],
    )


@patch(
    "metadata.ingestion.source.database.bigquery.metadata.BigquerySource.get_tag_labels"
)
@patch(
    "metadata.ingestion.source.database.bigquery.metadata.BigquerySource.get_table_partition_details"
)
@patch(
    "metadata.ingestion.source.database.common_db_source.CommonDbSourceService._get_foreign_constraints"
)
def test_get_columns_with_constraints(
    self, _get_foreign_constraints, get_table_partition_details, get_tag_labels
):
    """
    Test different constraint type ingested as expected

    For every entry of MOCK_TABLE_NAMES the inspector is stubbed with the
    matching primary key, foreign key and column fixtures, and the requests
    produced by yield_table are compared with EXPECTED_TABLE.
    """
    get_tag_labels.return_value = []
    get_table_partition_details.return_value = (False, None)
    self.bq_source.context.__dict__["database"] = MOCK_DB_NAME
    self.bq_source.context.__dict__[
        "database_schema"
    ] = MOCK_DATABASE_SCHEMA.name.__root__

    for index, table in enumerate(MOCK_TABLE_NAMES):
        # subTest reports every failing table instead of stopping at the first.
        with self.subTest(table=table[0]):
            _get_foreign_constraints.return_value = MOCK_TABLE_CONSTRAINT[index]
            # Fixture values are bound as lambda defaults so each stub keeps
            # the data of its own iteration rather than the loop variables.
            self.bq_source.inspector.get_pk_constraint = (
                lambda table_name, schema, _pk=MOCK_PK_CONSTRAINT[table[0]]: _pk
            )
            self.bq_source.inspector.get_foreign_keys = (
                lambda table_name, schema, _fks=MOCK_FK_CONSTRAINT[table[0]]: _fks
            )
            self.bq_source.inspector.get_columns = (
                lambda table_name, schema, db_name, _cols=MOCK_COLUMN_DATA[
                    index
                ]: _cols
            )
            self.assertEqual(
                EXPECTED_TABLE[index],
                [either.right for either in self.bq_source.yield_table(table)],
            )