#14930 bigquery support for pk, fk and column view description (#15042)

This commit is contained in:
NiharDoshi99 2024-02-07 16:49:27 +05:30 committed by GitHub
parent cb59514978
commit 2b56e34b19
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 671 additions and 11 deletions

View File

@ -12,7 +12,7 @@
"""
Source connection helper
"""
import traceback
from typing import Any
from pydantic import BaseModel
@ -26,7 +26,14 @@ from metadata.generated.schema.security.credentials.gcpValues import (
SingleProjectId,
)
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.bigquery.queries import (
BIGQUERY_FOREIGN_CONSTRAINTS,
BIGQUERY_TABLE_CONSTRAINTS,
)
from metadata.utils.bigquery_utils import get_bigquery_client
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class InspectorWrapper(BaseModel):
@ -64,3 +71,63 @@ def get_inspector_details(
inspector = inspect(engine)
return InspectorWrapper(client=client, engine=engine, inspector=inspector)
def get_pk_constraint(
    self, connection, table_name, schema=None, **kw
):  # pylint: disable=unused-argument
    """
    Dialect override that looks up the primary key columns of a table.

    The BIGQUERY_TABLE_CONSTRAINTS template is filled with the engine's
    project (url host), the schema and the table name, and every returned
    row contributes its ``column_name``.

    Returns a dict with a single "constrained_columns" entry: a tuple of
    column names on success, or an empty list when the lookup fails (the
    failure is logged, never raised).
    """
    try:
        query = BIGQUERY_TABLE_CONSTRAINTS.format(
            project_id=connection.engine.url.host,
            schema_name=schema,
            table_name=table_name,
        )
        rows = connection.engine.execute(query)
        pk_columns = tuple(row.column_name for row in rows)
        return {"constrained_columns": pk_columns}
    except Exception as exc:
        # Best effort: a failed lookup must not abort the ingestion
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Error while fetching primary key constraint error for table [{schema}.{table_name}]: {exc}"
        )
    return {"constrained_columns": []}
def get_foreign_keys(
    self, connection, table_name, schema=None, **kw
):  # pylint: disable=unused-argument
    """
    Dialect override that looks up the foreign key constraints of a table.

    The BIGQUERY_FOREIGN_CONSTRAINTS template is filled with the engine's
    project (url host), the schema and the table name. The query yields one
    row per constraint column, so rows sharing a constraint name are merged
    into a single entry; a multi-column foreign key is therefore reported
    once with all its columns instead of once per column.

    Returns a list of dicts with the keys "name", "referred_schema",
    "referred_table", "constrained_columns" and "referred_columns" (the last
    two are lists), in the order the constraints were first seen. If the
    lookup fails, the error is logged and an empty list is returned.
    """
    try:
        rows = connection.engine.execute(
            BIGQUERY_FOREIGN_CONSTRAINTS.format(
                project_id=connection.engine.url.host,
                schema_name=schema,
                table_name=table_name,
            )
        )
        # Keyed by constraint name; dicts keep insertion order, so the
        # result order follows the first row seen for each constraint.
        foreign_keys = {}
        for row in rows:
            foreign_key = foreign_keys.setdefault(
                row.name,
                {
                    "name": row.name,
                    "referred_schema": row.referred_schema,
                    "referred_table": row.referred_table,
                    "constrained_columns": [],
                    "referred_columns": [],
                },
            )
            # Both lists are appended together so positions stay paired
            foreign_key["constrained_columns"].append(row.constrained_columns)
            foreign_key["referred_columns"].append(row.referred_columns)
        return list(foreign_keys.values())
    except Exception as exc:
        # Best effort: a failed lookup must not abort the ingestion
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Error while fetching foreign key constraint error for table [{schema}.{table_name}]: {exc}"
        )
        return []

View File

@ -56,7 +56,11 @@ from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_test_connection_fn
from metadata.ingestion.source.database.bigquery.helper import get_inspector_details
from metadata.ingestion.source.database.bigquery.helper import (
get_foreign_keys,
get_inspector_details,
get_pk_constraint,
)
from metadata.ingestion.source.database.bigquery.models import (
STORED_PROC_LANGUAGE_MAP,
BigQueryStoredProcedure,
@ -194,6 +198,8 @@ def _build_formatted_table_id(table):
# Import-time patches of the BigQuery SQLAlchemy dialect: the table-id
# formatter defined in this module replaces the dialect's private one.
BigQueryDialect._build_formatted_table_id = ( # pylint: disable=protected-access
_build_formatted_table_id
)
# Primary/foreign key reflection is replaced by the functions imported from
# metadata.ingestion.source.database.bigquery.helper.
BigQueryDialect.get_pk_constraint = get_pk_constraint
BigQueryDialect.get_foreign_keys = get_foreign_keys
class BigquerySource(

View File

@ -58,6 +58,28 @@ BIGQUERY_TABLE_AND_TYPE = textwrap.dedent(
"""
)
# Query template for the primary key columns of one table. It selects the
# CONSTRAINT_COLUMN_USAGE rows of `{table_name}` whose constraint name ends
# in "pk$". Placeholders ({project_id}, {schema_name}, {table_name}) are
# filled in with str.format, so values are interpolated as plain text.
# NOTE(review): the "pk$" suffix is presumably BigQuery's naming of primary
# key constraints - confirm against the INFORMATION_SCHEMA documentation.
BIGQUERY_TABLE_CONSTRAINTS = textwrap.dedent(
"""
SELECT *
FROM `{project_id}`.{schema_name}.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE
WHERE table_name = '{table_name}' AND constraint_name LIKE '%pk$';
"""
)
# Query template for the foreign keys declared on one table. It joins
# CONSTRAINT_COLUMN_USAGE (c) with TABLE_CONSTRAINTS (r) on the constraint
# name and keeps the 'FOREIGN KEY' constraints of `{table_name}`, returning
# one row per matching column-usage row. Placeholders ({project_id},
# {schema_name}, {table_name}) are filled in with str.format.
# NOTE(review): `referred_columns` and `constrained_columns` are both aliases
# of the same `c.column_name`, so the two always carry the same value -
# confirm this is intended when the column names differ between tables.
BIGQUERY_FOREIGN_CONSTRAINTS = textwrap.dedent(
"""
SELECT
c.table_name AS referred_table,
r.table_schema as referred_schema,
r.constraint_name as name,
c.column_name as referred_columns,
c.column_name as constrained_columns
FROM `{project_id}`.{schema_name}.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE c
JOIN `{project_id}`.{schema_name}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS r ON c.constraint_name = r.constraint_name
WHERE r.constraint_type = 'FOREIGN KEY' AND r.table_name='{table_name}';
"""
)
BIGQUERY_GET_STORED_PROCEDURES = textwrap.dedent(
"""
SELECT

View File

@ -14,13 +14,43 @@ bigquery unit tests
"""
# pylint: disable=line-too-long
import types
from typing import Dict
from unittest import TestCase
from unittest.mock import Mock, patch
from metadata.generated.schema.entity.data.table import TableType
from sqlalchemy import Integer, String
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import (
Column,
Table,
TableConstraint,
TableType,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import (
EntityName,
FullyQualifiedEntityName,
SourceUrl,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.bigquery.lineage import BigqueryLineageSource
from metadata.ingestion.source.database.bigquery.metadata import BigquerySource
@ -31,7 +61,7 @@ mock_bq_config = {
"serviceConnection": {
"config": {"type": "BigQuery", "credentials": {"gcpConfig": {}}}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
"sourceConfig": {"config": {"type": "DatabaseMetadata", "includeTags": False}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
@ -56,8 +86,465 @@ MOCK_SCHEMA_NAME = "test_omd"
# Table name that appears as the last segment of EXPECTED_URL
MOCK_TABLE_NAME = "customer_products"
# Console URL asserted in test_source_url; it embeds the project
# "random-project-id", the schema "test_omd" and the table "customer_products"
EXPECTED_URL = "https://console.cloud.google.com/bigquery?project=random-project-id&ws=!1m5!1m4!4m3!1srandom-project-id!2stest_omd!3scustomer_products"
# Minimal service entity for the BigQuery source under test; the test setup
# visible in this file only reads its name to seed the source context.
MOCK_DATABASE_SERVICE = DatabaseService(
    id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
    name="bigquery_source_test",
    connection=DatabaseConnection(),
    # The fixture describes a BigQuery service, so it must not declare Hive
    serviceType=DatabaseServiceType.BigQuery,
)
# Schema fixture; its name ("sample_schema") is what the tests pass to
# yield_database_schema and store as "database_schema" in the source context.
# NOTE(review): the `service` reference is typed "database" like the
# `database` reference - confirm whether "databaseService" was intended.
MOCK_DATABASE_SCHEMA = DatabaseSchema(
id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
name="sample_schema",
fullyQualifiedName="bigquery_source_test.random-project-id.sample_schema",
service=EntityReference(id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", type="database"),
database=EntityReference(
id="a58b1856-729c-493b-bc87-6d2269b43ec0",
type="database",
),
)
def _mock_table_column(name, data_type, data_type_display, constraint):
    """Build one MOCK_TABLE column; every field not passed in is fixed."""
    return Column(
        name=name,
        displayName=None,
        dataType=data_type,
        arrayDataType=None,
        dataLength=1,
        precision=None,
        scale=None,
        dataTypeDisplay=data_type_display,
        description=None,
        fullyQualifiedName=None,
        tags=None,
        constraint=constraint,
        ordinalPosition=None,
        jsonSchema=None,
        children=None,
        profile=None,
        customMetrics=None,
    )


# Table entity for "customers": customer_id is the primary key, the two name
# columns are nullable strings.
MOCK_TABLE = Table(
    id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
    name=EntityName(__root__="customers"),
    displayName=None,
    description=None,
    tableType="Regular",
    columns=[
        _mock_table_column("customer_id", "INT", "INTEGER", "PRIMARY_KEY"),
        _mock_table_column("first_name", "STRING", "VARCHAR", "NULL"),
        _mock_table_column("last_name", "STRING", "VARCHAR", "NULL"),
    ],
    tableConstraints=[],
    tablePartition=None,
    tableProfilerConfig=None,
    owner=None,
    databaseSchema=EntityReference(
        id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", type="databaseSchema"
    ),
    tags=[],
    viewDefinition=None,
    retentionPeriod=None,
    extension=None,
    sourceUrl=SourceUrl(
        __root__="https://console.cloud.google.com/bigquery?project=random-project-id&ws=!1m5!1m4!4m3!1srandom-project-id!2ssample_schema!3scustomers"
    ),
    domain=None,
    dataProducts=None,
    fileFormat=None,
    lifeCycle=None,
    sourceHash=None,
)
# Requests that test_yield_database expects from yield_database: a single
# database named after the project, attached to "bigquery_source_test".
EXPECTED_DATABASE = [
CreateDatabaseRequest(
name=EntityName(__root__="random-project-id"),
displayName=None,
description=None,
tags=[],
owner=None,
service=FullyQualifiedEntityName(__root__="bigquery_source_test"),
dataProducts=None,
default=False,
retentionPeriod=None,
extension=None,
sourceUrl=SourceUrl(
__root__="https://console.cloud.google.com/bigquery?project=random-project-id"
),
domain=None,
lifeCycle=None,
sourceHash=None,
)
]
# Requests that test_yield_database_schema expects from yield_database_schema
# for "sample_schema".
# NOTE(review): the constant name is misspelled ("EXPTECTED"); it is kept
# because test_yield_database_schema refers to it by this name.
EXPTECTED_DATABASE_SCHEMA = [
CreateDatabaseSchemaRequest(
name=EntityName(__root__="sample_schema"),
displayName=None,
description="",
owner=None,
database=FullyQualifiedEntityName(
__root__="bigquery_source_test.random-project-id"
),
dataProducts=None,
tags=None,
retentionPeriod=None,
extension=None,
sourceUrl=SourceUrl(
__root__="https://console.cloud.google.com/bigquery?project=random-project-id&ws=!1m4!1m3!3m2!1srandom-project-id!2ssample_schema"
),
domain=None,
lifeCycle=None,
sourceHash=None,
)
]
# (table name, table type) pairs fed one by one to yield_table
MOCK_TABLE_NAMES = [("customers", "Regular"), ("orders", "Regular")]
def _mock_column_row(name, sa_type, system_data_type):
    """Build one reflected-column dict; a new type instance is made per row."""
    return {
        "name": name,
        "type": sa_type(),
        "nullable": True,
        "comment": None,
        "default": None,
        "precision": None,
        "scale": None,
        "max_length": None,
        "system_data_type": system_data_type,
        "is_complex": False,
        "policy_tags": None,
    }


# Column rows returned by the stubbed inspector.get_columns; the outer index
# lines up with MOCK_TABLE_NAMES (0 -> customers, 1 -> orders).
MOCK_COLUMN_DATA = [
    [
        _mock_column_row("customer_id", Integer, "INTEGER"),
        _mock_column_row("first_name", String, "VARCHAR"),
        _mock_column_row("last_name", String, "VARCHAR"),
    ],
    [
        _mock_column_row("order_id", Integer, "INTEGER"),
        _mock_column_row("customer_id", Integer, "INTEGER"),
        _mock_column_row("status", String, "VARCHAR"),
    ],
]
# Per-table result of the stubbed inspector.get_pk_constraint
MOCK_PK_CONSTRAINT: Dict[str, Dict] = {
    "customers": {"constrained_columns": ("customer_id",)},
    "orders": {"constrained_columns": ()},
}
# Per-table result of the stubbed inspector.get_foreign_keys, keyed by table
# name. Note that test_get_columns_with_constraints also patches
# _get_foreign_constraints with MOCK_TABLE_CONSTRAINT.
# NOTE(review): referred_schema here is "demo_dbt_jaffle" while the expected
# constraint points at "sample_schema" - presumably harmless because of that
# patch; confirm.
MOCK_FK_CONSTRAINT = {
"customers": [],
"orders": [
{
"name": "orders.fk$1",
"referred_schema": "demo_dbt_jaffle",
"referred_table": "customers",
"constrained_columns": ["customer_id"],
"referred_columns": ["customer_id"],
}
],
}
def _expected_column(name, data_type, data_type_display, constraint):
    """Build one expected column; every field not passed in is fixed."""
    return Column(
        name=name,
        displayName=None,
        dataType=data_type,
        arrayDataType=None,
        dataLength=1,
        precision=None,
        scale=None,
        dataTypeDisplay=data_type_display,
        description=None,
        fullyQualifiedName=None,
        tags=None,
        constraint=constraint,
        ordinalPosition=None,
        jsonSchema=None,
        children=None,
        profile=None,
        customMetrics=None,
    )


def _expected_table_request(name, columns, table_constraints, source_url):
    """Build one expected CreateTableRequest in the sample_schema schema."""
    return CreateTableRequest(
        name=EntityName(__root__=name),
        displayName=None,
        description=None,
        tableType="Regular",
        columns=columns,
        tableConstraints=table_constraints,
        tablePartition=None,
        tableProfilerConfig=None,
        owner=None,
        databaseSchema=FullyQualifiedEntityName(
            __root__="bigquery_source_test.random-project-id.sample_schema"
        ),
        tags=[],
        viewDefinition=None,
        retentionPeriod=None,
        extension=None,
        sourceUrl=SourceUrl(__root__=source_url),
        domain=None,
        dataProducts=None,
        fileFormat=None,
        lifeCycle=None,
        sourceHash=None,
    )


# Expected yield_table output per entry of MOCK_TABLE_NAMES: "customers" with
# a primary key column, then "orders" with a foreign key to customers.
EXPECTED_TABLE = [
    [
        _expected_table_request(
            "customers",
            [
                _expected_column("customer_id", "INT", "INTEGER", "PRIMARY_KEY"),
                _expected_column("first_name", "STRING", "VARCHAR", "NULL"),
                _expected_column("last_name", "STRING", "VARCHAR", "NULL"),
            ],
            [],
            "https://console.cloud.google.com/bigquery?project=random-project-id&ws=!1m5!1m4!4m3!1srandom-project-id!2ssample_schema!3scustomers",
        )
    ],
    [
        _expected_table_request(
            "orders",
            [
                _expected_column("order_id", "INT", "INTEGER", "NULL"),
                _expected_column("customer_id", "INT", "INTEGER", "NULL"),
                _expected_column("status", "STRING", "VARCHAR", "NULL"),
            ],
            [
                TableConstraint(
                    constraintType="FOREIGN_KEY",
                    columns=["customer_id"],
                    referredColumns=[
                        FullyQualifiedEntityName(
                            __root__="bigquery_source_test.random-project-id.sample_schema.customers.customer_id"
                        )
                    ],
                )
            ],
            "https://console.cloud.google.com/bigquery?project=random-project-id&ws=!1m5!1m4!4m3!1srandom-project-id!2ssample_schema!3sorders",
        )
    ],
]
# Return values given to the patched _get_foreign_constraints, indexed like
# MOCK_TABLE_NAMES: no constraint for customers, one foreign key for orders.
MOCK_TABLE_CONSTRAINT = [
[],
[
TableConstraint(
constraintType="FOREIGN_KEY",
columns=["customer_id"],
referredColumns=[
FullyQualifiedEntityName(
__root__="bigquery_source_test.random-project-id.sample_schema.customers.customer_id"
)
],
)
],
]
class BigqueryUnitTest(TestCase):
"""
Implements the necessary methods to extract
Bigquery Unit Test
"""
@patch(
"metadata.ingestion.source.database.bigquery.metadata.BigquerySource._test_connection"
)
@ -71,12 +558,24 @@ class BigqueryUnitTest(TestCase):
super().__init__(methodName)
get_connection.return_value = Mock()
test_connection.return_value = False
set_project_id.return_value = False
set_project_id.return_value = "random-project-id"
self.config = OpenMetadataWorkflowConfig.parse_obj(mock_bq_config)
self.bq_source = BigquerySource.create(
mock_bq_config["source"],
self.config.workflowConfig.openMetadataServerConfig,
self.metadata = OpenMetadata(
OpenMetadataConnection.parse_obj(
mock_bq_config["workflowConfig"]["openMetadataServerConfig"]
)
)
self.bq_source = BigquerySource.create(mock_bq_config["source"], self.metadata)
self.bq_source.context.__dict__[
"database_service"
] = MOCK_DATABASE_SERVICE.name.__root__
self.bq_source.inspector = types.SimpleNamespace()
self.bq_source.inspector.get_pk_constraint = lambda table_name, schema: []
self.bq_source.inspector.get_unique_constraints = (
lambda table_name, schema_name: []
)
self.bq_source.inspector.get_foreign_keys = lambda table_name, schema: []
self.bq_source.inspector.get_columns = lambda table_name, schema, db_name: []
def test_source_url(self):
self.assertEqual(
@ -89,8 +588,74 @@ class BigqueryUnitTest(TestCase):
EXPECTED_URL,
)
@patch(
    "metadata.ingestion.source.database.database_service.DatabaseServiceSource.get_database_tag_labels"
)
def test_yield_database(self, get_database_tag_labels):
    """
    The source must yield the expected create request for the mock project
    """
    get_database_tag_labels.return_value = []
    yielded = [
        result.right for result in self.bq_source.yield_database(MOCK_DB_NAME)
    ]
    assert EXPECTED_DATABASE == yielded
def test_yield_database_schema(self):
    """
    The source must yield the expected create request for the mock schema
    """
    schema_name = MOCK_DATABASE_SCHEMA.name.__root__
    yielded = [
        result.right
        for result in self.bq_source.yield_database_schema(schema_name=schema_name)
    ]
    assert EXPTECTED_DATABASE_SCHEMA == yielded
@patch(
    "metadata.ingestion.source.database.bigquery.metadata.BigquerySource.get_tag_labels"
)
@patch(
    "metadata.ingestion.source.database.bigquery.metadata.BigquerySource.get_table_partition_details"
)
@patch(
    "metadata.ingestion.source.database.common_db_source.CommonDbSourceService._get_foreign_constraints"
)
def test_get_columns_with_constraints(
    self, _get_foreign_constraints, get_table_partition_details, get_tag_labels
):
    """
    Every mock table must be yielded with the expected columns and
    primary key / foreign key constraints
    """
    get_tag_labels.return_value = []
    get_table_partition_details.return_value = (False, None)

    context_values = self.bq_source.context.__dict__
    context_values["database"] = MOCK_DB_NAME
    context_values["database_schema"] = MOCK_DATABASE_SCHEMA.name.__root__

    inspector = self.bq_source.inspector
    # The four fixtures are parallel lists: one entry per mock table
    fixtures = zip(
        MOCK_TABLE_NAMES, MOCK_TABLE_CONSTRAINT, MOCK_COLUMN_DATA, EXPECTED_TABLE
    )
    for table, table_constraints, column_data, expected in fixtures:
        table_key = table[0]
        _get_foreign_constraints.return_value = table_constraints
        # The stubs are called within this iteration, so closing over the
        # loop variables is safe here.
        inspector.get_pk_constraint = (
            lambda table_name, schema: MOCK_PK_CONSTRAINT[
                table_key
            ]  # pylint: disable=cell-var-from-loop
        )
        inspector.get_foreign_keys = (
            lambda table_name, schema: MOCK_FK_CONSTRAINT[
                table_key
            ]  # pylint: disable=cell-var-from-loop
        )
        inspector.get_columns = (
            lambda table_name, schema, db_name: column_data  # pylint: disable=cell-var-from-loop
        )
        yielded = [result.right for result in self.bq_source.yield_table(table)]
        assert expected == yielded
class BigqueryLineageSourceTest(TestCase):
"""
Implements the necessary methods to extract
Bigquery Lineage Test
"""
@patch("metadata.ingestion.source.database.bigquery.connection.get_connection")
@patch("metadata.ingestion.source.database.bigquery.connection.test_connection")
@patch(
@ -99,9 +664,9 @@ class BigqueryLineageSourceTest(TestCase):
def __init__(
self,
methodName,
set_project_id_lineage,
test_connection,
get_connection,
set_project_id_lineage, # pylint: disable=unused-argument
test_connection, # pylint: disable=unused-argument
get_connection, # pylint: disable=unused-argument
) -> None:
super().__init__(methodName)