"""Unit tests for the OpenLineage pipeline ingestion source."""

import copy
import json
import unittest
from pathlib import Path
from unittest.mock import MagicMock, Mock, patch
from uuid import UUID
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import (
ConsumerOffsets,
SecurityProtocol,
)
from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
PipelineService,
PipelineServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.entityLineage import ColumnLineage
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.pipeline.openlineage.metadata import OpenlineageSource
from metadata.ingestion.source.pipeline.openlineage.models import OpenLineageEvent
from metadata.ingestion.source.pipeline.openlineage.utils import (
message_to_open_lineage_event,
)
# Global constants

# Full ingestion-workflow configuration, mirroring the shape of a real
# OpenLineage ingestion YAML. Validated in the test fixture via
# OpenMetadataWorkflowConfig.model_validate, so every key must match the
# generated schema. The jwtToken below is a throwaway test credential.
MOCK_OL_CONFIG = {
    "source": {
        "type": "openlineage",
        "serviceName": "openlineage_source",
        "serviceConnection": {
            "config": {
                "type": "OpenLineage",
                "brokersUrl": "testbroker:9092",
                "topicName": "test-topic",
                "consumerGroupName": "test-consumergroup",
                "consumerOffsets": ConsumerOffsets.earliest,
                "securityProtocol": SecurityProtocol.PLAINTEXT,
                "sslConfig": {
                    "caCertificate": "",
                    "sslCertificate": "",
                    "sslKey": "",
                },
                # Short timeouts keep the (mocked) Kafka polling loop fast.
                "poolTimeout": 0.3,
                "sessionTimeout": 1,
            }
        },
        "sourceConfig": {"config": {"type": "PipelineMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {
                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
            },
        }
    },
}

# Fixed IDs/URLs used when building the expected Pipeline entities below.
MOCK_SPLINE_UI_URL = "http://localhost:9090"
PIPELINE_ID = "3f784e72-5bf7-5704-8828-ae8464fe915b:lhq160w0"
MOCK_PIPELINE_URL = f"{MOCK_SPLINE_UI_URL}/app/events/overview/{PIPELINE_ID}"

# Pipeline service entity matching the "serviceName" in MOCK_OL_CONFIG.
MOCK_PIPELINE_SERVICE = PipelineService(
    id="85811038-099a-11ed-861d-0242ac120002",
    name="openlineage_source",
    fullyQualifiedName=FullyQualifiedEntityName("openlineage_source"),
    connection=PipelineConnection(),
    serviceType=PipelineServiceType.Airflow,
)

# Pipeline entity (with a single task) that the source is expected to operate
# on; its `service` reference points at MOCK_PIPELINE_SERVICE's id.
MOCK_PIPELINE = Pipeline(
    id="2aaa012e-099a-11ed-861d-0242ac120002",
    name=PIPELINE_ID,
    fullyQualifiedName=f"openlineage_source.{PIPELINE_ID}",
    displayName="MSSQL <> Postgres",
    sourceUrl=MOCK_PIPELINE_URL,
    tasks=[
        Task(
            name=PIPELINE_ID,
            displayName="jdbc postgres ssl app",
            sourceUrl=MOCK_PIPELINE_URL,
        )
    ],
    service=EntityReference(
        id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
    ),
)

# Minimal OpenLineage event dict that message_to_open_lineage_event accepts.
VALID_EVENT = {
    "run": {
        "facets": {
            "parent": {"job": {"name": "test-job", "namespace": "test-namespace"}}
        }
    },
    "inputs": [],
    "outputs": [],
    "eventType": "START",
    "job": {"name": "test-job", "namespace": "test-namespace"},
}

# Invalid variants derived from VALID_EVENT via deepcopy so the original
# fixture is not mutated.
MISSING_RUN_FACETS_PARENT_JOB_NAME_EVENT = copy.deepcopy(VALID_EVENT)
del MISSING_RUN_FACETS_PARENT_JOB_NAME_EVENT["run"]["facets"]["parent"]["job"]["name"]

MALFORMED_NESTED_STRUCTURE_EVENT = copy.deepcopy(VALID_EVENT)
MALFORMED_NESTED_STRUCTURE_EVENT["run"]["facets"]["parent"]["job"] = "Not a dict"

# Load a full, realistic Kafka-delivered OpenLineage event from test resources
# (resolved relative to this test file, so it works from any CWD).
with open(
    f"{Path(__file__).parent}/../../resources/datasets/openlineage_event.json"
) as ol_file:
    FULL_OL_KAFKA_EVENT = json.load(ol_file)

# The OpenLineageEvent model we expect the source to build from that payload.
EXPECTED_OL_EVENT = OpenLineageEvent(
    run_facet=FULL_OL_KAFKA_EVENT["run"],
    job=FULL_OL_KAFKA_EVENT["job"],
    event_type=FULL_OL_KAFKA_EVENT["eventType"],
    inputs=FULL_OL_KAFKA_EVENT["inputs"],
    outputs=FULL_OL_KAFKA_EVENT["outputs"],
)
class OpenLineageUnitTest(unittest.TestCase):
    """Unit tests for OpenlineageSource.

    The source is built once in ``__init__`` with the service connection test
    patched out; each test then swaps in a mocked Kafka consumer so no real
    broker is needed.
    """

    @patch(
        "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection"
    )
    def __init__(self, methodName, test_connection) -> None:
        # Patch test_connection so OpenlineageSource.create does not try to
        # reach a live OpenMetadata server / Kafka broker.
        super().__init__(methodName)
        test_connection.return_value = False
        config = OpenMetadataWorkflowConfig.model_validate(MOCK_OL_CONFIG)
        self.open_lineage_source = OpenlineageSource.create(
            MOCK_OL_CONFIG["source"],
            config.workflowConfig.openMetadataServerConfig,
        )
        # Write through __dict__ to pre-seed the ingestion context, bypassing
        # any attribute validation on the context object.
        self.open_lineage_source.context.__dict__["pipeline"] = MOCK_PIPELINE.name.root
        self.open_lineage_source.context.__dict__[
            "pipeline_service"
        ] = MOCK_PIPELINE_SERVICE.name.root
        self.open_lineage_source.source_config.lineageInformation = {
            "dbServiceNames": ["skun"]
        }

    @patch(
        "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection"
    )
    @patch("confluent_kafka.Consumer")
    def setUp(self, mock_consumer, mock_test_connection):
        """Give every test a fresh mocked Kafka consumer."""
        mock_test_connection.return_value = False
        self.mock_consumer = mock_consumer

    def setup_mock_consumer_with_kafka_event(self, event):
        """Make the mocked consumer deliver *event* once, then poll None.

        The trailing Nones let the source's polling loop observe an empty
        topic and terminate.
        """
        mock_msg = MagicMock()
        mock_msg.error.return_value = None
        mock_msg.value.return_value = json.dumps(event).encode()
        self.mock_consumer.poll.side_effect = [
            mock_msg,
            None,
            None,
            None,
            None,
            None,
            None,
        ]
        # Replace the source's real Kafka client with the mock.
        self.open_lineage_source.client = self.mock_consumer

    def test_message_to_ol_event_valid_event(self):
        """Test conversion with a valid event."""
        result = message_to_open_lineage_event(VALID_EVENT)
        self.assertIsInstance(result, OpenLineageEvent)

    def test_message_to_ol_event_missing_run_facets_parent_job_name(self):
        """Test conversion with missing 'run.facets.parent.job.name' field."""
        with self.assertRaises(ValueError):
            message_to_open_lineage_event(MISSING_RUN_FACETS_PARENT_JOB_NAME_EVENT)

    def test_message_to_ol_event_malformed_nested_structure(self):
        """Test conversion with a malformed nested structure."""
        with self.assertRaises(TypeError):
            message_to_open_lineage_event(MALFORMED_NESTED_STRUCTURE_EVENT)

    def test_poll_message_receives_message(self):
        """Test if poll_message receives a kafka message."""
        self.setup_mock_consumer_with_kafka_event(VALID_EVENT)
        result = self.open_lineage_source.client.poll(timeout=1)
        self.assertIsNotNone(result)
        self.assertEqual(json.loads(result.value().decode()), VALID_EVENT)

    def read_openlineage_event_from_kafka(self, kafka_event):
        """Drive get_pipelines_list over a mocked topic; return first result.

        NOTE(review): raises IndexError if the source yields nothing — tests
        rely on the event always producing at least one pipeline.
        """
        self.setup_mock_consumer_with_kafka_event(kafka_event)
        result_generator = self.open_lineage_source.get_pipelines_list()
        results = []
        try:
            while True:
                results.append(next(result_generator))
        except StopIteration:
            pass
        return results[0]

    def test_create_output_lineage_dict_empty_input(self):
        """Test with an empty input list."""
        result = self.open_lineage_source._create_output_lineage_dict([])
        self.assertEqual(result, {})

    def test_create_output_lineage_dict_single_lineage_entry(self):
        """Test with a single lineage entry."""
        # Entries are (output_table, input_table, output_column, input_column).
        lineage_info = [
            ("output_table", "input_table", "output_column", "input_column")
        ]
        result = self.open_lineage_source._create_output_lineage_dict(lineage_info)
        expected = {
            "output_table": {
                "input_table": [
                    ColumnLineage(
                        toColumn="output_column", fromColumns=["input_column"]
                    )
                ]
            }
        }
        self.assertEqual(result, expected)

    def test_create_output_lineage_dict_multiple_entries_different_outputs(self):
        """Test with multiple entries having different output tables."""
        lineage_info = [
            ("output_table1", "input_table", "output_column1", "input_column"),
            ("output_table2", "input_table", "output_column2", "input_column"),
        ]
        result = self.open_lineage_source._create_output_lineage_dict(lineage_info)
        expected = {
            "output_table1": {
                "input_table": [
                    ColumnLineage(
                        toColumn="output_column1", fromColumns=["input_column"]
                    )
                ]
            },
            "output_table2": {
                "input_table": [
                    ColumnLineage(
                        toColumn="output_column2", fromColumns=["input_column"]
                    )
                ]
            },
        }
        self.assertEqual(result, expected)

    def test_create_output_lineage_dict_multiple_entries_same_output(self):
        """Test with multiple entries sharing the same output table."""
        lineage_info = [
            ("output_table", "input_table1", "output_column", "input_column1"),
            ("output_table", "input_table2", "output_column", "input_column2"),
        ]
        result = self.open_lineage_source._create_output_lineage_dict(lineage_info)
        expected = {
            "output_table": {
                "input_table1": [
                    ColumnLineage(
                        toColumn="output_column", fromColumns=["input_column1"]
                    )
                ],
                "input_table2": [
                    ColumnLineage(
                        toColumn="output_column", fromColumns=["input_column2"]
                    )
                ],
            }
        }
        self.assertEqual(result, expected)

    def test_get_column_lineage_empty_inputs_outputs(self):
        """Test with empty input and output lists."""
        inputs = []
        outputs = []
        result = self.open_lineage_source._get_column_lineage(inputs, outputs)
        self.assertEqual(result, {})

    @patch(
        "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn"
    )
    def test_build_ol_name_to_fqn_map_with_valid_data(self, mock_get_table_fqn):
        """Map OL "namespace + name" keys to OpenMetadata FQNs."""
        # Mock _get_table_fqn to return a constructed FQN based on the provided table details
        mock_get_table_fqn.side_effect = (
            lambda table_details: f"database.schema.{table_details.name}"
        )
        tables = [
            {"name": "schema.table1", "facets": {}, "namespace": "ns://"},
            {"name": "schema.table2", "facets": {}, "namespace": "ns://"},
        ]
        expected_map = {
            "ns://schema.table1": "database.schema.table1",
            "ns://schema.table2": "database.schema.table2",
        }
        result = self.open_lineage_source._build_ol_name_to_fqn_map(tables)
        self.assertEqual(result, expected_map)
        self.assertEqual(mock_get_table_fqn.call_count, 2)

    @patch(
        "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn"
    )
    def test_build_ol_name_to_fqn_map_with_missing_fqn(self, mock_get_table_fqn):
        """Tables whose FQN cannot be resolved are skipped."""
        # Mock _get_table_fqn to return None for missing FQN
        mock_get_table_fqn.return_value = None
        tables = [{"name": "schema.table1", "facets": {}, "namespace": "ns://"}]
        expected_map = {}  # Expect an empty map since FQN is missing
        result = self.open_lineage_source._build_ol_name_to_fqn_map(tables)
        self.assertEqual(result, expected_map)

    @patch(
        "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn"
    )
    def test_build_ol_name_to_fqn_map_with_empty_tables(self, mock_get_table_fqn):
        """An empty table list yields an empty map without FQN lookups."""
        # No need to set up the mock specifically since it won't be called with empty input
        tables = []  # No tables provided
        expected_map = {}  # Expect an empty map
        result = self.open_lineage_source._build_ol_name_to_fqn_map(tables)
        self.assertEqual(result, expected_map)
        mock_get_table_fqn.assert_not_called()

    @patch(
        "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn"
    )
    @patch(
        "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._build_ol_name_to_fqn_map"
    )
    def test_get_column_lineage_valid_inputs_outputs(
        self, mock_build_map, mock_get_table_fqn
    ):
        """Test with valid input and output lists."""
        # Setup
        mock_get_table_fqn.side_effect = (
            lambda table_details: f"database.schema.{table_details.name}"
        )
        # The map keys match the "namespace + name" of the columnLineage
        # inputFields declared in the outputs fixture below.
        mock_build_map.return_value = {
            "s3a:/project-db/src_test1": "database.schema.input_table_1",
            "s3a:/project-db/src_test2": "database.schema.input_table_2",
        }
        inputs = [
            {"name": "schema.input_table1", "facets": {}, "namespace": "hive://"},
            {"name": "schema.input_table2", "facets": {}, "namespace": "hive://"},
        ]
        outputs = [
            {
                "name": "schema.output_table",
                "facets": {
                    "columnLineage": {
                        "fields": {
                            "output_column1": {
                                "inputFields": [
                                    {
                                        "field": "input_column1",
                                        "namespace": "s3a://project-db",
                                        "name": "/src_test1",
                                    }
                                ]
                            },
                            "output_column2": {
                                "inputFields": [
                                    {
                                        "field": "input_column2",
                                        "namespace": "s3a://project-db",
                                        "name": "/src_test2",
                                    }
                                ]
                            },
                        }
                    }
                },
            }
        ]
        result = self.open_lineage_source._get_column_lineage(inputs, outputs)
        expected = {
            "database.schema.output_table": {
                "database.schema.input_table_1": [
                    ColumnLineage(
                        toColumn="database.schema.output_table.output_column1",
                        fromColumns=["database.schema.input_table_1.input_column1"],
                    )
                ],
                "database.schema.input_table_2": [
                    ColumnLineage(
                        toColumn="database.schema.output_table.output_column2",
                        fromColumns=["database.schema.input_table_2.input_column2"],
                    )
                ],
            }
        }
        self.assertEqual(result, expected)

    def test_get_column_lineage__invalid_inputs_outputs_structure(self):
        """Test with invalid input and output structure."""
        inputs = [{"invalid": "data"}]
        outputs = [{"invalid": "data"}]
        with self.assertRaises(ValueError):
            self.open_lineage_source._get_column_lineage(inputs, outputs)

    def test_get_table_details_with_symlinks(self):
        """Test with valid data where symlinks are present."""
        # Symlink identifiers are dot-separated: project.schema.table.
        data = {
            "facets": {"symlinks": {"identifiers": [{"name": "project.schema.table"}]}}
        }
        result = self.open_lineage_source._get_table_details(data)
        self.assertEqual(result.name, "table")
        self.assertEqual(result.schema, "schema")

    def test_get_table_details_without_symlinks(self):
        """Test with valid data but without symlinks."""
        data = {"name": "schema.table"}
        result = self.open_lineage_source._get_table_details(data)
        self.assertEqual(result.name, "table")
        self.assertEqual(result.schema, "schema")

    def test_get_table_details_invalid_data_missing_symlinks_and_name(self):
        """Test with invalid data missing both symlinks and name."""
        data = {}
        with self.assertRaises(ValueError):
            self.open_lineage_source._get_table_details(data)

    def test_get_table_details_invalid_symlinks_structure(self):
        """Test with invalid symlinks structure."""
        data = {"facets": {"symlinks": {"identifiers": [{}]}}}
        with self.assertRaises(ValueError):
            self.open_lineage_source._get_table_details(data)

    def test_get_table_details_invalid_name_structure(self):
        """Test with invalid name structure."""
        # No "schema.table" dot separator → should be rejected.
        data = {"name": "invalidname"}
        with self.assertRaises(ValueError):
            self.open_lineage_source._get_table_details(data)

    def test_get_pipelines_list(self):
        """Test get_pipelines_list method"""
        ol_event = self.read_openlineage_event_from_kafka(FULL_OL_KAFKA_EVENT)
        self.assertIsInstance(ol_event, OpenLineageEvent)
        self.assertEqual(ol_event, EXPECTED_OL_EVENT)

    @patch(
        "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om"
    )
    def test_yield_pipeline_lineage_details(self, mock_get_entity):
        """End-to-end: Kafka event → table + column lineage edges."""

        def t_fqn_build_side_effect(
            table_details,
        ):
            # Stand-in for OM FQN resolution: fixed service/schema prefix.
            return f"testService.shopify.{table_details.name}"

        def mock_get_uuid_by_name(entity, fqn):
            # Return entity mocks with fixed UUIDs so lineage edges can be
            # asserted deterministically below.
            if fqn == "testService.shopify.raw_product_catalog":
                # source of table lineage
                return Mock(id="69fc8906-4a4a-45ab-9a54-9cc2d399e10e")
            elif fqn == "testService.shopify.fact_order_new5":
                # dst of table lineage
                return Mock(id="59fc8906-4a4a-45ab-9a54-9cc2d399e10e")
            else:
                # pipeline
                z = Mock()
                z.id.root = "79fc8906-4a4a-45ab-9a54-9cc2d399e10e"
                return z

        def extract_lineage_details(pip_results):
            # Flatten AddLineageRequest results into comparable tuples.
            table_lineage = []
            col_lineage = []
            for r in pip_results:
                table_lineage.append(
                    (
                        r.right.edge.fromEntity.id.root,
                        r.right.edge.toEntity.id.root,
                    )
                )
                for col in r.right.edge.lineageDetails.columnsLineage:
                    col_lineage.append((col.fromColumns[0].root, col.toColumn.root))
            return table_lineage, col_lineage

        # Set up the side effect for the mock entity FQN builder
        mock_get_entity.side_effect = t_fqn_build_side_effect
        ol_event = self.read_openlineage_event_from_kafka(FULL_OL_KAFKA_EVENT)
        with patch.object(
            OpenMetadataConnection,
            "get_by_name",
            create=True,
            side_effect=mock_get_uuid_by_name,
        ):
            pip_results = self.open_lineage_source.yield_pipeline_lineage_details(
                ol_event
            )
            table_lineage, col_lineage = extract_lineage_details(pip_results)
        expected_table_lineage = [
            (
                UUID("69fc8906-4a4a-45ab-9a54-9cc2d399e10e"),
                UUID("59fc8906-4a4a-45ab-9a54-9cc2d399e10e"),
            )
        ]
        expected_col_lineage = [
            (
                "testService.shopify.raw_product_catalog.comments",
                "testService.shopify.fact_order_new5.id",
            ),
            (
                "testService.shopify.raw_product_catalog.products",
                "testService.shopify.fact_order_new5.randomid",
            ),
            (
                "testService.shopify.raw_product_catalog.platform",
                "testService.shopify.fact_order_new5.zip",
            ),
        ]
        self.assertEqual(col_lineage, expected_col_lineage)
        self.assertEqual(table_lineage, expected_table_lineage)
# Allow running this module directly: python <this_file>.py
if __name__ == "__main__":
    unittest.main()