mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-10 16:25:37 +00:00
This commit is contained in:
parent
d09bca26f6
commit
1e56c76c0e
@ -22,13 +22,14 @@ from pydantic import BaseModel
|
||||
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
||||
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, Uuid
|
||||
from metadata.generated.schema.type.entityLineage import ColumnLineage, EntitiesEdge
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
|
||||
from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT
|
||||
from metadata.ingestion.models.patch_request import build_patch
|
||||
from metadata.ingestion.ometa.client import REST, APIError
|
||||
from metadata.ingestion.ometa.utils import get_entity_type, quote
|
||||
from metadata.ingestion.ometa.utils import get_entity_type, model_str, quote
|
||||
from metadata.utils.logger import ometa_logger
|
||||
from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache
|
||||
|
||||
@ -260,7 +261,7 @@ class OMetaLineageMixin(Generic[T]):
|
||||
def get_lineage_by_id(
|
||||
self,
|
||||
entity: Union[Type[T], str],
|
||||
entity_id: str,
|
||||
entity_id: Union[str, Uuid],
|
||||
up_depth: int = 1,
|
||||
down_depth: int = 1,
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
@ -272,13 +273,16 @@ class OMetaLineageMixin(Generic[T]):
|
||||
:param down_depth: Downstream depth of lineage (default=1, min=0, max=3)
|
||||
"""
|
||||
return self._get_lineage(
|
||||
entity=entity, path=entity_id, up_depth=up_depth, down_depth=down_depth
|
||||
entity=entity,
|
||||
path=model_str(entity_id),
|
||||
up_depth=up_depth,
|
||||
down_depth=down_depth,
|
||||
)
|
||||
|
||||
def get_lineage_by_name(
|
||||
self,
|
||||
entity: Union[Type[T], str],
|
||||
fqn: str,
|
||||
fqn: Union[str, FullyQualifiedEntityName],
|
||||
up_depth: int = 1,
|
||||
down_depth: int = 1,
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
@ -291,7 +295,7 @@ class OMetaLineageMixin(Generic[T]):
|
||||
"""
|
||||
return self._get_lineage(
|
||||
entity=entity,
|
||||
path=f"name/{quote(fqn)}",
|
||||
path=f"name/{quote(model_str(fqn))}",
|
||||
up_depth=up_depth,
|
||||
down_depth=down_depth,
|
||||
)
|
||||
|
@ -288,6 +288,30 @@ class OMetaLineageTest(TestCase):
|
||||
len(res["downstreamEdges"][0]["lineageDetails"]["columnsLineage"]), 2
|
||||
)
|
||||
|
||||
# We can get lineage by ID
|
||||
lineage_id = self.metadata.get_lineage_by_id(
|
||||
entity=Table, entity_id=self.table2_entity.id.root
|
||||
)
|
||||
assert lineage_id["entity"]["id"] == str(self.table2_entity.id.root)
|
||||
|
||||
# Same thing works if we pass directly the Uuid
|
||||
lineage_uuid = self.metadata.get_lineage_by_id(
|
||||
entity=Table, entity_id=self.table2_entity.id
|
||||
)
|
||||
assert lineage_uuid["entity"]["id"] == str(self.table2_entity.id.root)
|
||||
|
||||
# We can also get lineage by name
|
||||
lineage_str = self.metadata.get_lineage_by_name(
|
||||
entity=Table, fqn=self.table2_entity.fullyQualifiedName.root
|
||||
)
|
||||
assert lineage_str["entity"]["id"] == str(self.table2_entity.id.root)
|
||||
|
||||
# Or passing the FQN
|
||||
lineage_fqn = self.metadata.get_lineage_by_name(
|
||||
entity=Table, fqn=self.table2_entity.fullyQualifiedName
|
||||
)
|
||||
assert lineage_fqn["entity"]["id"] == str(self.table2_entity.id.root)
|
||||
|
||||
def test_delete_by_source(self):
|
||||
"""
|
||||
Test case for deleting lineage by source.
|
||||
|
Loading…
x
Reference in New Issue
Block a user