Fixes 11924 : Add pgspider support (#12159)

* Add PGSpider service connector

* add sqlalchemy-pgspider to setup.py

* Delete unnecessary code and move files.

* Change to work as a Postgres connector sub-module

* Rebuild PGSpiderLineage class into a set of funcs

---------

Co-authored-by: Pham Ngoc Son <son1.phamngoc@toshiba.co.jp>
This commit is contained in:
Megumi AIKAWA 2023-07-05 16:18:59 +09:00 committed by GitHub
parent e9ff1e0f05
commit 9aede5e821
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1028 additions and 1 deletions

View File

@ -204,6 +204,7 @@ plugins: Dict[str, Set[str]] = {
"nifi": {}, # uses requests
"okta": {"okta~=2.3"},
"oracle": {"cx_Oracle>=8.3.0,<9", "oracledb~=1.2"},
"pgspider": {"psycopg2-binary", "sqlalchemy-pgspider"},
"pinotdb": {"pinotdb~=0.3"},
"postgres": {
VERSIONS["pymysql"],

View File

@ -14,13 +14,22 @@ Postgres lineage module
from typing import Iterable
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
PostgresScheme,
)
from metadata.ingestion.lineage.models import Dialect
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.postgres.pgspider.lineage import (
get_lineage_from_multi_tenant_table,
)
from metadata.ingestion.source.database.postgres.queries import POSTGRES_SQL_STATEMENT
from metadata.ingestion.source.database.postgres.query_parser import (
PostgresQueryParserSource,
)
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class PostgresLineageSource(PostgresQueryParserSource, LineageSource):
@ -58,3 +67,13 @@ class PostgresLineageSource(PostgresQueryParserSource, LineageSource):
for lineage_request in lineages or []:
yield lineage_request
if self.service_connection.scheme == PostgresScheme.pgspider_psycopg2:
lineages = get_lineage_from_multi_tenant_table(
self.metadata,
connection=self.service_connection,
service_name=self.config.serviceName,
)
for lineage_request in lineages or []:
yield lineage_request

View File

@ -0,0 +1,118 @@
# Copyright 2021 Collate
# Portions Copyright(c) 2023, TOSHIBA CORPORATION
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
PGSpider lineage module
"""
from typing import Iterable, Iterator, Optional
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.lineage.sql_lineage import search_table_entities
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.postgres.pgspider.queries import (
PGSPIDER_GET_CHILD_TABLES,
PGSPIDER_GET_MULTI_TENANT_TABLES,
)
def _get_multi_tenant_tables(connection) -> Iterable[any]:
"""
Get list of multi tenant tables from PGSpider
"""
sql = PGSPIDER_GET_MULTI_TENANT_TABLES
with get_connection(connection).connect() as conn:
rows = conn.execute(sql)
return rows
def _get_child_tables(connection, multi_tenant_table: str) -> Iterable[any]:
"""
Get list of child foreign tables of a multi-tenant table
"""
sql = PGSPIDER_GET_CHILD_TABLES.format(multi_tenant_table=multi_tenant_table)
with get_connection(connection).connect() as conn:
rows = conn.execute(sql)
return rows
# For column level lineage, find all pairs of columns which have
# the same name and create LineageDetails.
def _get_column_lineages(source_entity, target_entity):
column_lineages = []
for source_column in source_entity.columns:
for target_column in target_entity.columns:
if source_column.name == target_column.name:
column_lineages.append(
ColumnLineage(
fromColumns=[source_column.fullyQualifiedName.__root__],
toColumn=target_column.fullyQualifiedName.__root__,
)
)
break
return column_lineages
def get_lineage_from_multi_tenant_table(
metadata: OpenMetadata,
connection: any,
service_name: str,
) -> Optional[Iterator[AddLineageRequest]]:
"""
For PGSpider, firstly, get list of multi-tenant tables.
Next, get child foreign tables of each multi-tenant tables.
Get entities of source and target table to create Lineage request.
"""
for multi_tenant_table in _get_multi_tenant_tables(connection):
database = multi_tenant_table["database"]
schema = multi_tenant_table["nspname"]
target_table = multi_tenant_table["relname"]
target_entities = search_table_entities(
metadata=metadata,
service_name=service_name,
database=database,
database_schema=schema,
table=target_table,
)
for child_foreign_table in _get_child_tables(connection, target_table):
source_entities = search_table_entities(
metadata=metadata,
service_name=service_name,
database=database,
database_schema=schema,
table=child_foreign_table["relname"],
)
for target_entity in target_entities:
for source_entity in source_entities:
column_lineages = _get_column_lineages(source_entity, target_entity)
lineage_details = LineageDetails(
columnsLineage=column_lineages,
)
yield AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=source_entity.id, type="table"
),
toEntity=EntityReference(id=target_entity.id, type="table"),
lineageDetails=lineage_details,
)
)

View File

@ -0,0 +1,45 @@
# Copyright(c) 2023, TOSHIBA CORPORATION
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SQL Queries used during ingestion
"""
import textwrap
PGSPIDER_GET_MULTI_TENANT_TABLES = textwrap.dedent(
"""
SELECT
c.relname, ns.nspname, current_database() as database
FROM
pg_class c
JOIN pg_namespace ns ON c.relnamespace = ns.oid
JOIN pg_foreign_table ft ON c.oid = ft.ftrelid
JOIN pg_foreign_server fs ON ft.ftserver = fs.oid
JOIN pg_foreign_data_wrapper fdw ON fs.srvfdw = fdw.oid
WHERE
fdw.fdwname = 'pgspider_core_fdw'
"""
)
PGSPIDER_GET_CHILD_TABLES = textwrap.dedent(
"""
WITH srv AS
(SELECT srvname FROM pg_foreign_table ft
JOIN pg_foreign_server fs ON ft.ftserver = fs.oid GROUP BY srvname ORDER BY srvname),
regex_pattern AS
(SELECT '^' || relname || '\\_\\_' || srv.srvname || '\\_\\_[0-9]+$' regex FROM pg_class
CROSS JOIN srv where relname = '{multi_tenant_table}')
SELECT relname FROM pg_class
WHERE (relname ~ (SELECT string_agg(regex, '|') FROM regex_pattern))
AND (relname NOT LIKE '%%\\_%%\\_seq')
ORDER BY relname;
"""
)

View File

@ -0,0 +1,15 @@
[
[
{
"relname": "test1__post_svr__0"
},
{
"relname": "test1__post_svr__1"
}
],
[
{
"relname": "test2__post_svr__0"
}
]
]

View File

@ -0,0 +1,12 @@
[
{
"relname": "test1",
"nspname": "public",
"database": "pgspider"
},
{
"relname": "test2",
"nspname": "public",
"database": "pgspider"
}
]

View File

@ -0,0 +1,817 @@
# Copyright 2021 Collate
# Copyright(c) 2023, TOSHIBA CORPORATION
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
PGSpider Lineage Unit Test
"""
import json
from pathlib import Path
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.database.postgres.lineage import PostgresLineageSource
from metadata.ingestion.source.database.postgres.pgspider.lineage import (
get_lineage_from_multi_tenant_table,
)
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
mock_multi_tenant_file_path = (
Path(__file__).parent / "resources/datasets/pgspider_multi_tenant_tables.json"
)
with open(mock_multi_tenant_file_path, encoding="utf-8") as file:
mock_multi_tenant_data: dict = json.load(file)
mock_child_file_path = (
Path(__file__).parent / "resources/datasets/pgspider_child_tables.json"
)
with open(mock_child_file_path, encoding="utf-8") as file:
mock_child_data = json.load(file)
EXPECTED_PGSPIDER_DETAILS_1 = [
AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id="e3e1649a-97f4-4849-bc02-d8d67eab9722", type="table"
),
toEntity=EntityReference(
id="b3f7df8e-50de-4555-a497-c7e170f4de8e", type="table"
),
lineageDetails=LineageDetails(
columnsLineage=[
ColumnLineage(
fromColumns=[
"local_pgspider1.pgspider.public.test1__post_svr__0.id"
],
toColumn="local_pgspider1.pgspider.public.test1.id",
)
]
),
),
),
AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id="02f020df-ef8c-4156-9d02-a2ff40b9649b", type="table"
),
toEntity=EntityReference(
id="b3f7df8e-50de-4555-a497-c7e170f4de8e", type="table"
),
lineageDetails=LineageDetails(
columnsLineage=[
ColumnLineage(
fromColumns=[
"local_pgspider1.pgspider.public.test1__post_svr__1.id"
],
toColumn="local_pgspider1.pgspider.public.test1.id",
)
]
),
),
),
AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id="57ba2523-5424-467f-992a-afe29dc7e23d", type="table"
),
toEntity=EntityReference(
id="a68492cc-af89-4031-8b8e-bc31f2cedcd5", type="table"
),
lineageDetails=LineageDetails(
columnsLineage=[
ColumnLineage(
fromColumns=[
"local_pgspider1.pgspider.public.test2__post_svr__0.a"
],
toColumn="local_pgspider1.pgspider.public.test2.a",
),
ColumnLineage(
fromColumns=[
"local_pgspider1.pgspider.public.test2__post_svr__0.b"
],
toColumn="local_pgspider1.pgspider.public.test2.b",
),
ColumnLineage(
fromColumns=[
"local_pgspider1.pgspider.public.test2__post_svr__0.c"
],
toColumn="local_pgspider1.pgspider.public.test2.c",
),
]
),
),
),
]
EXPECTED_PGSPIDER_DETAILS_2 = [
AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id="e3e1649a-97f4-4849-bc02-d8d67eab9722", type="table"
),
toEntity=EntityReference(
id="b3f7df8e-50de-4555-a497-c7e170f4de8e", type="table"
),
lineageDetails=LineageDetails(
columnsLineage=[
ColumnLineage(
fromColumns=[
"local_pgspider1.pgspider.public.test1__post_svr__0.id"
],
toColumn="local_pgspider1.pgspider.public.test1.id",
)
]
),
),
),
AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id="02f020df-ef8c-4156-9d02-a2ff40b9649b", type="table"
),
toEntity=EntityReference(
id="b3f7df8e-50de-4555-a497-c7e170f4de8e", type="table"
),
lineageDetails=LineageDetails(
columnsLineage=[
ColumnLineage(
fromColumns=[
"local_pgspider1.pgspider.public.test1__post_svr__1.id"
],
toColumn="local_pgspider1.pgspider.public.test1.id",
)
]
),
),
),
AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id="57ba2523-5424-467f-992a-afe29dc7e23d", type="table"
),
toEntity=EntityReference(
id="a68492cc-af89-4031-8b8e-bc31f2cedcd5", type="table"
),
lineageDetails=LineageDetails(
columnsLineage=[
ColumnLineage(
fromColumns=[
"local_pgspider1.pgspider.public.test2__post_svr__0.a"
],
toColumn="local_pgspider1.pgspider.public.test2.a",
),
ColumnLineage(
fromColumns=[
"local_pgspider1.pgspider.public.test2__post_svr__0.b"
],
toColumn="local_pgspider1.pgspider.public.test2.b",
),
]
),
),
),
]
EXPECTED_PGSPIDER_DETAILS_3 = [
AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id="e3e1649a-97f4-4849-bc02-d8d67eab9722", type="table"
),
toEntity=EntityReference(
id="b3f7df8e-50de-4555-a497-c7e170f4de8e", type="table"
),
lineageDetails=LineageDetails(
sqlQuery=None, columnsLineage=[], pipeline=None
),
),
),
AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id="02f020df-ef8c-4156-9d02-a2ff40b9649b", type="table"
),
toEntity=EntityReference(
id="b3f7df8e-50de-4555-a497-c7e170f4de8e", type="table"
),
lineageDetails=LineageDetails(
sqlQuery=None, columnsLineage=[], pipeline=None
),
),
),
AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id="57ba2523-5424-467f-992a-afe29dc7e23d", type="table"
),
toEntity=EntityReference(
id="a68492cc-af89-4031-8b8e-bc31f2cedcd5", type="table"
),
lineageDetails=LineageDetails(
sqlQuery=None, columnsLineage=[], pipeline=None
),
),
),
]
mock_pgspider_config = {
"source": {
"type": "pgspider-lineage",
"serviceName": "local_pgspider1",
"serviceConnection": {
"config": {
"type": "Postgres",
"scheme": "pgspider+psycopg2",
"username": "openmetadata_user",
"hostPort": "localhost:4813",
"database": "pgspider",
}
},
"sourceConfig": {
"config": {
"type": "DatabaseLineage",
"queryLogDuration": 1,
"resultLimit": 10000,
}
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc"
"iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE"
"2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB"
"iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN"
"r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
"d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
table_entities_1 = [
[
Table(
id="b3f7df8e-50de-4555-a497-c7e170f4de8e",
name="test1",
columns=[
Column(
name="id",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test1.id",
),
Column(
name="__spd_url",
dataType=DataType.TEXT,
fullyQualifiedName="local_pgspider1.pgspider.public.test1.__spd_url",
),
],
)
],
[
Table(
id="e3e1649a-97f4-4849-bc02-d8d67eab9722",
name="test1__post_svr__0",
columns=[
Column(
name="id",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test1__post_svr__0.id",
)
],
)
],
[
Table(
id="02f020df-ef8c-4156-9d02-a2ff40b9649b",
name="test1__post_svr__1",
columns=[
Column(
name="id",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test1__post_svr__1.id",
)
],
)
],
[
Table(
id="a68492cc-af89-4031-8b8e-bc31f2cedcd5",
name="test2",
columns=[
Column(
name="a",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2.a",
),
Column(
name="b",
dataType=DataType.TEXT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2.b",
),
Column(
name="c",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2.c",
),
Column(
name="__spd_url",
dataType=DataType.TEXT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2.__spd_url",
),
],
)
],
[
Table(
id="57ba2523-5424-467f-992a-afe29dc7e23d",
name="test2__post_svr__0",
columns=[
Column(
name="a",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2__post_svr__0.a",
),
Column(
name="b",
dataType=DataType.TEXT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2__post_svr__0.b",
),
Column(
name="c",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2__post_svr__0.c",
),
],
)
],
]
table_entities_2 = [
[
Table(
id="b3f7df8e-50de-4555-a497-c7e170f4de8e",
name="test1",
columns=[
Column(
name="id",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test1.id",
),
Column(
name="__spd_url",
dataType=DataType.TEXT,
fullyQualifiedName="local_pgspider1.pgspider.public.test1.__spd_url",
),
],
)
],
[
Table(
id="e3e1649a-97f4-4849-bc02-d8d67eab9722",
name="test1__post_svr__0",
columns=[
Column(
name="id",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test1__post_svr__0.id",
),
Column(
name="a",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test1__post_svr__0.a",
),
],
)
],
[
Table(
id="02f020df-ef8c-4156-9d02-a2ff40b9649b",
name="test1__post_svr__1",
columns=[
Column(
name="id",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test1__post_svr__1.id",
),
Column(
name="b",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test1__post_svr__0.b",
),
],
)
],
[
Table(
id="a68492cc-af89-4031-8b8e-bc31f2cedcd5",
name="test2",
columns=[
Column(
name="a",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2.a",
),
Column(
name="b",
dataType=DataType.TEXT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2.b",
),
Column(
name="c",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2.c",
),
Column(
name="__spd_url",
dataType=DataType.TEXT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2.__spd_url",
),
],
)
],
[
Table(
id="57ba2523-5424-467f-992a-afe29dc7e23d",
name="test2__post_svr__0",
columns=[
Column(
name="a",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2__post_svr__0.a",
),
Column(
name="b",
dataType=DataType.TEXT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2__post_svr__0.b",
),
Column(
name="d",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2__post_svr__0.d",
),
],
)
],
]
table_entities_3 = [
[
Table(
id="b3f7df8e-50de-4555-a497-c7e170f4de8e",
name="test1",
columns=[
Column(
name="id",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test1.id",
),
Column(
name="__spd_url",
dataType=DataType.TEXT,
fullyQualifiedName="local_pgspider1.pgspider.public.test1.__spd_url",
),
],
)
],
[
Table(
id="e3e1649a-97f4-4849-bc02-d8d67eab9722",
name="test1__post_svr__0",
columns=[
Column(
name="a",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test1__post_svr__0.a",
)
],
)
],
[
Table(
id="02f020df-ef8c-4156-9d02-a2ff40b9649b",
name="test1__post_svr__1",
columns=[
Column(
name="a",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test1__post_svr__1.a",
)
],
)
],
[
Table(
id="a68492cc-af89-4031-8b8e-bc31f2cedcd5",
name="test2",
columns=[
Column(
name="a",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2.a",
),
Column(
name="b",
dataType=DataType.TEXT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2.b",
),
Column(
name="c",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2.c",
),
Column(
name="__spd_url",
dataType=DataType.TEXT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2.__spd_url",
),
],
)
],
[
Table(
id="57ba2523-5424-467f-992a-afe29dc7e23d",
name="test2__post_svr__0",
columns=[
Column(
name="d",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2__post_svr__0.d",
),
Column(
name="e",
dataType=DataType.TEXT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2__post_svr__0.e",
),
Column(
name="f",
dataType=DataType.INT,
fullyQualifiedName="local_pgspider1.pgspider.public.test2__post_svr__0.f",
),
],
)
],
]
class PGSpiderLineageUnitTests(TestCase):
"""
Implements the necessary methods to extract
PGSpider lineage test
"""
def __init__(self, methodName) -> None:
super().__init__(methodName)
config = OpenMetadataWorkflowConfig.parse_obj(mock_pgspider_config)
self.postgres = PostgresLineageSource.create(
mock_pgspider_config["source"],
config.workflowConfig.openMetadataServerConfig,
)
print(type(self.postgres))
@patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage._get_multi_tenant_tables"
)
def test_next_record_1(self, multi_tenant_tables):
"""
Verify normal case:
There is multi-tenant table and foreign table.
All columns (except __spd_url) of multi tenant tables and child foreign tables are matched.
"""
"""
Mock valid values for get_multi_tenant_tables and get_child_tables
"""
multi_tenant_tables.return_value = mock_multi_tenant_data
with patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage.search_table_entities"
) as source_entities, patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage._get_child_tables"
) as child_tables:
child_tables.side_effect = mock_child_data
source_entities.side_effect = table_entities_1
requests = []
for record in get_lineage_from_multi_tenant_table(
self.postgres.metadata,
connection=self.postgres.service_connection,
service_name=self.postgres.config.serviceName,
):
if isinstance(record, AddLineageRequest):
requests.append(record)
"""Validate each AddLineageRequest"""
for _, (expected, original) in enumerate(
zip(EXPECTED_PGSPIDER_DETAILS_1, requests)
):
self.assertEqual(expected, original)
@patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage._get_multi_tenant_tables"
)
def test_next_record_2(self, multi_tenant_tables):
"""
Verify normal case:
There is multi-tenant table and foreign table.
Some columns (not at all) of multi tenant tables and child foreign tables are matched.
"""
"""
Mock valid values for get_multi_tenant_tables and get_child_tables
"""
multi_tenant_tables.return_value = mock_multi_tenant_data
with patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage.search_table_entities"
) as source_entities, patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage._get_child_tables"
) as child_tables:
child_tables.side_effect = mock_child_data
source_entities.side_effect = table_entities_2
requests = []
for record in get_lineage_from_multi_tenant_table(
self.postgres.metadata,
connection=self.postgres.service_connection,
service_name=self.postgres.config.serviceName,
):
if isinstance(record, AddLineageRequest):
requests.append(record)
"""Validate each AddLineageRequest"""
for _, (expected, original) in enumerate(
zip(EXPECTED_PGSPIDER_DETAILS_2, requests)
):
self.assertEqual(expected, original)
@patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage._get_multi_tenant_tables"
)
def test_next_record_3(self, multi_tenant_tables):
"""
Verify normal case:
There is multi-tenant table and foreign table.
There is no column of multi tenant tables and child foreign tables are matched.
"""
"""
Mock valid values for get_multi_tenant_tables and get_child_tables
"""
multi_tenant_tables.return_value = mock_multi_tenant_data
with patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage.search_table_entities"
) as source_entities, patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage._get_child_tables"
) as child_tables:
child_tables.side_effect = mock_child_data
source_entities.side_effect = table_entities_3
requests = []
for record in get_lineage_from_multi_tenant_table(
self.postgres.metadata,
connection=self.postgres.service_connection,
service_name=self.postgres.config.serviceName,
):
if isinstance(record, AddLineageRequest):
requests.append(record)
"""Validate each AddLineageRequest"""
for _, (expected, original) in enumerate(
zip(EXPECTED_PGSPIDER_DETAILS_3, requests)
):
self.assertEqual(expected, original)
@patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage._get_multi_tenant_tables"
)
def test_next_record_4(self, multi_tenant_tables):
"""
Verify abnormal case:
There is no multi-tenant.
"""
"""
Mock valid values for get_multi_tenant_tables and get_child_tables
"""
multi_tenant_tables.return_value = []
with patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage.search_table_entities"
) as source_entities, patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage._get_child_tables"
) as child_tables:
child_tables.return_value = mock_child_data
source_entities.return_value = []
requests = []
for record in get_lineage_from_multi_tenant_table(
self.postgres.metadata,
connection=self.postgres.service_connection,
service_name=self.postgres.config.serviceName,
):
if isinstance(record, AddLineageRequest):
requests.append(record)
"""Validate number of AddLineageRequest"""
self.assertEqual(0, len(requests))
@patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage._get_multi_tenant_tables"
)
def test_next_record_5(self, multi_tenant_tables):
"""
Verify abnormal case:
There are multi-tenant tables.
There is no corresponding child foreign tables of specified multi tenant table.
"""
"""
Mock valid values for get_multi_tenant_tables and get_child_tables
"""
multi_tenant_tables.return_value = mock_multi_tenant_data
with patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage.search_table_entities"
) as source_entities, patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage._get_child_tables"
) as child_tables:
child_tables.return_value = []
requests = []
for record in get_lineage_from_multi_tenant_table(
self.postgres.metadata,
connection=self.postgres.service_connection,
service_name=self.postgres.config.serviceName,
):
if isinstance(record, AddLineageRequest):
requests.append(record)
"""Validate number of AddLineageRequest"""
self.assertEqual(0, len(requests))
@patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage._get_multi_tenant_tables"
)
def test_next_record_6(self, multi_tenant_tables):
"""
Verify abnormal case:
There are multi tenant tables and child foreign tables in remote PGSpider.
All multi tenant tables and child foreign tables have not been ingested into open-metadata.
"""
"""
Mock valid values for get_multi_tenant_tables and get_child_tables
"""
multi_tenant_tables.return_value = mock_multi_tenant_data
with patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage.search_table_entities"
) as source_entities, patch(
"metadata.ingestion.source.database.postgres.pgspider.lineage._get_child_tables"
) as child_tables:
child_tables.side_effect = mock_child_data
source_entities.return_value = []
requests = []
for record in get_lineage_from_multi_tenant_table(
self.postgres.metadata,
connection=self.postgres.service_connection,
service_name=self.postgres.config.serviceName,
):
if isinstance(record, AddLineageRequest):
requests.append(record)
"""Validate number of AddLineageRequest"""
self.assertEqual(0, len(requests))

View File

@ -18,7 +18,7 @@
"description": "SQLAlchemy driver scheme options.",
"type": "string",
"enum": [
"postgresql+psycopg2"
"postgresql+psycopg2", "pgspider+psycopg2"
],
"default": "postgresql+psycopg2"
}