Added lineage in cli e2e (#19216)

This commit is contained in:
Akash Verma 2025-01-09 14:59:49 +05:30 committed by GitHub
parent 1d2774ac29
commit 4cad5762ad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 149 additions and 2 deletions

View File

@ -72,6 +72,31 @@ class ProfilerConfigBuilder(BaseBuilder):
return self.config return self.config
class LineageConfigBuilder(BaseBuilder):
"""Builder class for the Lineage config"""
# pylint: disable=invalid-name
def __init__(self, config: dict, config_args: dict) -> None:
super().__init__(config, config_args)
self.resultLimit = self.config_args.get("resultLimit", 1000)
self.queryLogDuration = self.config_args.get("queryLogDuration", 1)
# pylint: enable=invalid-name
def build(self) -> dict:
"""build lineage config"""
self.config["source"]["type"] = self.config_args["source"]
self.config["source"]["sourceConfig"] = {
"config": {
"type": "DatabaseLineage",
"queryLogDuration": 1,
"resultLimit": 10000,
"processQueryLineage": True,
"processStoredProcedureLineage": True,
}
}
return self.config
class AutoClassificationConfigBuilder(BaseBuilder): class AutoClassificationConfigBuilder(BaseBuilder):
"""Builder class for the AutoClassification config""" """Builder class for the AutoClassification config"""
@ -206,6 +231,7 @@ def builder_factory(builder, config: dict, config_args: dict):
"""Factory method to return the builder class""" """Factory method to return the builder class"""
builder_classes = { builder_classes = {
E2EType.PROFILER.value: ProfilerConfigBuilder, E2EType.PROFILER.value: ProfilerConfigBuilder,
E2EType.LINEAGE.value: LineageConfigBuilder,
E2EType.DATA_QUALITY.value: DataQualityConfigBuilder, E2EType.DATA_QUALITY.value: DataQualityConfigBuilder,
E2EType.INGEST_DB_FILTER_SCHEMA.value: SchemaConfigBuilder, E2EType.INGEST_DB_FILTER_SCHEMA.value: SchemaConfigBuilder,
E2EType.INGEST_DB_FILTER_TABLE.value: TableConfigBuilder, E2EType.INGEST_DB_FILTER_TABLE.value: TableConfigBuilder,

View File

@ -23,6 +23,7 @@ class E2EType(Enum):
INGEST = "ingest" INGEST = "ingest"
PROFILER = "profiler" PROFILER = "profiler"
LINEAGE = "lineage"
PROFILER_PROCESSOR = "profiler-processor" PROFILER_PROCESSOR = "profiler-processor"
AUTO_CLASSIFICATION = "auto-classification" AUTO_CLASSIFICATION = "auto-classification"
DATA_QUALITY = "test" DATA_QUALITY = "test"

View File

@ -211,6 +211,20 @@ class CliDBBase(TestCase):
This test will need to be implemented on the database specific test classes This test will need to be implemented on the database specific test classes
""" """
self.delete_table_and_view()
self.create_table_and_view()
self.build_config_file(
E2EType.INGEST_DB_FILTER_SCHEMA,
{"includes": self.get_includes_schemas()},
)
self.run_command()
self.build_config_file(
E2EType.LINEAGE,
{"source": f"{self.get_connector_name()}-lineage"},
)
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_for_test_lineage(source_status, sink_status)
@pytest.mark.order(12) @pytest.mark.order(12)
def test_profiler_with_time_partition(self) -> None: def test_profiler_with_time_partition(self) -> None:
@ -326,6 +340,12 @@ class CliDBBase(TestCase):
) -> None: ) -> None:
raise NotImplementedError() raise NotImplementedError()
@abstractmethod
def assert_for_test_lineage(
self, source_status: Status, sink_status: Status
) -> None:
raise NotImplementedError()
@abstractmethod @abstractmethod
def assert_for_table_with_profiler( def assert_for_table_with_profiler(
self, source_status: Status, sink_status: Status self, source_status: Status, sink_status: Status

View File

@ -100,6 +100,23 @@ class CliCommonDB:
# of https://github.com/open-metadata/OpenMetadata/pull/18558 # of https://github.com/open-metadata/OpenMetadata/pull/18558
# we need to introduce Lineage E2E base and add view lineage check there. # we need to introduce Lineage E2E base and add view lineage check there.
def assert_for_test_lineage(self, source_status: Status, sink_status: Status):
self.assertEqual(len(source_status.failures), 0)
self.assertEqual(len(source_status.warnings), 0)
self.assertEqual(len(sink_status.failures), 0)
self.assertEqual(len(sink_status.warnings), 0)
self.assertGreaterEqual(len(sink_status.records), 1)
lineage_data = self.retrieve_lineage(self.fqn_created_table())
retrieved_view_column_lineage_count = len(
lineage_data["downstreamEdges"][0]["lineageDetails"]["columnsLineage"]
)
self.assertEqual(
retrieved_view_column_lineage_count, self.view_column_lineage_count()
)
retrieved_lineage_node = lineage_data["nodes"][0]["fullyQualifiedName"]
self.assertEqual(retrieved_lineage_node, self.expected_lineage_node())
def assert_auto_classification_sample_data( def assert_auto_classification_sample_data(
self, source_status: Status, sink_status: Status self, source_status: Status, sink_status: Status
): ):
@ -206,6 +223,10 @@ class CliCommonDB:
def view_column_lineage_count(self) -> int: def view_column_lineage_count(self) -> int:
raise NotImplementedError() raise NotImplementedError()
@abstractmethod
def expected_lineage_node(self) -> str:
raise NotImplementedError()
@staticmethod @staticmethod
@abstractmethod @abstractmethod
def fqn_created_table() -> str: def fqn_created_table() -> str:

View File

@ -66,6 +66,9 @@ class AthenaCliTest(CliCommonDB.TestSuite):
def view_column_lineage_count(self) -> int: def view_column_lineage_count(self) -> int:
pass pass
def expected_lineage_node(self) -> str:
pass
@staticmethod @staticmethod
def fqn_created_table() -> str: def fqn_created_table() -> str:
return "e2e_athena.database_name.e2e_db.customers" return "e2e_athena.database_name.e2e_db.customers"
@ -130,6 +133,10 @@ class AthenaCliTest(CliCommonDB.TestSuite):
sink_status, source_status = self.retrieve_statuses(result) sink_status, source_status = self.retrieve_statuses(result)
self.assert_for_table_with_profiler(source_status, sink_status) self.assert_for_table_with_profiler(source_status, sink_status)
@pytest.mark.order(11)
def test_lineage(self) -> None:
pytest.skip("Lineage not configured. Skipping Test")
def assert_for_vanilla_ingestion( def assert_for_vanilla_ingestion(
self, source_status: Status, sink_status: Status self, source_status: Status, sink_status: Status
) -> None: ) -> None:

View File

@ -98,6 +98,9 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods):
def view_column_lineage_count(self) -> int: def view_column_lineage_count(self) -> int:
return 2 return 2
def expected_lineage_node(self) -> str:
return "local_bigquery.open-metadata-beta.exclude_me.view_orders"
@staticmethod @staticmethod
def _expected_profiled_tables() -> int: def _expected_profiled_tables() -> int:
return 2 return 2

View File

@ -74,6 +74,9 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods):
def view_column_lineage_count(self) -> int: def view_column_lineage_count(self) -> int:
return 2 return 2
def expected_lineage_node(self) -> str:
return "local_bigquery_multiple.modified-leaf-330420.do_not_touch.view_orders"
@staticmethod @staticmethod
def _expected_profiled_tables() -> int: def _expected_profiled_tables() -> int:
return 2 return 2

View File

@ -63,6 +63,9 @@ class DatalakeCliTest(CliCommonDB.TestSuite):
def view_column_lineage_count(self) -> int: def view_column_lineage_count(self) -> int:
pass pass
def expected_lineage_node(self) -> str:
pass
@staticmethod @staticmethod
def fqn_created_table() -> str: def fqn_created_table() -> str:
return 'aws_datalake.default.aws-datalake-e2e."sales/sales.csv"' return 'aws_datalake.default.aws-datalake-e2e."sales/sales.csv"'
@ -128,3 +131,7 @@ class DatalakeCliTest(CliCommonDB.TestSuite):
result = self.run_command("profile") result = self.run_command("profile")
sink_status, source_status = self.retrieve_statuses(result) sink_status, source_status = self.retrieve_statuses(result)
self.assert_for_table_with_profiler(source_status, sink_status) self.assert_for_table_with_profiler(source_status, sink_status)
@pytest.mark.order(11)
def test_lineage(self) -> None:
pytest.skip("Lineage not configured. Skipping Test")

View File

@ -15,6 +15,7 @@ Test Redshift connector with CLI
from pathlib import Path from pathlib import Path
from typing import List from typing import List
import pytest
from sqlalchemy.engine import Engine from sqlalchemy.engine import Engine
from metadata.ingestion.api.status import Status from metadata.ingestion.api.status import Status
@ -62,6 +63,10 @@ class DbtCliTest(CliDBTBase.TestSuite):
"local_redshift.dev.dbt_cli_e2e.orders", "local_redshift.dev.dbt_cli_e2e.orders",
] ]
@pytest.mark.order(11)
def test_lineage(self) -> None:
pytest.skip("Lineage not configured. Skipping Test")
def assert_for_vanilla_ingestion( def assert_for_vanilla_ingestion(
self, source_status: Status, sink_status: Status self, source_status: Status, sink_status: Status
) -> None: ) -> None:

View File

@ -107,7 +107,10 @@ class HiveCliTest(CliCommonDB.TestSuite, SQACommonMethods):
"""view was created from `CREATE VIEW xyz AS (SELECT * FROM abc)` """view was created from `CREATE VIEW xyz AS (SELECT * FROM abc)`
which does not propagate column lineage which does not propagate column lineage
""" """
return None return 3
def expected_lineage_node(self) -> str:
return "e2e_hive.default.e2e_cli_tests.view_persons"
@staticmethod @staticmethod
def fqn_created_table() -> str: def fqn_created_table() -> str:

View File

@ -15,6 +15,8 @@ Test Metabase connector with CLI
from pathlib import Path from pathlib import Path
from typing import List from typing import List
import pytest
from .base.test_cli import PATH_TO_RESOURCES from .base.test_cli import PATH_TO_RESOURCES
from .common.test_cli_dashboard import CliCommonDashboard from .common.test_cli_dashboard import CliCommonDashboard
@ -77,3 +79,7 @@ class MetabaseCliTest(CliCommonDashboard.TestSuite):
def expected_dashboards_and_charts_after_patch(self) -> int: def expected_dashboards_and_charts_after_patch(self) -> int:
return 0 return 0
@pytest.mark.order(11)
def test_lineage(self) -> None:
pytest.skip("Lineage not configured. Skipping Test")

View File

@ -84,6 +84,9 @@ class MSSQLCliTest(CliCommonDB.TestSuite, SQACommonMethods):
def view_column_lineage_count(self) -> int: def view_column_lineage_count(self) -> int:
return 4 return 4
def expected_lineage_node(self) -> str:
return "mssql.e2e_cli_tests.dbo.view_persons"
@staticmethod @staticmethod
def fqn_created_table() -> str: def fqn_created_table() -> str:
return "mssql.e2e_cli_tests.dbo.persons" return "mssql.e2e_cli_tests.dbo.persons"

View File

@ -91,6 +91,9 @@ class MysqlCliTest(CliCommonDB.TestSuite, SQACommonMethods):
def view_column_lineage_count(self) -> int: def view_column_lineage_count(self) -> int:
return 22 return 22
def expected_lineage_node(self) -> str:
return "local_mysql.default.openmetadata_db.view_persons"
@staticmethod @staticmethod
def fqn_created_table() -> str: def fqn_created_table() -> str:
return "local_mysql.default.openmetadata_db.persons" return "local_mysql.default.openmetadata_db.persons"

View File

@ -101,6 +101,9 @@ SELECT * from names
""" """
return 12 return 12
def expected_lineage_node(self) -> str:
return "e2e_oracle.default.admin.admin_emp_view"
@staticmethod @staticmethod
def fqn_created_table() -> str: def fqn_created_table() -> str:
return "e2e_oracle.default.admin.ADMIN_EMP" return "e2e_oracle.default.admin.ADMIN_EMP"

View File

@ -106,7 +106,10 @@ class PostgresCliTest(CliCommonDB.TestSuite, SQACommonMethods):
return len(self.insert_data_queries) return len(self.insert_data_queries)
def view_column_lineage_count(self) -> int: def view_column_lineage_count(self) -> int:
return None return 22
def expected_lineage_node(self) -> str:
return "local_postgres.E2EDB.public.view_all_datatypes"
@staticmethod @staticmethod
def fqn_created_table() -> str: def fqn_created_table() -> str:

View File

@ -15,6 +15,8 @@ Test PowerBI connector with CLI
from pathlib import Path from pathlib import Path
from typing import List from typing import List
import pytest
from .base.test_cli import PATH_TO_RESOURCES from .base.test_cli import PATH_TO_RESOURCES
from .common.test_cli_dashboard import CliCommonDashboard from .common.test_cli_dashboard import CliCommonDashboard
@ -75,3 +77,7 @@ class PowerBICliTest(CliCommonDashboard.TestSuite):
def expected_dashboards_and_charts_after_patch(self) -> int: def expected_dashboards_and_charts_after_patch(self) -> int:
return 0 return 0
@pytest.mark.order(11)
def test_lineage(self) -> None:
pytest.skip("Lineage not configured. Skipping Test")

View File

@ -14,6 +14,8 @@ Test Quicksight connector with CLI
""" """
from typing import List from typing import List
import pytest
from metadata.ingestion.api.status import Status from metadata.ingestion.api.status import Status
from .common.test_cli_dashboard import CliCommonDashboard from .common.test_cli_dashboard import CliCommonDashboard
@ -75,6 +77,10 @@ class QuicksightCliTest(CliCommonDashboard.TestSuite):
def expected_dashboards_and_charts_after_patch(self) -> int: def expected_dashboards_and_charts_after_patch(self) -> int:
return 7 return 7
@pytest.mark.order(11)
def test_lineage(self) -> None:
pytest.skip("Lineage not configured. Skipping Test")
def assert_for_vanilla_ingestion( def assert_for_vanilla_ingestion(
self, source_status: Status, sink_status: Status self, source_status: Status, sink_status: Status
) -> None: ) -> None:

View File

@ -14,6 +14,8 @@ Test Redash connector with CLI
""" """
from typing import List from typing import List
import pytest
from .common.test_cli_dashboard import CliCommonDashboard from .common.test_cli_dashboard import CliCommonDashboard
@ -65,3 +67,7 @@ class RedashCliTest(CliCommonDashboard.TestSuite):
def expected_dashboards_and_charts_after_patch(self) -> int: def expected_dashboards_and_charts_after_patch(self) -> int:
return 1 return 1
@pytest.mark.order(11)
def test_lineage(self) -> None:
pytest.skip("Lineage not configured. Skipping Test")

View File

@ -110,6 +110,9 @@ class RedshiftCliTest(CliCommonDB.TestSuite, SQACommonMethods):
""" """
return 9 return 9
def expected_lineage_node(self) -> str:
return "e2e_redshift.e2e_cli_tests.dbt_jaffle.view_listing"
@staticmethod @staticmethod
def fqn_created_table() -> str: def fqn_created_table() -> str:
return "e2e_redshift.e2e_cli_tests.dbt_jaffle.listing" return "e2e_redshift.e2e_cli_tests.dbt_jaffle.listing"

View File

@ -180,6 +180,9 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods):
def view_column_lineage_count(self) -> int: def view_column_lineage_count(self) -> int:
return 2 return 2
def expected_lineage_node(self) -> str:
return "e2e_snowflake.E2E_DB.E2E_TEST.view_persons"
@staticmethod @staticmethod
def fqn_created_table() -> str: def fqn_created_table() -> str:
return "e2e_snowflake.E2E_DB.E2E_TEST.PERSONS" return "e2e_snowflake.E2E_DB.E2E_TEST.PERSONS"

View File

@ -15,6 +15,8 @@ Test Tableau connector with CLI
from pathlib import Path from pathlib import Path
from typing import List from typing import List
import pytest
from metadata.ingestion.api.status import Status from metadata.ingestion.api.status import Status
from .base.test_cli import PATH_TO_RESOURCES from .base.test_cli import PATH_TO_RESOURCES
@ -78,6 +80,10 @@ class TableauCliTest(CliCommonDashboard.TestSuite):
def expected_dashboards_and_charts_after_patch(self) -> int: def expected_dashboards_and_charts_after_patch(self) -> int:
return 4 return 4
@pytest.mark.order(11)
def test_lineage(self) -> None:
pytest.skip("Lineage not configured. Skipping Test")
# Overriding the method since for Tableau we don't expect lineage to be shown on this assert. # Overriding the method since for Tableau we don't expect lineage to be shown on this assert.
# This differs from the base case # This differs from the base case
def assert_not_including(self, source_status: Status, sink_status: Status): def assert_not_including(self, source_status: Status, sink_status: Status):

View File

@ -73,6 +73,9 @@ class VerticaCliTest(CliCommonDB.TestSuite, SQACommonMethods):
def view_column_lineage_count(self) -> int: def view_column_lineage_count(self) -> int:
return 2 return 2
def expected_lineage_node(self) -> str:
return "e2e_vertica.VMart.public.vendor_dimension_v"
@staticmethod @staticmethod
def fqn_created_table() -> str: def fqn_created_table() -> str:
return "e2e_vertica.VMart.public.vendor_dimension_new" return "e2e_vertica.VMart.public.vendor_dimension_new"