# OpenMetadata/ingestion/tests/unit/test_pgspider_lineage_unit.py
# 2025-04-03 10:39:47 +05:30
#
# 815 lines
# 28 KiB
# Python

# Copyright 2021 Collate
# Copyright(c) 2023, TOSHIBA CORPORATION
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Unit tests for PGSpider multi-tenant table lineage extraction.
"""
import json
from pathlib import Path
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.database.postgres.lineage import PostgresLineageSource
from metadata.ingestion.source.database.postgres.pgspider.lineage import (
get_lineage_from_multi_tenant_table,
)
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()

# JSON fixtures fed to the mocked ``_get_multi_tenant_tables`` and
# ``_get_child_tables`` helpers in the tests below.
mock_multi_tenant_file_path = Path(__file__).parent.joinpath(
    "resources", "datasets", "pgspider_multi_tenant_tables.json"
)
# NOTE(review): annotated as ``dict``, but the tests also feed ``[]`` to the
# same mock, so the fixture may really be a list -- confirm against the JSON.
mock_multi_tenant_data: dict = json.loads(
    mock_multi_tenant_file_path.read_text(encoding="utf-8")
)

mock_child_file_path = Path(__file__).parent.joinpath(
    "resources", "datasets", "pgspider_child_tables.json"
)
mock_child_data = json.loads(mock_child_file_path.read_text(encoding="utf-8"))
# Expected lineage when every non-``__spd_url`` column of each multi-tenant
# table also exists on its child foreign tables: one edge per child table,
# with a column-level lineage entry for each shared column.
EXPECTED_PGSPIDER_DETAILS_1 = [
    AddLineageRequest(
        edge=EntitiesEdge(
            fromEntity=EntityReference(id=child_id, type="table"),
            toEntity=EntityReference(id=parent_id, type="table"),
            lineageDetails=LineageDetails(
                columnsLineage=[
                    ColumnLineage(
                        fromColumns=[
                            f"local_pgspider1.pgspider.public.{child_name}.{column}"
                        ],
                        toColumn=f"local_pgspider1.pgspider.public.{parent_name}.{column}",
                    )
                    for column in shared_columns
                ]
            ),
        ),
    )
    for child_id, child_name, parent_id, parent_name, shared_columns in (
        (
            "e3e1649a-97f4-4849-bc02-d8d67eab9722",
            "test1__post_svr__0",
            "b3f7df8e-50de-4555-a497-c7e170f4de8e",
            "test1",
            ("id",),
        ),
        (
            "02f020df-ef8c-4156-9d02-a2ff40b9649b",
            "test1__post_svr__1",
            "b3f7df8e-50de-4555-a497-c7e170f4de8e",
            "test1",
            ("id",),
        ),
        (
            "57ba2523-5424-467f-992a-afe29dc7e23d",
            "test2__post_svr__0",
            "a68492cc-af89-4031-8b8e-bc31f2cedcd5",
            "test2",
            ("a", "b", "c"),
        ),
    )
]
# Expected lineage when only some columns are shared between a multi-tenant
# table and its child foreign tables: the edges are unchanged, but only the
# shared columns (``test1.id``, ``test2.a`` and ``test2.b``) get column lineage.
EXPECTED_PGSPIDER_DETAILS_2 = [
    AddLineageRequest(
        edge=EntitiesEdge(
            fromEntity=EntityReference(id=child_id, type="table"),
            toEntity=EntityReference(id=parent_id, type="table"),
            lineageDetails=LineageDetails(
                columnsLineage=[
                    ColumnLineage(
                        fromColumns=[
                            f"local_pgspider1.pgspider.public.{child_name}.{column}"
                        ],
                        toColumn=f"local_pgspider1.pgspider.public.{parent_name}.{column}",
                    )
                    for column in shared_columns
                ]
            ),
        ),
    )
    for child_id, child_name, parent_id, parent_name, shared_columns in (
        (
            "e3e1649a-97f4-4849-bc02-d8d67eab9722",
            "test1__post_svr__0",
            "b3f7df8e-50de-4555-a497-c7e170f4de8e",
            "test1",
            ("id",),
        ),
        (
            "02f020df-ef8c-4156-9d02-a2ff40b9649b",
            "test1__post_svr__1",
            "b3f7df8e-50de-4555-a497-c7e170f4de8e",
            "test1",
            ("id",),
        ),
        (
            "57ba2523-5424-467f-992a-afe29dc7e23d",
            "test2__post_svr__0",
            "a68492cc-af89-4031-8b8e-bc31f2cedcd5",
            "test2",
            ("a", "b"),
        ),
    )
]
# Expected lineage when no column is shared: table-level edges are still
# produced, each with an empty column lineage. The explicit ``None`` kwargs
# are kept so the expected models are built exactly as before.
EXPECTED_PGSPIDER_DETAILS_3 = [
    AddLineageRequest(
        edge=EntitiesEdge(
            fromEntity=EntityReference(id=child_id, type="table"),
            toEntity=EntityReference(id=parent_id, type="table"),
            lineageDetails=LineageDetails(
                sqlQuery=None, columnsLineage=[], pipeline=None
            ),
        ),
    )
    for child_id, parent_id in (
        # test1__post_svr__0 -> test1
        (
            "e3e1649a-97f4-4849-bc02-d8d67eab9722",
            "b3f7df8e-50de-4555-a497-c7e170f4de8e",
        ),
        # test1__post_svr__1 -> test1
        (
            "02f020df-ef8c-4156-9d02-a2ff40b9649b",
            "b3f7df8e-50de-4555-a497-c7e170f4de8e",
        ),
        # test2__post_svr__0 -> test2
        (
            "57ba2523-5424-467f-992a-afe29dc7e23d",
            "a68492cc-af89-4031-8b8e-bc31f2cedcd5",
        ),
    )
]
# Workflow configuration for a PGSpider lineage source; the tests validate it
# with ``OpenMetadataWorkflowConfig`` and build a ``PostgresLineageSource``.
mock_pgspider_config = {
    "source": {
        "type": "pgspider-lineage",
        "serviceName": "local_pgspider1",
        "serviceConnection": {
            "config": {
                "type": "Postgres",
                # The pgspider scheme selects PGSpider handling on a Postgres connection.
                "scheme": "pgspider+psycopg2",
                "username": "openmetadata_user",
                "hostPort": "localhost:4813",
                "database": "pgspider",
            }
        },
        "sourceConfig": {
            "config": {
                "type": "DatabaseLineage",
                "queryLogDuration": 1,
                "resultLimit": 10000,
            }
        },
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {
                # Test-only token, stored in fragments to keep lines short.
                "jwtToken": "".join(
                    (
                        "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc",
                        "iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE",
                        "2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB",
                        "iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN",
                        "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u",
                        "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg",
                    )
                )
            },
        }
    },
}
# Mocked ``search_table_entities`` results for the "all columns match" case.
# Used as a ``side_effect``, so each inner single-table list is returned for
# one successive call: test1, test1__post_svr__0, test1__post_svr__1, test2,
# test2__post_svr__0. Column FQNs follow ``<service>.<db>.<schema>.<table>.<column>``.
table_entities_1 = [
    [
        Table(
            id=table_id,
            name=table_name,
            columns=[
                Column(
                    name=column_name,
                    dataType=data_type,
                    fullyQualifiedName=(
                        f"local_pgspider1.pgspider.public.{table_name}.{column_name}"
                    ),
                )
                for column_name, data_type in table_columns
            ],
        )
    ]
    for table_id, table_name, table_columns in (
        (
            "b3f7df8e-50de-4555-a497-c7e170f4de8e",
            "test1",
            (("id", DataType.INT), ("__spd_url", DataType.TEXT)),
        ),
        (
            "e3e1649a-97f4-4849-bc02-d8d67eab9722",
            "test1__post_svr__0",
            (("id", DataType.INT),),
        ),
        (
            "02f020df-ef8c-4156-9d02-a2ff40b9649b",
            "test1__post_svr__1",
            (("id", DataType.INT),),
        ),
        (
            "a68492cc-af89-4031-8b8e-bc31f2cedcd5",
            "test2",
            (
                ("a", DataType.INT),
                ("b", DataType.TEXT),
                ("c", DataType.INT),
                ("__spd_url", DataType.TEXT),
            ),
        ),
        (
            "57ba2523-5424-467f-992a-afe29dc7e23d",
            "test2__post_svr__0",
            (("a", DataType.INT), ("b", DataType.TEXT), ("c", DataType.INT)),
        ),
    )
]
# Mocked ``search_table_entities`` results for the "some columns match" case.
# Used as a ``side_effect``, so each inner single-table list is returned for
# one successive call: test1, test1__post_svr__0, test1__post_svr__1, test2,
# test2__post_svr__0. Child tables carry extra columns (a, b, d) that have no
# counterpart on their multi-tenant table.
#
# Column FQNs are derived from the owning table's name. Previously column ``b``
# of ``test1__post_svr__1`` was hand-written with the FQN of
# ``test1__post_svr__0.b`` (a copy-paste slip). Deriving FQNs from the table
# name keeps every FQN consistent with its table.
table_entities_2 = [
    [
        Table(
            id=table_id,
            name=table_name,
            columns=[
                Column(
                    name=column_name,
                    dataType=data_type,
                    fullyQualifiedName=(
                        f"local_pgspider1.pgspider.public.{table_name}.{column_name}"
                    ),
                )
                for column_name, data_type in table_columns
            ],
        )
    ]
    for table_id, table_name, table_columns in (
        (
            "b3f7df8e-50de-4555-a497-c7e170f4de8e",
            "test1",
            (("id", DataType.INT), ("__spd_url", DataType.TEXT)),
        ),
        (
            "e3e1649a-97f4-4849-bc02-d8d67eab9722",
            "test1__post_svr__0",
            (("id", DataType.INT), ("a", DataType.INT)),
        ),
        (
            "02f020df-ef8c-4156-9d02-a2ff40b9649b",
            "test1__post_svr__1",
            (("id", DataType.INT), ("b", DataType.INT)),
        ),
        (
            "a68492cc-af89-4031-8b8e-bc31f2cedcd5",
            "test2",
            (
                ("a", DataType.INT),
                ("b", DataType.TEXT),
                ("c", DataType.INT),
                ("__spd_url", DataType.TEXT),
            ),
        ),
        (
            "57ba2523-5424-467f-992a-afe29dc7e23d",
            "test2__post_svr__0",
            (("a", DataType.INT), ("b", DataType.TEXT), ("d", DataType.INT)),
        ),
    )
]
# Mocked ``search_table_entities`` results for the "no column matches" case.
# Used as a ``side_effect``, so each inner single-table list is returned for
# one successive call: test1, test1__post_svr__0, test1__post_svr__1, test2,
# test2__post_svr__0. No child column name appears on its multi-tenant table.
table_entities_3 = [
    [
        Table(
            id=table_id,
            name=table_name,
            columns=[
                Column(
                    name=column_name,
                    dataType=data_type,
                    fullyQualifiedName=(
                        f"local_pgspider1.pgspider.public.{table_name}.{column_name}"
                    ),
                )
                for column_name, data_type in table_columns
            ],
        )
    ]
    for table_id, table_name, table_columns in (
        (
            "b3f7df8e-50de-4555-a497-c7e170f4de8e",
            "test1",
            (("id", DataType.INT), ("__spd_url", DataType.TEXT)),
        ),
        (
            "e3e1649a-97f4-4849-bc02-d8d67eab9722",
            "test1__post_svr__0",
            (("a", DataType.INT),),
        ),
        (
            "02f020df-ef8c-4156-9d02-a2ff40b9649b",
            "test1__post_svr__1",
            (("a", DataType.INT),),
        ),
        (
            "a68492cc-af89-4031-8b8e-bc31f2cedcd5",
            "test2",
            (
                ("a", DataType.INT),
                ("b", DataType.TEXT),
                ("c", DataType.INT),
                ("__spd_url", DataType.TEXT),
            ),
        ),
        (
            "57ba2523-5424-467f-992a-afe29dc7e23d",
            "test2__post_svr__0",
            (("d", DataType.INT), ("e", DataType.TEXT), ("f", DataType.INT)),
        ),
    )
]
class PGSpiderLineageUnitTests(TestCase):
    """
    Tests for ``get_lineage_from_multi_tenant_table``.

    Each test patches the PGSpider catalog helpers (``_get_multi_tenant_tables``,
    ``_get_child_tables``) and ``search_table_entities``. The extraction then
    runs against fixture data, and the produced lineage requests are checked.
    """

    _LINEAGE_MODULE = "metadata.ingestion.source.database.postgres.pgspider.lineage"
    _GET_MULTI_TENANT_TABLES = f"{_LINEAGE_MODULE}._get_multi_tenant_tables"
    _GET_CHILD_TABLES = f"{_LINEAGE_MODULE}._get_child_tables"
    _SEARCH_TABLE_ENTITIES = f"{_LINEAGE_MODULE}.search_table_entities"

    def __init__(self, methodName: str = "runTest") -> None:
        """Build the ``PostgresLineageSource`` under test from ``mock_pgspider_config``."""
        super().__init__(methodName)
        config = OpenMetadataWorkflowConfig.model_validate(mock_pgspider_config)
        # Patch test_connection so creating the source does not try to reach
        # the PGSpider server.
        with patch(
            "metadata.ingestion.source.database.postgres.lineage.PostgresLineageSource.test_connection"
        ):
            self.postgres = PostgresLineageSource.create(
                mock_pgspider_config["source"],
                config.workflowConfig.openMetadataServerConfig,
            )

    def _collect_lineage_requests(
        self,
        multi_tenant_tables: dict,
        child_tables: dict,
        source_entities: dict,
    ) -> list:
        """
        Run the lineage extraction with patched helpers and collect its requests.

        Each argument is a dict of keyword arguments forwarded to ``patch`` to
        configure the replacement mock, e.g. ``{"side_effect": [...]}`` or
        ``{"return_value": []}``. An empty dict leaves the mock unconfigured.

        Returns the ``AddLineageRequest`` objects produced, in order.
        """
        with patch(self._GET_MULTI_TENANT_TABLES, **multi_tenant_tables), patch(
            self._SEARCH_TABLE_ENTITIES, **source_entities
        ), patch(self._GET_CHILD_TABLES, **child_tables):
            requests = []
            # The generator must be consumed inside the ``with`` so the
            # patches are active while it runs.
            for record in get_lineage_from_multi_tenant_table(
                self.postgres.metadata,
                connection=self.postgres.service_connection,
                service_name=self.postgres.config.serviceName,
            ):
                # NOTE(review): if records arrive wrapped in an ``Either``
                # (payload on ``.right``), unwrap them so they are not silently
                # skipped. Bare ``AddLineageRequest`` objects have no ``right``
                # attribute and pass through unchanged.
                request = getattr(record, "right", record)
                if isinstance(request, AddLineageRequest):
                    requests.append(request)
        return requests

    def _assert_lineage_requests(self, expected: list, actual: list) -> None:
        """Assert ``actual`` matches ``expected`` in length and element by element."""
        # Check the count first: comparing through zip() alone truncates to the
        # shorter list and would pass even if no request were produced.
        self.assertEqual(len(expected), len(actual))
        for index, (want, got) in enumerate(zip(expected, actual)):
            with self.subTest(request=index):
                self.assertEqual(want, got)

    def test_next_record_1(self):
        """
        Normal case: multi-tenant tables and child foreign tables exist, and
        every column of each multi-tenant table (except ``__spd_url``) matches a
        column of its child foreign tables.
        """
        requests = self._collect_lineage_requests(
            multi_tenant_tables={"return_value": mock_multi_tenant_data},
            child_tables={"side_effect": mock_child_data},
            source_entities={"side_effect": table_entities_1},
        )
        self._assert_lineage_requests(EXPECTED_PGSPIDER_DETAILS_1, requests)

    def test_next_record_2(self):
        """
        Normal case: multi-tenant tables and child foreign tables exist, but
        only some (not all) of their columns match.
        """
        requests = self._collect_lineage_requests(
            multi_tenant_tables={"return_value": mock_multi_tenant_data},
            child_tables={"side_effect": mock_child_data},
            source_entities={"side_effect": table_entities_2},
        )
        self._assert_lineage_requests(EXPECTED_PGSPIDER_DETAILS_2, requests)

    def test_next_record_3(self):
        """
        Normal case: multi-tenant tables and child foreign tables exist, but
        none of their columns match. Table-level edges are expected with empty
        column lineage.
        """
        requests = self._collect_lineage_requests(
            multi_tenant_tables={"return_value": mock_multi_tenant_data},
            child_tables={"side_effect": mock_child_data},
            source_entities={"side_effect": table_entities_3},
        )
        self._assert_lineage_requests(EXPECTED_PGSPIDER_DETAILS_3, requests)

    def test_next_record_4(self):
        """Abnormal case: there is no multi-tenant table, so no lineage is produced."""
        requests = self._collect_lineage_requests(
            multi_tenant_tables={"return_value": []},
            child_tables={"return_value": mock_child_data},
            source_entities={"return_value": []},
        )
        self.assertEqual([], requests)

    def test_next_record_5(self):
        """
        Abnormal case: multi-tenant tables exist, but none of them has a
        corresponding child foreign table, so no lineage is produced.
        """
        requests = self._collect_lineage_requests(
            multi_tenant_tables={"return_value": mock_multi_tenant_data},
            child_tables={"return_value": []},
            # Patched (so no real lookup happens) but left unconfigured.
            source_entities={},
        )
        self.assertEqual([], requests)

    def test_next_record_6(self):
        """
        Abnormal case: multi-tenant tables and child foreign tables exist in the
        remote PGSpider, but none of them has been ingested into OpenMetadata,
        so no lineage is produced.
        """
        requests = self._collect_lineage_requests(
            multi_tenant_tables={"return_value": mock_multi_tenant_data},
            child_tables={"side_effect": mock_child_data},
            source_entities={"return_value": []},
        )
        self.assertEqual([], requests)