diff --git a/metadata-ingestion/src/datahub/utilities/urns/__init__.py b/metadata-ingestion/src/datahub/utilities/urns/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/metadata-ingestion/src/datahub/utilities/urns/data_platform_urn.py b/metadata-ingestion/src/datahub/utilities/urns/data_platform_urn.py
new file mode 100644
index 0000000000..8732592b80
--- /dev/null
+++ b/metadata-ingestion/src/datahub/utilities/urns/data_platform_urn.py
@@ -0,0 +1,46 @@
+from typing import List
+
+from datahub.utilities.urns.error import InvalidUrnError
+from datahub.utilities.urns.urn import Urn
+
+
+class DataPlatformUrn(Urn):
+    """
+    Urn of a data platform.
+
+    Expected data platform urn format: urn:li:dataPlatform:<platform_id>.
+    Example: "urn:li:dataPlatform:hive"
+    """
+
+    ENTITY_TYPE: str = "dataPlatform"
+
+    def __init__(self, entity_type: str, entity_id: List[str], domain: str = "li"):
+        super().__init__(entity_type, entity_id, domain)
+
+    @classmethod
+    def create_from_string(cls, urn_str: str) -> "DataPlatformUrn":
+        """
+        Create a DataPlatformUrn from its string representation
+        :param urn_str: the string representation of the DataPlatformUrn
+        :return: DataPlatformUrn of the given string representation
+        :raises InvalidUrnError if the string representation is in invalid format
+        """
+        urn: Urn = super().create_from_string(urn_str)
+        return cls(urn.get_type(), urn.get_entity_id(), urn.get_domain())
+
+    @classmethod
+    def create_from_id(cls, platform_id: str) -> "DataPlatformUrn":
+        """
+        Create a DataPlatformUrn from a platform id
+        :param platform_id: the id of the platform, e.g. "hive"
+        :return: DataPlatformUrn whose entity id is the given platform id
+        """
+        return cls(DataPlatformUrn.ENTITY_TYPE, [platform_id])
+
+    @staticmethod
+    def _validate_entity_type(entity_type: str) -> None:
+        # Called by Urn.__init__ and Urn.validate; only "dataPlatform" is accepted.
+        if entity_type != DataPlatformUrn.ENTITY_TYPE:
+            raise InvalidUrnError(
+                f"Entity type should be {DataPlatformUrn.ENTITY_TYPE} but found {entity_type}"
+            )
diff --git a/metadata-ingestion/src/datahub/utilities/urns/dataset_urn.py b/metadata-ingestion/src/datahub/utilities/urns/dataset_urn.py
new file mode 100644
index 0000000000..6563c465a8
--- /dev/null
+++ b/metadata-ingestion/src/datahub/utilities/urns/dataset_urn.py
@@ -0,0 +1,99 @@
+from typing import List, Set
+
+from datahub.metadata.schema_classes import FabricTypeClass
+from datahub.utilities.urns.data_platform_urn import DataPlatformUrn
+from datahub.utilities.urns.error import InvalidUrnError
+from datahub.utilities.urns.urn import Urn
+
+
+class DatasetUrn(Urn):
+    """
+    Urn of a dataset.
+
+    Expected dataset urn format: urn:li:dataset:(<platform_urn>,<dataset_name>,<env>).
+    Example: urn:li:dataset:(urn:li:dataPlatform:hive,member,prod)
+    """
+
+    ENTITY_TYPE: str = "dataset"
+    # Upper-cased values of every public, non-callable attribute of FabricTypeClass.
+    VALID_FABRIC_SET: Set[str] = {
+        str(getattr(FabricTypeClass, attr)).upper()
+        for attr in dir(FabricTypeClass)
+        if not callable(getattr(FabricTypeClass, attr)) and not attr.startswith("_")
+    }
+
+    def __init__(self, entity_type: str, entity_id: List[str], domain: str = "li"):
+        super().__init__(entity_type, entity_id, domain)
+
+    @classmethod
+    def create_from_string(cls, urn_str: str) -> "DatasetUrn":
+        """
+        Create a DatasetUrn from its string representation
+        :param urn_str: the string representation of the DatasetUrn
+        :return: DatasetUrn of the given string representation
+        :raises InvalidUrnError if the string representation is in invalid format
+        """
+        urn: Urn = super().create_from_string(urn_str)
+        return cls(urn.get_type(), urn.get_entity_id(), urn.get_domain())
+
+    def get_data_platform_urn(self) -> DataPlatformUrn:
+        """
+        :return: the DataPlatformUrn of where the Dataset is created
+        """
+        return DataPlatformUrn.create_from_string(self.get_entity_id()[0])
+
+    def get_dataset_name(self) -> str:
+        """
+        :return: the dataset name from this DatasetUrn
+        """
+        return self.get_entity_id()[1]
+
+    def get_env(self) -> str:
+        """
+        :return: the environment where the Dataset is created
+        """
+        return self.get_entity_id()[2]
+
+    @classmethod
+    def create_from_ids(
+        cls, platform_id: str, table_name: str, env: str
+    ) -> "DatasetUrn":
+        """
+        Create a DatasetUrn from its individual components
+        :param platform_id: the id of the platform, e.g. "hive"
+        :param table_name: the name of the dataset
+        :param env: the environment; compared case-insensitively against VALID_FABRIC_SET
+        :return: DatasetUrn built from the given components
+        :raises InvalidUrnError if env is not in VALID_FABRIC_SET
+        """
+        entity_id: List[str] = [
+            str(DataPlatformUrn.create_from_id(platform_id)),
+            table_name,
+            env,
+        ]
+        return cls(DatasetUrn.ENTITY_TYPE, entity_id)
+
+    @staticmethod
+    def _validate_entity_type(entity_type: str) -> None:
+        if entity_type != DatasetUrn.ENTITY_TYPE:
+            raise InvalidUrnError(
+                f"Entity type should be {DatasetUrn.ENTITY_TYPE} but found {entity_type}"
+            )
+
+    @staticmethod
+    def _validate_entity_id(entity_id: List[str]) -> None:
+        # expected entity id format (<platform_urn>,<dataset_name>,<env>)
+        if len(entity_id) != 3:
+            raise InvalidUrnError(
+                f"Expect 3 parts in the entity id but found {entity_id}"
+            )
+
+        platform_urn_str = entity_id[0]
+
+        DataPlatformUrn.validate(platform_urn_str)
+        # Upper-case before the lookup so the membership check is case-insensitive.
+        env = entity_id[2].upper()
+        if env not in DatasetUrn.VALID_FABRIC_SET:
+            raise InvalidUrnError(
+                f"Invalid env:{env}. Allowed env are {DatasetUrn.VALID_FABRIC_SET}"
+            )
diff --git a/metadata-ingestion/src/datahub/utilities/urns/error.py b/metadata-ingestion/src/datahub/utilities/urns/error.py
new file mode 100644
index 0000000000..12b7c02ab2
--- /dev/null
+++ b/metadata-ingestion/src/datahub/utilities/urns/error.py
@@ -0,0 +1,5 @@
+class InvalidUrnError(Exception):
+    """Raised when a urn string or one of its components is invalid."""
+
+    def __init__(self, msg: str):
+        super().__init__(msg)
diff --git a/metadata-ingestion/src/datahub/utilities/urns/urn.py b/metadata-ingestion/src/datahub/utilities/urns/urn.py
new file mode 100644
index 0000000000..ea357b9101
--- /dev/null
+++ b/metadata-ingestion/src/datahub/utilities/urns/urn.py
@@ -0,0 +1,159 @@
+from typing import List
+
+from datahub.utilities.urns.error import InvalidUrnError
+
+
+class Urn:
+    """
+    URNs are Globally Unique Identifiers (GUID) used to represent an entity.
+    It will be in format of urn:<domain>:<type>:<id>
+    """
+
+    URN_PREFIX: str = "urn"
+    # all the Datahub urn use li domain for now.
+    LI_DOMAIN: str = "li"
+
+    _entity_type: str
+    _domain: str
+    _entity_id: List[str]
+
+    def __init__(
+        self, entity_type: str, entity_id: List[str], urn_domain: str = LI_DOMAIN
+    ):
+        """
+        :param entity_type: the type of the entity, e.g. "dataset"
+        :param entity_id: the parts of the entity id; must not be empty
+        :param urn_domain: the domain of the urn
+        :raises InvalidUrnError if the entity id is empty or a validation hook rejects the input
+        """
+        if not entity_id:
+            raise InvalidUrnError("Empty entity id.")
+        self._validate_entity_type(entity_type)
+        self._validate_entity_id(entity_id)
+        self._entity_type = entity_type
+        self._domain = urn_domain
+        # Keep a private copy so the caller mutating its list cannot alter this urn.
+        self._entity_id = list(entity_id)
+
+    @classmethod
+    def create_from_string(cls, urn_str: str) -> "Urn":
+        """
+        Create a Urn from its string representation
+        :param urn_str: the string representation of the Urn
+        :return: Urn of the given string representation
+        :raises InvalidUrnError if the string representation is in invalid format
+        """
+
+        # expect urn string in format of urn:<domain>:<type>:<id>
+        cls.validate(urn_str)
+        parts: List[str] = urn_str.split(":", 3)
+
+        return cls(parts[2], cls._get_entity_id_from_str(parts[3]), parts[1])
+
+    @classmethod
+    def validate(cls, urn_str: str) -> None:
+        """
+        Validate if a string is in valid Urn format
+        :param urn_str: to be validated urn string
+        :raises InvalidUrnError if the string representation is in invalid format
+        """
+        parts: List[str] = urn_str.split(":", 3)
+        if len(parts) != 4:
+            raise InvalidUrnError(
+                f"Invalid urn string: {urn_str}. Expect 4 parts from urn string but found {len(parts)}"
+            )
+
+        if "" in parts:
+            raise InvalidUrnError(
+                f"Invalid urn string: {urn_str}. There should not be empty parts in urn string."
+            )
+
+        if parts[0] != Urn.URN_PREFIX:
+            raise InvalidUrnError(
+                f'Invalid urn string: {urn_str}. Expect urn starting with "urn" but found {parts[0]}'
+            )
+
+        # Parse the entity id once and reuse it for both checks below.
+        entity_id: List[str] = cls._get_entity_id_from_str(parts[3])
+        if "" in entity_id:
+            raise InvalidUrnError(
+                f"Invalid entity id in urn string: {urn_str}. There should not be empty parts in entity id."
+            )
+
+        cls._validate_entity_type(parts[2])
+        cls._validate_entity_id(entity_id)
+
+    def get_type(self) -> str:
+        return self._entity_type
+
+    def get_entity_id(self) -> List[str]:
+        return self._entity_id
+
+    def get_entity_id_as_string(self) -> str:
+        """
+        :return: string representation of the entity ids. If there are more than one part in the entity id part, it will
+        return in this format (<part1>,<part2>,...)
+        """
+        return self._entity_id_to_string()
+
+    def get_domain(self) -> str:
+        return self._domain
+
+    @staticmethod
+    def _get_entity_id_from_str(entity_id: str) -> List[str]:
+        """
+        Split the entity id part of a urn string into its parts
+        :param entity_id: the entity id, either a single id or "(<part1>,<part2>,...)"
+        :return: the whitespace-stripped parts. Commas inside nested parentheses do not split, so a nested urn
+        such as urn:li:dataset:(urn:li:dataPlatform:hive,member,prod) stays one part
+        """
+        if not (entity_id.startswith("(") and entity_id.endswith(")")):
+            return [entity_id]
+
+        inner = entity_id[1:-1]
+        parts: List[str] = []
+        depth = 0
+        part_start = 0
+        for index, char in enumerate(inner):
+            if char == "(":
+                depth += 1
+            elif char == ")":
+                # Clamp at zero so an unbalanced ")" cannot disable later splits.
+                depth = max(depth - 1, 0)
+            elif char == "," and depth == 0:
+                parts.append(inner[part_start:index].strip())
+                part_start = index + 1
+        parts.append(inner[part_start:].strip())
+        return parts
+
+    @staticmethod
+    def _validate_entity_type(entity_type: str) -> None:
+        # Hook for subclasses; the base Urn accepts any entity type.
+        pass
+
+    @staticmethod
+    def _validate_entity_id(entity_id: List[str]) -> None:
+        # Hook for subclasses; the base Urn applies no extra entity id checks here.
+        pass
+
+    def __str__(self) -> str:
+        return f"{self.URN_PREFIX}:{self._domain}:{self._entity_type}:{self._entity_id_to_string()}"
+
+    def _entity_id_to_string(self) -> str:
+        if len(self._entity_id) == 1:
+            return self._entity_id[0]
+        return f"({','.join(self._entity_id)})"
+
+    def __hash__(self) -> int:
+        return hash((self._domain, self._entity_type) + tuple(self._entity_id))
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, Urn):
+            # Let Python try the reflected comparison before falling back to False.
+            return NotImplemented
+
+        return (
+            self._entity_id == other._entity_id
+            and self._domain == other._domain
+            and self._entity_type == other._entity_type
+        )
diff --git a/metadata-ingestion/tests/unit/test_dataset_urn.py b/metadata-ingestion/tests/unit/test_dataset_urn.py
new file mode 100644
index 0000000000..e1e37409d8
--- /dev/null
+++ b/metadata-ingestion/tests/unit/test_dataset_urn.py
@@ -0,0 +1,46 @@
+import unittest
+
+from datahub.utilities.urns.data_platform_urn import DataPlatformUrn
+from datahub.utilities.urns.dataset_urn import DatasetUrn
+from datahub.utilities.urns.error import InvalidUrnError
+
+
+class TestDatasetUrn(unittest.TestCase):
+    def test_parse_urn(self) -> None:
+        dataset_urn_str = "urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)"
+        dataset_urn = DatasetUrn.create_from_string(dataset_urn_str)
+        assert (
+            dataset_urn.get_data_platform_urn()
+            == DataPlatformUrn.create_from_string("urn:li:dataPlatform:abc")
+        )
+        assert dataset_urn.get_dataset_name() == "def"
+        assert dataset_urn.get_env() == "prod"
+        assert str(dataset_urn) == dataset_urn_str
+        assert dataset_urn == DatasetUrn(
+            "dataset", ["urn:li:dataPlatform:abc", "def", "prod"]
+        )
+
+    def test_invalid_urn(self) -> None:
+        # wrong entity type
+        with self.assertRaises(InvalidUrnError):
+            DatasetUrn.create_from_string(
+                "urn:li:abc:(urn:li:dataPlatform:abc,def,prod)"
+            )
+
+        # first part of the entity id is not a data platform urn
+        with self.assertRaises(InvalidUrnError):
+            DatasetUrn.create_from_string(
+                "urn:li:dataset:(urn:li:user:abc,dataset,prod)"
+            )
+
+        # only 2 parts in the entity id; a valid platform keeps the failure specific
+        with self.assertRaises(InvalidUrnError):
+            DatasetUrn.create_from_string(
+                "urn:li:dataset:(urn:li:dataPlatform:abc,dataset)"
+            )
+
+        # invalid env; a valid platform keeps the failure specific
+        with self.assertRaises(InvalidUrnError):
+            DatasetUrn.create_from_string(
+                "urn:li:dataset:(urn:li:dataPlatform:abc,dataset,invalidEnv)"
+            )
diff --git a/metadata-ingestion/tests/unit/test_urn.py b/metadata-ingestion/tests/unit/test_urn.py
new file mode 100644
index 0000000000..8860b0652d
--- /dev/null
+++ b/metadata-ingestion/tests/unit/test_urn.py
@@ -0,0 +1,48 @@
+import unittest
+
+from datahub.utilities.urns.error import InvalidUrnError
+from datahub.utilities.urns.urn import Urn
+
+
+class TestUrn(unittest.TestCase):
+    def test_parse_urn(self) -> None:
+        simple_urn_str = "urn:li:dataPlatform:abc"
+        urn = Urn.create_from_string(simple_urn_str)
+        assert urn.get_entity_id_as_string() == "abc"
+        assert urn.get_entity_id() == ["abc"]
+        assert urn.get_type() == "dataPlatform"
+        assert urn.get_domain() == "li"
+        assert str(urn) == simple_urn_str
+        assert urn == Urn("dataPlatform", ["abc"])
+
+        complex_urn_str = "urn:li:dataset:(urn:li:dataPlatform:abc, def, prod)"
+        urn = Urn.create_from_string(complex_urn_str)
+        assert urn.get_entity_id_as_string() == "(urn:li:dataPlatform:abc,def,prod)"
+        assert urn.get_entity_id() == ["urn:li:dataPlatform:abc", "def", "prod"]
+        assert urn.get_type() == "dataset"
+        assert str(urn) == "urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)"
+
+    def test_parse_nested_urn(self) -> None:
+        # commas inside the nested dataset urn must not split the outer entity id
+        nested_urn_str = (
+            "urn:li:abc:(urn:li:dataset:(urn:li:dataPlatform:abc,def,prod),ghi)"
+        )
+        urn = Urn.create_from_string(nested_urn_str)
+        assert urn.get_entity_id() == [
+            "urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)",
+            "ghi",
+        ]
+        assert str(urn) == nested_urn_str
+
+    def test_invalid_urn(self) -> None:
+        with self.assertRaises(InvalidUrnError):
+            Urn.create_from_string("urn:li:abc")
+
+        with self.assertRaises(InvalidUrnError):
+            Urn.create_from_string("urn:li:abc:")
+
+        with self.assertRaises(InvalidUrnError):
+            Urn.create_from_string("urn:li:abc:()")
+
+        with self.assertRaises(InvalidUrnError):
+            Urn.create_from_string("urn:li:abc:(abc,)")