feat(ingest): lib - add better support for working with urns (#4172)

Co-authored-by: Xu Wang <xu.wang@grandrounds.com>
This commit is contained in:
Xu Wang 2022-02-22 19:39:24 -08:00 committed by GitHub
parent ede31c4177
commit aa3363bcc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 330 additions and 0 deletions

View File

@ -0,0 +1,31 @@
from typing import List
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.urn import Urn
class DataPlatformUrn(Urn):
    """
    Urn identifying a data platform.

    expected data platform urn format: urn:li:dataPlatform:<platform_name>.
    example: "urn:li:dataPlatform:hive"
    """

    ENTITY_TYPE: str = "dataPlatform"

    def __init__(self, entity_type: str, entity_id: List[str], domain: str = "li"):
        """
        :param entity_type: entity type of the urn; must equal ENTITY_TYPE
        :param entity_id: the parts of the entity id
        :param domain: the urn domain, "li" by default
        :raises InvalidUrnError if the entity type is not ENTITY_TYPE
        """
        super().__init__(entity_type, entity_id, domain)

    @classmethod
    def create_from_string(cls, urn_str: str) -> "DataPlatformUrn":
        """
        Create a DataPlatformUrn from its string representation
        :param urn_str: the string representation of the DataPlatformUrn
        :return: DataPlatformUrn of the given string representation
        """
        parsed: Urn = super().create_from_string(urn_str)
        parsed_type = parsed.get_type()
        parsed_id = parsed.get_entity_id()
        parsed_domain = parsed.get_domain()
        return cls(parsed_type, parsed_id, parsed_domain)

    @classmethod
    def create_from_id(cls, platform_id: str) -> "DataPlatformUrn":
        """
        Create a DataPlatformUrn from a bare platform id
        :param platform_id: the platform name, used as the single entity id part
        :return: DataPlatformUrn whose entity type is ENTITY_TYPE
        """
        return cls(DataPlatformUrn.ENTITY_TYPE, [platform_id])

    @staticmethod
    def _validate_entity_type(entity_type: str) -> None:
        # Only the dataPlatform entity type is acceptable for this urn class.
        if entity_type == DataPlatformUrn.ENTITY_TYPE:
            return
        raise InvalidUrnError(
            f"Entity type should be {DataPlatformUrn.ENTITY_TYPE} but found {entity_type}"
        )

View File

@ -0,0 +1,89 @@
from typing import List, Set
from datahub.metadata.schema_classes import FabricTypeClass
from datahub.utilities.urns.data_platform_urn import DataPlatformUrn
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.urn import Urn
class DatasetUrn(Urn):
    """
    expected dataset urn format: urn:li:dataset:(<platform_urn_str>,<table_name>,env). example:
    urn:li:dataset:(urn:li:dataPlatform:hive,member,prod)
    """

    ENTITY_TYPE: str = "dataset"
    # Upper-cased values of every public, non-callable attribute of FabricTypeClass.
    VALID_FABRIC_SET: Set[str] = {
        str(getattr(FabricTypeClass, attr)).upper()
        for attr in dir(FabricTypeClass)
        if not callable(getattr(FabricTypeClass, attr)) and not attr.startswith("_")
    }

    def __init__(self, entity_type: str, entity_id: List[str], domain: str = "li"):
        """
        :param entity_type: entity type of the urn; must equal ENTITY_TYPE
        :param entity_id: the three parts [<platform_urn_str>, <table_name>, <env>]
        :param domain: the urn domain, "li" by default
        :raises InvalidUrnError if the entity type or the entity id is invalid
        """
        super().__init__(entity_type, entity_id, domain)

    @classmethod
    def create_from_string(cls, urn_str: str) -> "DatasetUrn":
        """
        Create a DatasetUrn from its string representation
        :param urn_str: the string representation of the DatasetUrn
        :return: DatasetUrn of the given string representation
        :raises InvalidUrnError if the string representation is in invalid format
        """
        urn: Urn = super().create_from_string(urn_str)
        return cls(urn.get_type(), urn.get_entity_id(), urn.get_domain())

    def get_data_platform_urn(self) -> DataPlatformUrn:
        """
        :return: the DataPlatformUrn of where the Dataset is created
        """
        return DataPlatformUrn.create_from_string(self.get_entity_id()[0])

    def get_dataset_name(self) -> str:
        """
        :return: the dataset name from this DatasetUrn
        """
        return self.get_entity_id()[1]

    def get_env(self) -> str:
        """
        :return: the environment where the Dataset is created, exactly as it was supplied
        """
        return self.get_entity_id()[2]

    @classmethod
    def create_from_ids(
        cls, platform_id: str, table_name: str, env: str
    ) -> "DatasetUrn":
        """
        Create a DatasetUrn from its individual components
        :param platform_id: the platform name, wrapped into a DataPlatformUrn
        :param table_name: the dataset (table) name
        :param env: the environment; compared case-insensitively against VALID_FABRIC_SET
        :return: DatasetUrn built from the given components
        :raises InvalidUrnError if env is not a valid fabric
        """
        entity_id: List[str] = [
            str(DataPlatformUrn.create_from_id(platform_id)),
            table_name,
            env,
        ]
        return cls(DatasetUrn.ENTITY_TYPE, entity_id)

    @staticmethod
    def _validate_entity_type(entity_type: str) -> None:
        if entity_type != DatasetUrn.ENTITY_TYPE:
            raise InvalidUrnError(
                f"Entity type should be {DatasetUrn.ENTITY_TYPE} but found {entity_type}"
            )

    @staticmethod
    def _validate_entity_id(entity_id: List[str]) -> None:
        # expected entity id format (<platform_urn>,<table_name>,<env>)
        if len(entity_id) != 3:
            raise InvalidUrnError(
                f"Expect 3 parts in the entity id but found {entity_id}"
            )

        platform_urn_str = entity_id[0]
        DataPlatformUrn.validate(platform_urn_str)

        # The env is matched case-insensitively: both sides are upper-cased.
        env = entity_id[2].upper()
        if env not in DatasetUrn.VALID_FABRIC_SET:
            raise InvalidUrnError(
                f"Invalid env:{env}. Allowed env are {DatasetUrn.VALID_FABRIC_SET}"
            )

View File

@ -0,0 +1,3 @@
class InvalidUrnError(Exception):
    """Error raised when a urn or one of its components is not valid."""

    def __init__(self, msg: str):
        """
        :param msg: human readable description of what is wrong with the urn
        """
        super().__init__(msg)

View File

@ -0,0 +1,129 @@
from typing import List
from datahub.utilities.urns.error import InvalidUrnError
class Urn:
    """
    URNs are Globally Unique Identifiers (GUID) used to represent an entity.
    It will be in format of urn:<domain>:<type>:<id>

    The <id> is either a single value or a parenthesised, comma separated tuple
    (<part1>,<part2>,...). A part may itself contain a parenthesised tuple, e.g. a
    nested urn such as urn:li:dataset:(urn:li:dataPlatform:hive,member,PROD).
    """

    URN_PREFIX: str = "urn"
    # all the Datahub urn use li domain for now.
    LI_DOMAIN: str = "li"

    _entity_type: str
    _domain: str
    _entity_id: List[str]

    def __init__(
        self, entity_type: str, entity_id: List[str], urn_domain: str = LI_DOMAIN
    ):
        """
        :param entity_type: the <type> section of the urn
        :param entity_id: the parts of the <id> section; must not be empty
        :param urn_domain: the <domain> section of the urn, "li" by default
        :raises InvalidUrnError if the entity id is empty or a subclass validation hook rejects the input
        """
        if not entity_id:
            raise InvalidUrnError("Empty entity id.")
        self._validate_entity_type(entity_type)
        self._validate_entity_id(entity_id)
        self._entity_type = entity_type
        self._domain = urn_domain
        self._entity_id = entity_id

    @classmethod
    def create_from_string(cls, urn_str: str) -> "Urn":
        """
        Create a Urn from its string representation
        :param urn_str: the string representation of the Urn
        :return: Urn of the given string representation
        :raises InvalidUrnError if the string representation is in invalid format
        """
        # expect urn string in format of urn:<domain>:<type>:<id>
        cls.validate(urn_str)
        # maxsplit=3 keeps any ":" inside the <id> section (e.g. nested urns) intact.
        parts: List[str] = urn_str.split(":", 3)
        return cls(parts[2], cls._get_entity_id_from_str(parts[3]), parts[1])

    @classmethod
    def validate(cls, urn_str: str) -> None:
        """
        Validate if a string is in valid Urn format
        :param urn_str: to be validated urn string
        :raises InvalidUrnError if the string representation is in invalid format
        """
        parts: List[str] = urn_str.split(":", 3)
        if len(parts) != 4:
            raise InvalidUrnError(
                f"Invalid urn string: {urn_str}. Expect 4 parts from urn string but found {len(parts)}"
            )

        if "" in parts:
            raise InvalidUrnError(
                f"Invalid urn string: {urn_str}. There should not be empty parts in urn string."
            )

        if parts[0] != Urn.URN_PREFIX:
            raise InvalidUrnError(
                f'Invalid urn string: {urn_str}. Expect urn starting with "urn" but found {parts[0]}'
            )

        # Parse the entity id once and reuse it for both checks below.
        entity_id: List[str] = cls._get_entity_id_from_str(parts[3])
        if "" in entity_id:
            raise InvalidUrnError(
                f"Invalid entity id in urn string: {urn_str}. There should not be empty parts in entity id."
            )

        cls._validate_entity_type(parts[2])
        cls._validate_entity_id(entity_id)

    def get_type(self) -> str:
        """
        :return: the entity type of this Urn
        """
        return self._entity_type

    def get_entity_id(self) -> List[str]:
        """
        :return: the parts of the entity id of this Urn
        """
        return self._entity_id

    def get_entity_id_as_string(self) -> str:
        """
        :return: string representation of the entity ids. If there are more than one part in the entity id part, it will
        return in this format (<part1>,<part2>,...)
        """
        return self._entity_id_to_string()

    def get_domain(self) -> str:
        """
        :return: the domain of this Urn
        """
        return self._domain

    @staticmethod
    def _get_entity_id_from_str(entity_id: str) -> List[str]:
        """
        Split the <id> section of a urn into its parts
        :param entity_id: the <id> section of a urn string
        :return: a single element list if the id is not wrapped in parentheses, otherwise the
        whitespace-stripped top level parts of the tuple
        """
        if not (entity_id.startswith("(") and entity_id.endswith(")")):
            return [entity_id]

        inner = entity_id[1:-1]
        parts: List[str] = []
        depth = 0
        part_start = 0
        for idx, char in enumerate(inner):
            if char == "(":
                depth += 1
            elif char == ")":
                depth -= 1
                if depth < 0:
                    break
            elif char == "," and depth == 0:
                # Only commas outside any nested parentheses separate parts, so a
                # nested tuple such as urn:li:dataset:(a,b,c) stays in one piece.
                parts.append(inner[part_start:idx].strip())
                part_start = idx + 1

        if depth != 0:
            # Unbalanced parentheses cannot be read as nesting; keep the plain
            # comma split for such input.
            return [sub_id.strip() for sub_id in inner.split(",")]

        parts.append(inner[part_start:].strip())
        return parts

    @staticmethod
    def _validate_entity_type(entity_type: str) -> None:
        # Hook for subclasses to restrict the entity type; the base Urn accepts any type.
        pass

    @staticmethod
    def _validate_entity_id(entity_id: List[str]) -> None:
        # Hook for subclasses to restrict the entity id; the base Urn accepts any id.
        pass

    def __str__(self) -> str:
        return f"{self.URN_PREFIX}:{self._domain}:{self._entity_type}:{self._entity_id_to_string()}"

    def _entity_id_to_string(self) -> str:
        # A single part is rendered bare; anything else is rendered as (<part1>,<part2>,...).
        if len(self._entity_id) == 1:
            return self._entity_id[0]
        return f"({','.join(self._entity_id)})"

    def __hash__(self) -> int:
        return hash((self._domain, self._entity_type) + tuple(self._entity_id))

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Urn):
            return False

        return (
            self._entity_id == other._entity_id
            and self._domain == other._domain
            and self._entity_type == other._entity_type
        )

View File

@ -0,0 +1,42 @@
import unittest
from datahub.utilities.urns.data_platform_urn import DataPlatformUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.error import InvalidUrnError
class TestDatasetUrn(unittest.TestCase):
    """Tests for parsing and validating DatasetUrn."""

    def test_parse_urn(self) -> None:
        """A well formed dataset urn exposes its platform, name and env and round-trips to string."""
        dataset_urn_str = "urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)"
        dataset_urn = DatasetUrn.create_from_string(dataset_urn_str)
        self.assertEqual(
            dataset_urn.get_data_platform_urn(),
            DataPlatformUrn.create_from_string("urn:li:dataPlatform:abc"),
        )
        self.assertEqual(dataset_urn.get_dataset_name(), "def")
        self.assertEqual(dataset_urn.get_env(), "prod")
        self.assertEqual(
            str(dataset_urn), "urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)"
        )
        self.assertEqual(
            dataset_urn,
            DatasetUrn("dataset", ["urn:li:dataPlatform:abc", "def", "prod"]),
        )

    def test_invalid_urn(self) -> None:
        """Each malformed dataset urn is rejected; every case isolates a single defect."""
        # wrong entity type
        with self.assertRaises(InvalidUrnError):
            DatasetUrn.create_from_string(
                "urn:li:abc:(urn:li:dataPlatform:abc,def,prod)"
            )

        # first part is not a dataPlatform urn
        with self.assertRaises(InvalidUrnError):
            DatasetUrn.create_from_string(
                "urn:li:dataset:(urn:li:user:abc,dataset,prod)"
            )

        # only two parts in the entity id; the platform urn is valid so that the
        # part count is the only problem
        with self.assertRaises(InvalidUrnError):
            DatasetUrn.create_from_string(
                "urn:li:dataset:(urn:li:dataPlatform:abc,dataset)"
            )

        # invalid env; the platform urn must be valid here, otherwise the platform
        # check raises first and the env check is never reached
        with self.assertRaises(InvalidUrnError):
            DatasetUrn.create_from_string(
                "urn:li:dataset:(urn:li:dataPlatform:abc,dataset,invalidEnv)"
            )

View File

@ -0,0 +1,36 @@
import unittest
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.urn import Urn
class TestUrn(unittest.TestCase):
    """Tests for parsing and validating the generic Urn."""

    def test_parse_urn(self) -> None:
        """Both single-part and tuple entity ids are parsed and rendered back to string."""
        simple_urn_str = "urn:li:dataPlatform:abc"
        simple_urn = Urn.create_from_string(simple_urn_str)
        self.assertEqual(simple_urn.get_entity_id_as_string(), "abc")
        self.assertEqual(simple_urn.get_entity_id(), ["abc"])
        self.assertEqual(simple_urn.get_type(), "dataPlatform")
        self.assertEqual(simple_urn.get_domain(), "li")
        self.assertEqual(str(simple_urn), simple_urn_str)
        self.assertEqual(simple_urn, Urn("dataPlatform", ["abc"]))

        # Whitespace around the tuple parts is dropped while parsing.
        complex_urn_str = "urn:li:dataset:(urn:li:dataPlatform:abc, def, prod)"
        complex_urn = Urn.create_from_string(complex_urn_str)
        self.assertEqual(
            complex_urn.get_entity_id_as_string(), "(urn:li:dataPlatform:abc,def,prod)"
        )
        self.assertEqual(
            complex_urn.get_entity_id(), ["urn:li:dataPlatform:abc", "def", "prod"]
        )
        self.assertEqual(complex_urn.get_type(), "dataset")
        self.assertEqual(
            str(complex_urn), "urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)"
        )

    def test_invalid_urn(self) -> None:
        """Urn strings with missing or empty sections are rejected."""
        invalid_urn_strs = (
            "urn:li:abc",
            "urn:li:abc:",
            "urn:li:abc:()",
            "urn:li:abc:(abc,)",
        )
        for invalid_urn_str in invalid_urn_strs:
            with self.assertRaises(InvalidUrnError):
                Urn.create_from_string(invalid_urn_str)