mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-10-31 18:48:35 +00:00 
			
		
		
		
	 d8e2187980
			
		
	
	
		d8e2187980
		
			
		
	
	
	
	
		
			
			* pydantic v2 * pydanticv2 * fix parser * fix annotated * fix model dumping * mysql ingestion * clean root models * clean root models * bump airflow * bump airflow * bump airflow * optionals * optionals * optionals * jdk * airflow migrate * fab provider * fab provider * fab provider * some more fixes * fixing tests and imports * model_dump and model_validate * model_dump and model_validate * model_dump and model_validate * union * pylint * pylint * integration tests * fix CostAnalysisReportData * integration tests * tests * missing defaults * missing defaults
		
			
				
	
	
		
			320 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			320 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #  Copyright 2021 Collate
 | |
| #  Licensed under the Apache License, Version 2.0 (the "License");
 | |
| #  you may not use this file except in compliance with the License.
 | |
| #  You may obtain a copy of the License at
 | |
| #  http://www.apache.org/licenses/LICENSE-2.0
 | |
| #  Unless required by applicable law or agreed to in writing, software
 | |
| #  distributed under the License is distributed on an "AS IS" BASIS,
 | |
| #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| #  See the License for the specific language governing permissions and
 | |
| #  limitations under the License.
 | |
| 
 | |
| """
 | |
| OpenMetadata high-level API Lineage test
 | |
| """
 | |
| from unittest import TestCase
 | |
| 
 | |
| from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
 | |
| from metadata.generated.schema.entity.data.dashboard import Dashboard
 | |
| from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDataModel
 | |
| from metadata.generated.schema.entity.data.database import Database
 | |
| from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
 | |
| from metadata.generated.schema.entity.data.pipeline import Pipeline
 | |
| from metadata.generated.schema.entity.data.table import Table
 | |
| from metadata.generated.schema.entity.services.dashboardService import DashboardService
 | |
| from metadata.generated.schema.entity.services.databaseService import DatabaseService
 | |
| from metadata.generated.schema.entity.services.pipelineService import PipelineService
 | |
| from metadata.generated.schema.type.entityLineage import (
 | |
|     ColumnLineage,
 | |
|     EntitiesEdge,
 | |
|     EntityLineage,
 | |
|     LineageDetails,
 | |
| )
 | |
| from metadata.generated.schema.type.entityReference import EntityReference
 | |
| 
 | |
| from ..integration_base import (
 | |
|     generate_name,
 | |
|     get_create_entity,
 | |
|     get_create_service,
 | |
|     int_admin_ometa,
 | |
| )
 | |
| 
 | |
| 
 | |
class OMetaLineageTest(TestCase):
    """
    Integration tests for the lineage operations of the high-level API.

    Run this integration test with the local API available
    Install the ingestion package before running the tests
    """

    service_entity_id = None

    metadata = int_admin_ometa()

    # Evaluated while the class body is executed (i.e. at import time): the
    # whole module fails fast when `health_check()` returns a falsy value.
    assert metadata.health_check()

    db_service_name = generate_name()
    pipeline_service_name = generate_name()
    dashboard_service_name = generate_name()

    db_service = get_create_service(entity=DatabaseService, name=db_service_name)
    pipeline_service = get_create_service(
        entity=PipelineService, name=pipeline_service_name
    )
    dashboard_service = get_create_service(
        entity=DashboardService, name=dashboard_service_name
    )

    @classmethod
    def setUpClass(cls) -> None:
        """
        Prepare ingredients

        Creates the three services, then a database -> schema -> two tables
        chain, one pipeline, one dashboard and one dashboard data model. Both
        the create requests and the entities returned by `create_or_update`
        are stored on the class for the tests to use.
        """

        cls.db_service_entity: DatabaseService = cls.metadata.create_or_update(
            data=cls.db_service
        )
        cls.pipeline_service_entity: PipelineService = cls.metadata.create_or_update(
            data=cls.pipeline_service
        )
        cls.dashboard_service_entity: DashboardService = cls.metadata.create_or_update(
            data=cls.dashboard_service
        )

        # Each level is referenced by the fullyQualifiedName of its parent
        create_db_entity: Database = cls.metadata.create_or_update(
            data=get_create_entity(
                entity=Database,
                reference=cls.db_service_entity.fullyQualifiedName,
                name=generate_name(),
            )
        )

        create_schema_entity = cls.metadata.create_or_update(
            data=get_create_entity(
                entity=DatabaseSchema,
                reference=create_db_entity.fullyQualifiedName,
                name=generate_name(),
            )
        )

        cls.table1 = get_create_entity(
            name=generate_name(),
            entity=Table,
            reference=create_schema_entity.fullyQualifiedName,
        )
        cls.table1_entity = cls.metadata.create_or_update(data=cls.table1)

        cls.table2 = get_create_entity(
            name=generate_name(),
            entity=Table,
            reference=create_schema_entity.fullyQualifiedName,
        )
        cls.table2_entity = cls.metadata.create_or_update(data=cls.table2)

        cls.pipeline = get_create_entity(
            name=generate_name(),
            entity=Pipeline,
            reference=cls.pipeline_service_entity.fullyQualifiedName,
        )
        cls.pipeline_entity = cls.metadata.create_or_update(data=cls.pipeline)

        cls.dashboard = get_create_entity(
            name=generate_name(),
            entity=Dashboard,
            reference=cls.dashboard_service_entity.fullyQualifiedName,
        )
        cls.dashboard_entity = cls.metadata.create_or_update(data=cls.dashboard)

        cls.dashboard_datamodel = get_create_entity(
            name=generate_name(),
            entity=DashboardDataModel,
            reference=cls.dashboard_service_entity.fullyQualifiedName,
        )
        cls.dashboard_datamodel_entity = cls.metadata.create_or_update(
            data=cls.dashboard_datamodel
        )

    @classmethod
    def tearDownClass(cls) -> None:
        """
        Clean up

        Looks the three services up by name and hard-deletes each of them
        recursively.
        """

        # Listed in the order in which the services are deleted
        services = (
            (PipelineService, cls.pipeline_service_name),
            (DatabaseService, cls.db_service_name),
            (DashboardService, cls.dashboard_service_name),
        )

        # Resolve every ID before the first delete is issued
        service_ids = [
            (
                service_type,
                str(cls.metadata.get_by_name(entity=service_type, fqn=name).id.root),
            )
            for service_type, name in services
        ]

        for service_type, service_id in service_ids:
            cls.metadata.delete(
                entity=service_type,
                entity_id=service_id,
                recursive=True,
                hard_delete=True,
            )

    def _table_lineage_request(
        self, lineage_details: LineageDetails
    ) -> AddLineageRequest:
        """
        Build the table1 -> table2 lineage request carrying the given details
        """

        return AddLineageRequest(
            edge=EntitiesEdge(
                fromEntity=EntityReference(id=self.table1_entity.id, type="table"),
                toEntity=EntityReference(id=self.table2_entity.id, type="table"),
                lineageDetails=lineage_details,
            ),
        )

    def _column_lineage(self, column: str) -> ColumnLineage:
        """
        Map `<table1 FQN>.<column>` to `<table2 FQN>.<column>`
        """

        return ColumnLineage(
            fromColumns=[f"{self.table1_entity.fullyQualifiedName.root}.{column}"],
            toColumn=f"{self.table2_entity.fullyQualifiedName.root}.{column}",
        )

    def _assert_single_pipeline_edge(self, res: dict) -> dict:
        """
        Check that `res` holds exactly one downstream edge whose lineage
        details reference the test pipeline, and return those details
        """

        self.assertEqual(len(res["downstreamEdges"]), 1)
        details = res["downstreamEdges"][0]["lineageDetails"]
        self.assertEqual(
            details["pipeline"]["id"],
            str(self.pipeline_entity.id.root),
        )
        return details

    def test_create(self):
        """
        We can create a Lineage and get the origin node lineage info back
        """

        from_id = str(self.table1_entity.id.root)
        to_id = str(self.table2_entity.id.root)

        res = self.metadata.add_lineage(
            data=self._table_lineage_request(
                LineageDetails(description="test lineage")
            )
        )

        # Check that we get the origin ID in the entity
        self.assertEqual(from_id, res["entity"]["id"])

        # Check that the toEntity is a node in the origin lineage
        self.assertIn(to_id, [node["id"] for node in res["nodes"]])

        # Add pipeline to the lineage edge
        pipeline_request = self._table_lineage_request(
            LineageDetails(
                description="test lineage",
                pipeline=EntityReference(id=self.pipeline_entity.id, type="pipeline"),
            )
        )
        res = self.metadata.add_lineage(data=pipeline_request, check_patch=True)
        self._assert_single_pipeline_edge(res)

        # Add a column to the lineage edge. The request carries no pipeline:
        # the assertions below expect the one added above to still be there.
        id_column_request = self._table_lineage_request(
            LineageDetails(
                description="test lineage",
                columnsLineage=[self._column_lineage("id")],
            )
        )
        res = self.metadata.add_lineage(data=id_column_request, check_patch=True)
        details = self._assert_single_pipeline_edge(res)
        self.assertEqual(len(details["columnsLineage"]), 1)

        # Add a new column to the lineage edge: the edge is expected to end
        # up with both column mappings.
        name_column_request = self._table_lineage_request(
            LineageDetails(
                description="test lineage",
                columnsLineage=[self._column_lineage("name")],
            )
        )
        res = self.metadata.add_lineage(data=name_column_request, check_patch=True)
        details = self._assert_single_pipeline_edge(res)
        self.assertEqual(len(details["columnsLineage"]), 2)

    def test_table_datamodel_lineage(self):
        """We can create and get lineage for a table to a dashboard datamodel"""

        from_id = str(self.table1_entity.id.root)

        res = self.metadata.add_lineage(
            data=AddLineageRequest(
                edge=EntitiesEdge(
                    fromEntity=EntityReference(id=self.table1_entity.id, type="table"),
                    toEntity=EntityReference(
                        id=self.dashboard_datamodel_entity.id, type="dashboardDataModel"
                    ),
                    lineageDetails=LineageDetails(description="test lineage"),
                ),
            )
        )

        # Check that we get the origin ID in the entity
        self.assertEqual(from_id, res["entity"]["id"])

        # use the SDK to get the lineage
        datamodel_lineage = self.metadata.get_lineage_by_name(
            entity=DashboardDataModel,
            fqn=self.dashboard_datamodel_entity.fullyQualifiedName.root,
        )
        # pydantic v2: `model_validate` replaces the deprecated `parse_obj`
        entity_lineage = EntityLineage.model_validate(datamodel_lineage)
        self.assertEqual(from_id, str(entity_lineage.upstreamEdges[0].fromEntity.root))
 |