feat(sdk): autogenerate urn types (#9257)

Harshal Sheth 2023-11-30 18:11:36 -05:00 committed by GitHub
parent ae1169d6d5
commit a7dc9c9d22
48 changed files with 857 additions and 865 deletions

View File

@ -0,0 +1,7 @@
URNs
======
.. automodule:: datahub.metadata.urns
:exclude-members: LI_DOMAIN, URN_PREFIX, url_encode, validate, get_type, get_entity_id, get_entity_id_as_string, get_domain, underlying_key_aspect_type
:member-order: alphabetical
:inherited-members:

View File

@ -3,6 +3,10 @@
# For the full list of built-in configuration values, see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
# See https://stackoverflow.com/a/65147676
import builtins
builtins.__sphinx_build__ = True
# -- Project information -----------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information

View File

@ -14,6 +14,7 @@ Welcome to DataHub Python SDK's documentation!
apidocs/builder
apidocs/clients
apidocs/models
apidocs/urns
Indices and tables

View File

@ -1,4 +1,4 @@
-e ../../metadata-ingestion[datahub-rest,sql-parsing]
-e ../../metadata-ingestion[datahub-rest,sql-parser]
beautifulsoup4==4.11.2
Sphinx==6.1.3
sphinx-click==4.4.0

View File

@ -9,6 +9,9 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- Updating MySQL version for quickstarts to 8.2, which may cause quickstart issues for existing instances.
- #9244: The `redshift-legacy` and `redshift-legacy-usage` sources, which have been deprecated for >6 months, have been removed. The new `redshift` source is a superset of the functionality provided by those legacy sources.
- `database_alias` config is no longer supported in SQL sources, namely Redshift, MySQL, Oracle, Postgres, Trino, and Presto-on-hive. The config will automatically be ignored if it's present in your recipe. It has been deprecated since v0.9.6.
- #9257: The Python SDK urn types are now autogenerated. The new classes are largely backwards compatible with the previous, manually written classes, but many older methods are now deprecated in favor of a more uniform interface. The only breaking change is that the direct constructor signature, e.g. `TagUrn("tag", ["tag_name"])`, is no longer supported; use the simpler `TagUrn("tag_name")` instead, as shown in the sketch below.
The canonical place to import the urn classes from is `datahub.metadata.urns.*`. Other import paths, like `datahub.utilities.urns.corpuser_urn.CorpuserUrn`, are retained for backwards compatibility but are considered deprecated.
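For example, a minimal migration sketch using the new constructor and import path:
```python
from datahub.metadata.urns import TagUrn

# Old style (no longer supported): TagUrn("tag", ["tag_name"])
tag = TagUrn("tag_name")

assert tag.urn() == "urn:li:tag:tag_name"
assert TagUrn.from_string("urn:li:tag:tag_name") == tag
```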
### Potential Downtime
### Deprecations
@ -23,18 +26,19 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #9044 - GraphQL APIs for adding ownership now expect either an `ownershipTypeUrn` referencing a custom ownership type or a (deprecated) `type`. Where adding an ownership without a concrete type was previously allowed, this is no longer the case. For simplicity, you can use the `type` parameter, which will be translated to a custom ownership type internally if one exists for the type being added.
- #9010 - In the Redshift source's config, `incremental_lineage` now defaults to off.
- #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now.
- #8942 - Removed `urn:li:corpuser:datahub` owner for the `Measure`, `Dimension` and `Temporal` tags emitted
by Looker and LookML source connectors.
- #8853 - The Airflow plugin no longer supports Airflow 2.0.x or Python 3.7. See the docs for more details.
- #8853 - Introduced the Airflow plugin v2. If you're using Airflow 2.3+, the v2 plugin will be enabled by default, and so you'll need to switch your requirements to include `pip install 'acryl-datahub-airflow-plugin[plugin-v2]'`. To continue using the v1 plugin, set the `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN` environment variable to `true`.
- #8943 - The Unity Catalog ingestion source has a new option `include_metastore`, which will cause all urns to be changed when disabled.
This is currently enabled by default to preserve compatibility, but will be disabled by default and then removed in the future.
If stateful ingestion is enabled, simply setting `include_metastore: false` will perform all required cleanup.
Otherwise, we recommend soft deleting all databricks data via the DataHub CLI:
`datahub delete --platform databricks --soft` and then reingesting with `include_metastore: false`.
- #8846 - Changed enum values in resource filters used by policies. `RESOURCE_TYPE` became `TYPE` and `RESOURCE_URN` became `URN`.
Any existing policies using these filters (i.e. defined for particular `urns` or `types` such as `dataset`) need to be upgraded
manually, for example by retrieving their respective `dataHubPolicyInfo` aspect and changing the filter portion, i.e.
```yaml
"resources": {
"filter": {
@ -49,7 +53,9 @@ manually, for example by retrieving their respective `dataHubPolicyInfo` aspect
]
}
```
into
```yaml
"resources": {
"filter": {
@ -64,22 +70,25 @@ into
]
}
```
for example, using the `datahub put` command. Policies can also be removed and re-created via the UI.
- #9077 - The BigQuery ingestion source by default sets `match_fully_qualified_names: true`.
This means that any `dataset_pattern` or `schema_pattern` specified will be matched on the fully
qualified dataset name, i.e. `<project_name>.<dataset_name>`. We attempt to support the old
pattern format by prepending `.*\\.` to dataset patterns lacking a period, so in most cases this
should not cause any issues. However, if you have a complex dataset pattern, we recommend you
manually convert it to the fully qualified format to avoid any potential issues.
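For example, a hypothetical recipe snippet showing the fully qualified form (`my-project` and `my_dataset` are placeholder names):
```yaml
source:
  type: bigquery
  config:
    dataset_pattern:
      allow:
        # previously: "^my_dataset$" (matched on the dataset name alone)
        - "^my-project\\.my_dataset$"
```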
- #9110 - The Unity Catalog source will now generate urns based on `env` properly. If you have
been setting `env` in your recipe to something besides `PROD`, we will now generate urns
with that new env variable, invalidating your existing urns.
### Potential Downtime
### Deprecations
### Other Notable Changes
- Session token configuration has changed; all previously created session tokens will be invalid and users will be prompted to log in. Expiration time has also been shortened, which may result in more login prompts with the default settings.
There should be no other interruption due to this change.
@ -88,13 +97,16 @@ with that new env variable, invalidating your existing urns.
### Breaking Changes
### Potential Downtime
- #8611 Search improvements require reindexing indices. A `system-update` job will run, setting indices to read-only and creating a backup/clone of each index. During the reindexing, new components will be prevented from starting up until the reindex completes. The logs of this job will indicate a % complete per index. Depending on index sizes and infrastructure, this process can take anywhere from 5 minutes to several hours; as a rough estimate, expect 1 hour for every 2.3 million entities.
### Deprecations
- #8525: In the LDAP ingestor, the `manager_pagination_enabled` option has been replaced by the more general `pagination_enabled`.
- MAE Events are no longer produced. MAE events have been deprecated for over a year.
### Other Notable Changes
- In this release we now enable you to create and delete pinned announcements on your DataHub homepage! If you have the “Manage Home Page Posts” platform privilege you'll see a new section in settings called “Home Page Posts” where you can create and delete text posts and link posts that your users see on the home page.
- The new search and browse experience, which was first made available in the previous release behind a feature flag, is now on by default. Check out our release notes for v0.10.5 to get more information and documentation on this new Browse experience.
- In addition to the ranking changes mentioned above, this release includes changes to the highlighting of search entities to understand why they match your query. You can also sort your results alphabetically or by last updated times, in addition to relevance. In this release, we suggest a correction if your query has a typo in it.
@ -121,12 +133,13 @@ with that new env variable, invalidating your existing urns.
This determines which Okta profile attribute is used for the corresponding DataHub user
and thus may change which DataHub users are generated by the Okta source. In a follow-up, `okta_profile_to_username_regex` has been set to `.*`, which, taken together with the previous change, brings the defaults in line with OIDC.
- #8331: For all sql-based sources that support profiling, you can no longer specify
`profile_table_level_only` together with `include_field_xyz` config options to ingest
certain column-level metrics. Instead, set `profile_table_level_only` to `false` and
individually enable / disable desired field metrics.
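A sketch of the new-style config; the specific `include_field_*` options shown are illustrative examples of the `include_field_xyz` family:
```yaml
profiling:
  enabled: true
  profile_table_level_only: false
  # enable / disable individual field metrics instead:
  include_field_null_count: true
  include_field_min_value: false
```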
- #8451: The `bigquery-beta` and `snowflake-beta` source aliases have been dropped. Use `bigquery` and `snowflake` as the source type instead.
- #8472: Ingestion runs created with Pipeline.create will show up in the DataHub ingestion tab as CLI-based runs. To revert to the previous behavior of not showing these runs in DataHub, pass `no_default_report=True`.
- #8513: `snowflake` connector will use user's `email` attribute as is in urn. To revert to previous behavior disable `email_as_user_identifier` in recipe.
### Potential Downtime
- BrowsePathsV2 upgrade will now be handled by the `system-update` job in non-blocking mode. This process generates data needed for the new search
@ -153,9 +166,11 @@ individually enable / disable desired field metrics.
### Potential Downtime
### Deprecations
- #8045: With the introduction of custom ownership types, the `Owner` aspect has been updated where the `type` field is deprecated in favor of a new field `typeUrn`. This latter field is an urn reference to the new OwnershipType entity. GraphQL endpoints have been updated to use the new field. For pre-existing ownership aspect records, DataHub now has logic to map the old field to the new field.
### Other Notable Changes
- #8191: Updates GMS's health check endpoint to account for its dependency on external components. Notably, at this time, elasticsearch. This means that DataHub operators can now use GMS health status more reliably.
## 0.10.3
@ -170,6 +185,7 @@ individually enable / disable desired field metrics.
### Potential Downtime
### Deprecations
- The signature of `Source.get_workunits()` is changed from `Iterable[WorkUnit]` to the more restrictive `Iterable[MetadataWorkUnit]`.
- Legacy usage creation via the `UsageAggregation` aspect, `/usageStats?action=batchIngest` GMS endpoint, and `UsageStatsWorkUnit` metadata-ingestion class are all deprecated.

View File

@ -14,24 +14,12 @@ from datahub.metadata.schema_classes import (
EditableSchemaMetadataClass,
InstitutionalMemoryClass,
)
from datahub.utilities.urns.field_paths import get_simple_field_path_from_v2_field_path
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
"""A helper function to extract simple . path notation from the v2 field path"""
if not field_path.startswith("[version=2.0]"):
# not a v2, we assume this is a simple path
return field_path
# this is a v2 field path
tokens = [
t for t in field_path.split(".") if not (t.startswith("[") or t.endswith("]"))
]
return ".".join(tokens)
# Inputs -> owner, ownership_type, dataset
documentation_to_add = (
"Name of the user who was deleted. This description is updated via PythonSDK."

View File

@ -15,24 +15,12 @@ from datahub.metadata.schema_classes import (
GlobalTagsClass,
TagAssociationClass,
)
from datahub.utilities.urns.field_paths import get_simple_field_path_from_v2_field_path
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
"""A helper function to extract simple . path notation from the v2 field path"""
if not field_path.startswith("[version=2.0]"):
# not a v2, we assume this is a simple path
return field_path
# this is a v2 field path
tokens = [
t for t in field_path.split(".") if not (t.startswith("[") or t.endswith("]"))
]
return ".".join(tokens)
# Inputs -> the column, dataset and the tag to set
column = "user_name"
dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")

View File

@ -15,24 +15,12 @@ from datahub.metadata.schema_classes import (
GlossaryTermAssociationClass,
GlossaryTermsClass,
)
from datahub.utilities.urns.field_paths import get_simple_field_path_from_v2_field_path
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
"""A helper function to extract simple . path notation from the v2 field path"""
if not field_path.startswith("[version=2.0]"):
# not a v2, we assume this is a simple path
return field_path
# this is a v2 field path
tokens = [
t for t in field_path.split(".") if not (t.startswith("[") or t.endswith("]"))
]
return ".".join(tokens)
# Inputs -> the column, dataset and the term to set
column = "address.zipcode"
dataset_urn = make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD")

View File

@ -5,7 +5,7 @@ from datahub.api.entities.corpgroup.corpgroup import (
CorpGroupGenerationConfig,
)
from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig
from datahub.utilities.urns.corpuser_urn import CorpuserUrn
from datahub.metadata.urns import CorpUserUrn
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
@ -13,10 +13,10 @@ logging.basicConfig(level=logging.INFO)
group_email = "foogroup@acryl.io"
group = CorpGroup(
id=group_email,
owners=[str(CorpuserUrn.create_from_id("datahub"))],
owners=[str(CorpUserUrn("datahub"))],
members=[
str(CorpuserUrn.create_from_id("bar@acryl.io")),
str(CorpuserUrn.create_from_id("joe@acryl.io")),
str(CorpUserUrn("bar@acryl.io")),
str(CorpUserUrn("joe@acryl.io")),
],
display_name="Foo Group",
email=group_email,

View File

@ -1,6 +1,8 @@
import collections
import copy
import json
import re
import textwrap
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple, Union
@ -115,11 +117,20 @@ def patch_schema(schema: dict, urn_arrays: Dict[str, List[Tuple[str, str]]]) ->
# Patch normal urn types.
field: avro.schema.Field
for field in nested.fields:
java_class: Optional[str] = field.props.get("java", {}).get("class")
field_props: dict = field.props # type: ignore
java_props: dict = field_props.get("java", {})
java_class: Optional[str] = java_props.get("class")
if java_class and java_class.startswith(
"com.linkedin.pegasus2avro.common.urn."
):
field.set_prop("Urn", java_class.split(".")[-1])
type = java_class.split(".")[-1]
entity_types = field_props.get("Relationship", {}).get(
"entityTypes", []
)
field.set_prop("Urn", type)
if entity_types:
field.set_prop("entityTypes", entity_types)
# Patch array urn types.
if nested.name in urn_arrays:
@ -130,7 +141,7 @@ def patch_schema(schema: dict, urn_arrays: Dict[str, List[Tuple[str, str]]]) ->
field.set_prop("Urn", type)
field.set_prop("urn_is_array", True)
return patched.to_json()
return patched.to_json() # type: ignore
def merge_schemas(schemas_obj: List[dict]) -> str:
@ -141,6 +152,7 @@ def merge_schemas(schemas_obj: List[dict]) -> str:
class NamesWithDups(avro.schema.Names):
def add_name(self, name_attr, space_attr, new_schema):
to_add = avro.schema.Name(name_attr, space_attr, self.default_namespace)
assert to_add.fullname
self.names[to_add.fullname] = new_schema
return to_add
@ -228,7 +240,6 @@ def make_load_schema_methods(schemas: Iterable[str]) -> str:
def save_raw_schemas(schema_save_dir: Path, schemas: Dict[str, dict]) -> None:
# Save raw avsc files.
schema_save_dir.mkdir()
for name, schema in schemas.items():
(schema_save_dir / f"{name}.avsc").write_text(json.dumps(schema, indent=2))
@ -333,6 +344,342 @@ KEY_ASPECTS: Dict[str, Type[_Aspect]] = {{
schema_class_file.write_text("\n".join(schema_classes_lines))
def write_urn_classes(key_aspects: List[dict], urn_dir: Path) -> None:
urn_dir.mkdir()
(urn_dir / "__init__.py").write_text("\n# This file is intentionally left empty.")
code = """
# This file contains classes corresponding to entity URNs.
from typing import ClassVar, List, Optional, Type, TYPE_CHECKING
import functools
from deprecated.sphinx import deprecated as _sphinx_deprecated
from datahub.utilities.urn_encoder import UrnEncoder
from datahub.utilities.urns._urn_base import _SpecificUrn, Urn
from datahub.utilities.urns.error import InvalidUrnError
deprecated = functools.partial(_sphinx_deprecated, version="0.12.0.2")
"""
for aspect in key_aspects:
entity_type = aspect["Aspect"]["keyForEntity"]
if aspect["Aspect"]["entityCategory"] == "internal":
continue
code += generate_urn_class(entity_type, aspect)
(urn_dir / "urn_defs.py").write_text(code)
def capitalize_entity_name(entity_name: str) -> str:
# Examples:
# corpuser -> CorpUser
# corpGroup -> CorpGroup
# mlModelDeployment -> MlModelDeployment
if entity_name == "corpuser":
return "CorpUser"
return f"{entity_name[0].upper()}{entity_name[1:]}"
def python_type(avro_type: str) -> str:
if avro_type == "string":
return "str"
elif (
isinstance(avro_type, dict)
and avro_type.get("type") == "enum"
and avro_type.get("name") == "FabricType"
):
# TODO: make this stricter using an enum
return "str"
raise ValueError(f"unknown type {avro_type}")
def field_type(field: dict) -> str:
return python_type(field["type"])
def field_name(field: dict) -> str:
manual_mapping = {
"origin": "env",
"platformName": "platform_name",
}
name: str = field["name"]
if name in manual_mapping:
return manual_mapping[name]
# If the name is mixed case, convert to snake case.
if name.lower() != name:
# Inject an underscore before each capital letter, and then convert to lowercase.
return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
return name
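# Illustrative examples (not generated output): field_name({"name": "flowId"}) -> "flow_id"
# via the snake_case conversion, and field_name({"name": "origin"}) -> "env" via the
# manual mapping above.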
_create_from_id = """
@classmethod
@deprecated(reason="Use the constructor instead")
def create_from_id(cls, id: str) -> "{class_name}":
return cls(id)
"""
_extra_urn_methods: Dict[str, List[str]] = {
"corpGroup": [_create_from_id.format(class_name="CorpGroupUrn")],
"corpuser": [_create_from_id.format(class_name="CorpUserUrn")],
"dataFlow": [
"""
@classmethod
def create_from_ids(
cls,
orchestrator: str,
flow_id: str,
env: str,
platform_instance: Optional[str] = None,
) -> "DataFlowUrn":
return cls(
orchestrator=orchestrator,
flow_id=f"{platform_instance}.{flow_id}" if platform_instance else flow_id,
cluster=env,
)
@deprecated(reason="Use .orchestrator instead")
def get_orchestrator_name(self) -> str:
return self.orchestrator
@deprecated(reason="Use .flow_id instead")
def get_flow_id(self) -> str:
return self.flow_id
@deprecated(reason="Use .cluster instead")
def get_env(self) -> str:
return self.cluster
""",
],
"dataJob": [
"""
@classmethod
def create_from_ids(cls, data_flow_urn: str, job_id: str) -> "DataJobUrn":
return cls(data_flow_urn, job_id)
def get_data_flow_urn(self) -> "DataFlowUrn":
return DataFlowUrn.from_string(self.flow)
@deprecated(reason="Use .job_id instead")
def get_job_id(self) -> str:
return self.job_id
"""
],
"dataPlatform": [_create_from_id.format(class_name="DataPlatformUrn")],
"dataProcessInstance": [
_create_from_id.format(class_name="DataProcessInstanceUrn"),
"""
@deprecated(reason="Use .id instead")
def get_dataprocessinstance_id(self) -> str:
return self.id
""",
],
"dataset": [
"""
@classmethod
def create_from_ids(
cls,
platform_id: str,
table_name: str,
env: str,
platform_instance: Optional[str] = None,
) -> "DatasetUrn":
return DatasetUrn(
platform=platform_id,
name=f"{platform_instance}.{table_name}" if platform_instance else table_name,
env=env,
)
from datahub.utilities.urns.field_paths import get_simple_field_path_from_v2_field_path as _get_simple_field_path_from_v2_field_path
get_simple_field_path_from_v2_field_path = staticmethod(deprecated(reason='Use the function from the field_paths module instead')(_get_simple_field_path_from_v2_field_path))
def get_data_platform_urn(self) -> "DataPlatformUrn":
return DataPlatformUrn.from_string(self.platform)
@deprecated(reason="Use .name instead")
def get_dataset_name(self) -> str:
return self.name
@deprecated(reason="Use .env instead")
def get_env(self) -> str:
return self.env
"""
],
"domain": [_create_from_id.format(class_name="DomainUrn")],
"notebook": [
"""
@deprecated(reason="Use .notebook_tool instead")
def get_platform_id(self) -> str:
return self.notebook_tool
@deprecated(reason="Use .notebook_id instead")
def get_notebook_id(self) -> str:
return self.notebook_id
"""
],
"tag": [_create_from_id.format(class_name="TagUrn")],
}
def generate_urn_class(entity_type: str, key_aspect: dict) -> str:
"""Generate a class definition for this entity.
The class definition has the following structure:
- A class attribute ENTITY_TYPE, which is the entity type string.
- A class attribute URN_PARTS, which is the number of parts in the URN.
- A constructor that takes the URN parts as arguments. The field names
will match the key aspect's field names. It will also have a _allow_coercion
flag, which will allow for some normalization (e.g. upper case env).
Then, each part will be validated (including nested calls for urn subparts).
- Utilities for converting to/from the key aspect.
- Any additional methods that are required for this entity type, defined above.
These are primarily for backwards compatibility.
- Getter methods for each field.
"""
class_name = f"{capitalize_entity_name(entity_type)}Urn"
fields = copy.deepcopy(key_aspect["fields"])
if entity_type == "container":
# The annotations say guid is optional, but it is required.
# This is a quick fix of the annotations.
assert field_name(fields[0]) == "guid"
assert fields[0]["type"] == ["null", "string"]
fields[0]["type"] = "string"
_init_arg_parts: List[str] = []
for field in fields:
default = '"PROD"' if field_name(field) == "env" else None
_arg_part = f"{field_name(field)}: {field_type(field)}"
if default:
_arg_part += f" = {default}"
_init_arg_parts.append(_arg_part)
init_args = ", ".join(_init_arg_parts)
super_init_args = ", ".join(field_name(field) for field in fields)
arg_count = len(fields)
parse_ids_mapping = ", ".join(
f"{field_name(field)}=entity_ids[{i}]" for i, field in enumerate(fields)
)
key_aspect_class = f"{key_aspect['name']}Class"
to_key_aspect_args = ", ".join(
# The LHS bypasses any field name aliases.
f"{field['name']}=self.{field_name(field)}"
for field in fields
)
from_key_aspect_args = ", ".join(
f"{field_name(field)}=key_aspect.{field['name']}" for field in fields
)
init_coercion = ""
init_validation = ""
for field in fields:
init_validation += f'if not {field_name(field)}:\n raise InvalidUrnError("{field_name(field)} cannot be empty")\n'
# Generalized mechanism for validating embedded urns.
field_urn_type_class = None
if field_name(field) == "platform":
field_urn_type_class = "DataPlatformUrn"
elif field.get("Urn"):
if len(field.get("entityTypes", [])) == 1:
field_entity_type = field["entityTypes"][0]
field_urn_type_class = f"{capitalize_entity_name(field_entity_type)}Urn"
else:
field_urn_type_class = "Urn"
if field_urn_type_class:
init_validation += f"{field_name(field)} = str({field_name(field)})\n"
init_validation += (
f"assert {field_urn_type_class}.from_string({field_name(field)})\n"
)
else:
init_validation += (
f"assert not UrnEncoder.contains_reserved_char({field_name(field)})\n"
)
if field_name(field) == "env":
init_coercion += "env = env.upper()\n"
# TODO add ALL_ENV_TYPES validation
elif entity_type == "dataPlatform" and field_name(field) == "platform_name":
init_coercion += 'if platform_name.startswith("urn:li:dataPlatform:"):\n'
init_coercion += " platform_name = DataPlatformUrn.from_string(platform_name).platform_name\n"
if field_name(field) == "platform":
init_coercion += "platform = DataPlatformUrn(platform).urn()\n"
elif field_urn_type_class is None:
# For all non-urns, run the value through the UrnEncoder.
init_coercion += (
f"{field_name(field)} = UrnEncoder.encode_string({field_name(field)})\n"
)
if not init_coercion:
init_coercion = "pass"
# TODO include the docs for each field
code = f"""
if TYPE_CHECKING:
from datahub.metadata.schema_classes import {key_aspect_class}
class {class_name}(_SpecificUrn):
ENTITY_TYPE: ClassVar[str] = "{entity_type}"
URN_PARTS: ClassVar[int] = {arg_count}
def __init__(self, {init_args}, *, _allow_coercion: bool = True) -> None:
if _allow_coercion:
# Field coercion logic (if any is required).
{textwrap.indent(init_coercion.strip(), prefix=" "*4*3)}
# Validation logic.
{textwrap.indent(init_validation.strip(), prefix=" "*4*2)}
super().__init__(self.ENTITY_TYPE, [{super_init_args}])
@classmethod
def _parse_ids(cls, entity_ids: List[str]) -> "{class_name}":
if len(entity_ids) != cls.URN_PARTS:
raise InvalidUrnError(f"{class_name} should have {{cls.URN_PARTS}} parts, got {{len(entity_ids)}}: {{entity_ids}}")
return cls({parse_ids_mapping}, _allow_coercion=False)
@classmethod
def underlying_key_aspect_type(cls) -> Type["{key_aspect_class}"]:
from datahub.metadata.schema_classes import {key_aspect_class}
return {key_aspect_class}
def to_key_aspect(self) -> "{key_aspect_class}":
from datahub.metadata.schema_classes import {key_aspect_class}
return {key_aspect_class}({to_key_aspect_args})
@classmethod
def from_key_aspect(cls, key_aspect: "{key_aspect_class}") -> "{class_name}":
return cls({from_key_aspect_args})
"""
for extra_method in _extra_urn_methods.get(entity_type, []):
code += textwrap.indent(extra_method, prefix=" " * 4)
for i, field in enumerate(fields):
code += f"""
@property
def {field_name(field)}(self) -> {field_type(field)}:
return self.entity_ids[{i}]
"""
return code
@click.command()
@click.argument(
"entity_registry", type=click.Path(exists=True, dir_okay=False), required=True
@ -367,6 +714,7 @@ def generate(
if schema.get("Aspect")
}
# Copy entity registry info into the corresponding key aspect.
for entity in entities:
# This implicitly requires that all keyAspects are resolvable.
aspect = aspects[entity.keyAspect]
@ -428,6 +776,7 @@ def generate(
import importlib
from typing import TYPE_CHECKING
from datahub.utilities.docs_build import IS_SPHINX_BUILD
from datahub.utilities._custom_package_loader import get_custom_models_package
_custom_package_path = get_custom_models_package()
@ -437,16 +786,64 @@ if TYPE_CHECKING or not _custom_package_path:
# Required explicitly because __all__ doesn't include _ prefixed names.
from ._schema_classes import _Aspect, __SCHEMA_TYPES
if IS_SPHINX_BUILD:
# Set __module__ to the current module so that Sphinx will document the
# classes as belonging to this module instead of the custom package.
for _cls in list(globals().values()):
if hasattr(_cls, "__module__") and "datahub.metadata._schema_classes" in _cls.__module__:
_cls.__module__ = __name__
else:
_custom_package = importlib.import_module(_custom_package_path)
globals().update(_custom_package.__dict__)
"""
)
(Path(outdir) / "urns.py").write_text(
"""
# This is a specialized shim layer that allows us to dynamically load custom URN types from elsewhere.
import importlib
from typing import TYPE_CHECKING
from datahub.utilities.docs_build import IS_SPHINX_BUILD
from datahub.utilities._custom_package_loader import get_custom_urns_package
from datahub.utilities.urns._urn_base import Urn # noqa: F401
_custom_package_path = get_custom_urns_package()
if TYPE_CHECKING or not _custom_package_path:
from ._urns.urn_defs import * # noqa: F401
if IS_SPHINX_BUILD:
# Set __module__ to the current module so that Sphinx will document the
# classes as belonging to this module instead of the custom package.
for _cls in list(globals().values()):
if hasattr(_cls, "__module__") and ("datahub.metadata._urns.urn_defs" in _cls.__module__ or _cls is Urn):
_cls.__module__ = __name__
else:
_custom_package = importlib.import_module(_custom_package_path)
globals().update(_custom_package.__dict__)
"""
)
# Generate URN classes.
urn_dir = Path(outdir) / "_urns"
write_urn_classes(
[aspect for aspect in aspects.values() if aspect["Aspect"].get("keyForEntity")],
urn_dir,
)
# Save raw schema files in codegen as well.
schema_save_dir = Path(outdir) / "schemas"
schema_save_dir.mkdir()
for schema_out_file, schema in schemas.items():
(schema_save_dir / f"{schema_out_file}.avsc").write_text(
json.dumps(schema, indent=2)
)
# Keep a copy of a few raw avsc files.
required_avsc_schemas = {"MetadataChangeEvent", "MetadataChangeProposal"}
schema_save_dir = Path(outdir) / "schemas"
save_raw_schemas(
schema_save_dir,
{

View File

@ -90,6 +90,7 @@ setup(
entry_points={{
"datahub.custom_packages": [
"models={python_package_name}.models.schema_classes",
"urns={python_package_name}.models._urns.urn_defs",
],
}},
)

View File

@ -75,7 +75,7 @@ class DataProcessInstance:
def __post_init__(self):
self.urn = DataProcessInstanceUrn.create_from_id(
dataprocessinstance_id=DataProcessInstanceKey(
id=DataProcessInstanceKey(
cluster=self.cluster,
orchestrator=self.orchestrator,
id=self.id,

View File

@ -45,6 +45,7 @@ from datahub.metadata.schema_classes import (
TagAssociationClass,
)
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.field_paths import get_simple_field_path_from_v2_field_path
from datahub.utilities.urns.urn import Urn, guess_entity_type
DATASET_ENTITY_TYPE = DatasetUrn.ENTITY_TYPE
@ -436,9 +437,7 @@ class CSVEnricherSource(Source):
field_match = False
for field_info in current_editable_schema_metadata.editableSchemaFieldInfo:
if (
DatasetUrn.get_simple_field_path_from_v2_field_path(
field_info.fieldPath
)
get_simple_field_path_from_v2_field_path(field_info.fieldPath)
== field_path
):
# we have some editable schema metadata for this field

View File

@ -113,7 +113,7 @@ def create_id(path: List[str], default_id: Optional[str], enable_auto_id: bool)
id_: str = ".".join(path)
if UrnEncoder.contains_reserved_char(id_):
if UrnEncoder.contains_extended_reserved_char(id_):
enable_auto_id = True
if enable_auto_id:

View File

@ -1086,9 +1086,7 @@ class TableauSource(StatefulIngestionSourceBase):
def is_snowflake_urn(self, urn: str) -> bool:
return (
DatasetUrn.create_from_string(urn)
.get_data_platform_urn()
.get_platform_name()
DatasetUrn.create_from_string(urn).get_data_platform_urn().platform_name
== "snowflake"
)

View File

@ -10,6 +10,7 @@ else:
_CUSTOM_PACKAGE_GROUP_KEY = "datahub.custom_packages"
_MODELS_KEY = "models"
_URNS_KEY = "urns"
class CustomPackageException(Exception):
@ -41,3 +42,7 @@ def _get_custom_package_for_name(name: str) -> Optional[str]:
def get_custom_models_package() -> Optional[str]:
return _get_custom_package_for_name(_MODELS_KEY)
def get_custom_urns_package() -> Optional[str]:
return _get_custom_package_for_name(_URNS_KEY)

View File

@ -0,0 +1,9 @@
from typing import TYPE_CHECKING
try:
    # Via https://stackoverflow.com/a/65147676
    if not TYPE_CHECKING and __sphinx_build__:
        IS_SPHINX_BUILD = True
    else:
        IS_SPHINX_BUILD = False
except NameError:
    IS_SPHINX_BUILD = False

View File

@ -37,7 +37,7 @@ from datahub.metadata.schema_classes import (
TimeTypeClass,
)
from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedDict
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.field_paths import get_simple_field_path_from_v2_field_path
logger = logging.getLogger(__name__)
@ -443,15 +443,14 @@ class SchemaResolver(Closeable):
cls, schema_metadata: SchemaMetadataClass
) -> SchemaInfo:
return {
DatasetUrn.get_simple_field_path_from_v2_field_path(col.fieldPath): (
get_simple_field_path_from_v2_field_path(col.fieldPath): (
# The actual types are more of a "nice to have".
col.nativeDataType
or "str"
)
for col in schema_metadata.fields
# TODO: We can't generate lineage to columns nested within structs yet.
if "."
not in DatasetUrn.get_simple_field_path_from_v2_field_path(col.fieldPath)
if "." not in get_simple_field_path_from_v2_field_path(col.fieldPath)
}
@classmethod
@ -459,17 +458,14 @@ class SchemaResolver(Closeable):
cls, schema: GraphQLSchemaMetadata
) -> SchemaInfo:
return {
DatasetUrn.get_simple_field_path_from_v2_field_path(field["fieldPath"]): (
get_simple_field_path_from_v2_field_path(field["fieldPath"]): (
# The actual types are more of a "nice to have".
field["nativeDataType"]
or "str"
)
for field in schema["fields"]
# TODO: We can't generate lineage to columns nested within structs yet.
if "."
not in DatasetUrn.get_simple_field_path_from_v2_field_path(
field["fieldPath"]
)
if "." not in get_simple_field_path_from_v2_field_path(field["fieldPath"])
}
def close(self) -> None:

View File

@ -23,4 +23,8 @@ class UrnEncoder:
@staticmethod
def contains_reserved_char(value: str) -> bool:
return bool(set(value).intersection(RESERVED_CHARS))
@staticmethod
def contains_extended_reserved_char(value: str) -> bool:
return bool(set(value).intersection(RESERVED_CHARS_EXTENDED))

View File

@ -0,0 +1,234 @@
import functools
import urllib.parse
from abc import abstractmethod
from typing import ClassVar, Dict, List, Optional, Type, TypeVar
from deprecated import deprecated
from datahub.utilities.urns.error import InvalidUrnError
URN_TYPES: Dict[str, Type["_SpecificUrn"]] = {}
def _split_entity_id(entity_id: str) -> List[str]:
if not (entity_id.startswith("(") and entity_id.endswith(")")):
return [entity_id]
parts = []
start_paren_count = 1
part_start = 1
for i in range(1, len(entity_id)):
c = entity_id[i]
if c == "(":
start_paren_count += 1
elif c == ")":
start_paren_count -= 1
if start_paren_count < 0:
raise InvalidUrnError(f"{entity_id}, mismatched paren nesting")
elif c == ",":
if start_paren_count != 1:
continue
if i - part_start <= 0:
raise InvalidUrnError(f"{entity_id}, empty part disallowed")
parts.append(entity_id[part_start:i])
part_start = i + 1
if start_paren_count != 0:
raise InvalidUrnError(f"{entity_id}, mismatched paren nesting")
parts.append(entity_id[part_start:-1])
return parts
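# Illustrative: _split_entity_id("(urn:li:dataPlatform:hive,db.table,PROD)") returns
# ["urn:li:dataPlatform:hive", "db.table", "PROD"], while a simple id such as "my_tag"
# is returned unchanged as ["my_tag"].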
_UrnSelf = TypeVar("_UrnSelf", bound="Urn")
@functools.total_ordering
class Urn:
"""
URNs are globally unique identifiers used to refer to entities.
It will be in the format urn:li:<type>:<id> or urn:li:<type>:(<id1>,<id2>,...)
A note on encoding: certain characters, particularly commas and parentheses, are
not allowed in string portions of the URN. However, these are allowed when the urn
has another urn embedded within it. The main URN class ignores this possibility,
and assumes that the user provides a valid URN string. However, the specific URN
classes, such as DatasetUrn, will automatically encode these characters using
url-encoding when the URN is created and _allow_coercion is enabled (the default).
However, all from_string methods will try to preserve the string as-is, and will
raise an error if the string is invalid.
"""
# retained for backwards compatibility
URN_PREFIX: ClassVar[str] = "urn"
LI_DOMAIN: ClassVar[str] = "li"
_entity_type: str
_entity_ids: List[str]
def __init__(self, entity_type: str, entity_id: List[str]) -> None:
self._entity_type = entity_type
self._entity_ids = entity_id
if not self._entity_ids:
raise InvalidUrnError("Empty entity id.")
for part in self._entity_ids:
if not part:
raise InvalidUrnError("Empty entity id.")
@property
def entity_type(self) -> str:
return self._entity_type
@property
def entity_ids(self) -> List[str]:
return self._entity_ids
@classmethod
def from_string(cls: Type[_UrnSelf], urn_str: str) -> "_UrnSelf":
"""
Creates an Urn from its string representation.
Args:
urn_str: The string representation of the Urn.
Returns:
Urn of the given string representation.
Raises:
InvalidUrnError: If the string representation is in invalid format.
"""
# TODO: Add handling for url encoded urns e.g. urn%3A ...
if not urn_str.startswith("urn:li:"):
raise InvalidUrnError(
f"Invalid urn string: {urn_str}. Urns should start with 'urn:li:'"
)
parts: List[str] = urn_str.split(":", maxsplit=3)
if len(parts) != 4:
raise InvalidUrnError(
f"Invalid urn string: {urn_str}. Expect 4 parts from urn string but found {len(parts)}"
)
if "" in parts:
raise InvalidUrnError(
f"Invalid urn string: {urn_str}. There should not be empty parts in urn string."
)
_urn, _li, entity_type, entity_ids_str = parts
entity_ids = _split_entity_id(entity_ids_str)
UrnCls: Optional[Type["_SpecificUrn"]] = URN_TYPES.get(entity_type)
if UrnCls:
if not issubclass(UrnCls, cls):
# We want to return a specific subtype of Urn. If we're called
# with Urn.from_string(), that's fine. However, if we're called as
# DatasetUrn.from_string('urn:li:corpuser:foo'), that should throw an error.
raise InvalidUrnError(
f"Passed an urn of type {entity_type} to the from_string method of {cls.__name__}. Use Urn.from_string() or {UrnCls.__name__}.from_string() instead."
)
return UrnCls._parse_ids(entity_ids) # type: ignore
# Fallback for unknown types.
if cls != Urn:
raise InvalidUrnError(
f"Unknown urn type {entity_type} for urn {urn_str} of type {cls}"
)
return cls(entity_type, entity_ids)
def urn(self) -> str:
"""Get the string representation of the urn."""
if len(self._entity_ids) == 1:
return f"urn:li:{self._entity_type}:{self._entity_ids[0]}"
return f"urn:li:{self._entity_type}:({','.join(self._entity_ids)})"
def __str__(self) -> str:
return self.urn()
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.urn()})"
def urn_url_encoded(self) -> str:
return Urn.url_encode(self.urn())
def __eq__(self, other: object) -> bool:
if not isinstance(other, Urn):
return False
return self.urn() == other.urn()
def __lt__(self, other: object) -> bool:
if not isinstance(other, Urn):
raise TypeError(
f"'<' not supported between instances of '{type(self)}' and '{type(other)}'"
)
return self.urn() < other.urn()
def __hash__(self) -> int:
return hash(self.urn())
@classmethod
@deprecated(reason="prefer .from_string")
def create_from_string(cls: Type[_UrnSelf], urn_str: str) -> "_UrnSelf":
return cls.from_string(urn_str)
@deprecated(reason="prefer .entity_ids")
def get_entity_id(self) -> List[str]:
return self._entity_ids
@deprecated(reason="prefer .entity_type")
def get_type(self) -> str:
return self._entity_type
@deprecated(reason="no longer needed")
def get_domain(self) -> str:
return "li"
@deprecated(reason="no longer needed")
def get_entity_id_as_string(self) -> str:
urn = self.urn()
prefix = "urn:li:"
assert urn.startswith(prefix)
id_with_type = urn[len(prefix) :]
return id_with_type.split(":", maxsplit=1)[1]
@classmethod
@deprecated(reason="no longer needed")
def validate(cls, urn_str: str) -> None:
Urn.create_from_string(urn_str)
@staticmethod
def url_encode(urn: str) -> str:
# safe='' encodes '/' as '%2F'
return urllib.parse.quote(urn, safe="")
class _SpecificUrn(Urn):
ENTITY_TYPE: str = ""
def __init_subclass__(cls) -> None:
# Validate the subclass.
entity_type = cls.ENTITY_TYPE
if not entity_type:
raise ValueError(f'_SpecificUrn subclass {cls} must define "ENTITY_TYPE"')
# Register the urn type.
if entity_type in URN_TYPES:
raise ValueError(f"duplicate urn type registered: {entity_type}")
URN_TYPES[entity_type] = cls
return super().__init_subclass__()
@classmethod
def underlying_key_aspect_type(cls) -> Type:
raise NotImplementedError()
@classmethod
@abstractmethod
def _parse_ids(cls: Type[_UrnSelf], entity_ids: List[str]) -> _UrnSelf:
raise NotImplementedError()

View File

@ -1,40 +1 @@
from typing import List
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.urn import Urn
class CorpGroupUrn(Urn):
"""
expected corp group urn format: urn:li:corpGroup:<group_id>. example: "urn:li:corpGroup:data"
"""
ENTITY_TYPE: str = "corpGroup"
def __init__(
self, entity_type: str, entity_id: List[str], domain: str = Urn.LI_DOMAIN
):
super().__init__(entity_type, entity_id, domain)
@classmethod
def create_from_string(cls, urn_str: str) -> "CorpGroupUrn":
urn: Urn = super().create_from_string(urn_str)
return cls(urn.get_type(), urn.get_entity_id(), urn.get_domain())
@classmethod
def create_from_id(cls, group_id: str) -> "CorpGroupUrn":
return cls(CorpGroupUrn.ENTITY_TYPE, [group_id])
@staticmethod
def _validate_entity_type(entity_type: str) -> None:
if entity_type != CorpGroupUrn.ENTITY_TYPE:
raise InvalidUrnError(
f"Entity type should be {CorpGroupUrn.ENTITY_TYPE} but found {entity_type}"
)
@staticmethod
def _validate_entity_id(entity_id: List[str]) -> None:
if len(entity_id) != 1:
raise InvalidUrnError(
f"Expect 1 part in entity id, but found{len(entity_id)}"
)
from datahub.metadata.urns import CorpGroupUrn # noqa: F401

View File

@ -1,40 +1 @@
from typing import List
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.urn import Urn
class CorpuserUrn(Urn):
"""
expected corp user urn format: urn:li:corpuser:<corpuser_id>. example: "urn:li:corpuser:tom"
"""
ENTITY_TYPE: str = "corpuser"
def __init__(
self, entity_type: str, entity_id: List[str], domain: str = Urn.LI_DOMAIN
):
super().__init__(entity_type, entity_id, domain)
@classmethod
def create_from_string(cls, urn_str: str) -> "CorpuserUrn":
urn: Urn = super().create_from_string(urn_str)
return cls(urn.get_type(), urn.get_entity_id(), urn.get_domain())
@classmethod
def create_from_id(cls, user_id: str) -> "CorpuserUrn":
return cls(CorpuserUrn.ENTITY_TYPE, [user_id])
@staticmethod
def _validate_entity_type(entity_type: str) -> None:
if entity_type != CorpuserUrn.ENTITY_TYPE:
raise InvalidUrnError(
f"Entity type should be {CorpuserUrn.ENTITY_TYPE} but found {entity_type}"
)
@staticmethod
def _validate_entity_id(entity_id: List[str]) -> None:
if len(entity_id) != 1:
raise InvalidUrnError(
f"Expect 1 part in entity id, but found{len(entity_id)}"
)
from datahub.metadata.urns import CorpUserUrn as CorpuserUrn # noqa: F401

View File

@ -1,88 +1 @@
from typing import List, Optional
from datahub.configuration.source_common import ALL_ENV_TYPES
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.urn import Urn
class DataFlowUrn(Urn):
"""
expected data flow urn format: urn:li:dataFlow:(<orchestrator>,<flow_id>,<env>). example:
urn:li:dataFlow:(airflow,ingest_user,prod)
"""
ENTITY_TYPE: str = "dataFlow"
def __init__(
self, entity_type: str, entity_id: List[str], domain: str = Urn.LI_DOMAIN
):
super().__init__(entity_type, entity_id, domain)
@classmethod
def create_from_string(cls, urn_str: str) -> "DataFlowUrn":
"""
Create a DataFlowUrn from the its string representation
:param urn_str: the string representation of the DataFlowUrn
:return: DataFlowUrn of the given string representation
:raises InvalidUrnError is the string representation is in invalid format
"""
urn: Urn = super().create_from_string(urn_str)
return cls(urn.get_type(), urn.get_entity_id(), urn.get_domain())
def get_orchestrator_name(self) -> str:
"""
:return: the orchestrator name for the Dataflow
"""
return self.get_entity_id()[0]
def get_flow_id(self) -> str:
"""
:return: the data flow id from this DataFlowUrn
"""
return self.get_entity_id()[1]
def get_env(self) -> str:
"""
:return: the environment where the DataFlow is run
"""
return self.get_entity_id()[2]
@classmethod
def create_from_ids(
cls,
orchestrator: str,
flow_id: str,
env: str,
platform_instance: Optional[str] = None,
) -> "DataFlowUrn":
entity_id: List[str]
if platform_instance:
entity_id = [
orchestrator,
f"{platform_instance}.{flow_id}",
env,
]
else:
entity_id = [orchestrator, flow_id, env]
return cls(DataFlowUrn.ENTITY_TYPE, entity_id)
@staticmethod
def _validate_entity_type(entity_type: str) -> None:
if entity_type != DataFlowUrn.ENTITY_TYPE:
raise InvalidUrnError(
f"Entity type should be {DataFlowUrn.ENTITY_TYPE} but found {entity_type}"
)
@staticmethod
def _validate_entity_id(entity_id: List[str]) -> None:
# expected entity id format (<platform_urn>,<table_name>,<env>)
if len(entity_id) != 3:
raise InvalidUrnError(
f"Expect 3 parts in the entity id but found {entity_id}"
)
env = entity_id[2].upper()
if env not in ALL_ENV_TYPES:
raise InvalidUrnError(
f"Invalid env:{env}. Allowed envs are {ALL_ENV_TYPES}"
)
from datahub.metadata.urns import DataFlowUrn # noqa: F401

View File

@ -1,51 +1 @@
from typing import List
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.urn import Urn
class DataJobUrn(Urn):
"""
expected Data job urn format: urn:li:dataJob:(<data_flow_urn>,<job_id>). example:
"urn:li:dataJob:(urn:li:dataFlow:(airflow,sample_flow,prod),sample_job)"
"""
ENTITY_TYPE: str = "dataJob"
def __init__(
self, entity_type: str, entity_id: List[str], domain: str = Urn.LI_DOMAIN
):
super().__init__(entity_type, entity_id, domain)
def get_data_flow_urn(self) -> DataFlowUrn:
return DataFlowUrn.create_from_string(self.get_entity_id()[0])
def get_job_id(self) -> str:
return self.get_entity_id()[1]
@classmethod
def create_from_string(cls, urn_str: str) -> "DataJobUrn":
urn: Urn = super().create_from_string(urn_str)
return cls(urn.get_type(), urn.get_entity_id(), urn.get_domain())
@classmethod
def create_from_ids(cls, data_flow_urn: str, job_id: str) -> "DataJobUrn":
return cls(DataJobUrn.ENTITY_TYPE, [data_flow_urn, job_id])
@staticmethod
def _validate_entity_type(entity_type: str) -> None:
if entity_type != DataJobUrn.ENTITY_TYPE:
raise InvalidUrnError(
f"Entity type should be {DataJobUrn.ENTITY_TYPE} but found {entity_type}"
)
@staticmethod
def _validate_entity_id(entity_id: List[str]) -> None:
if len(entity_id) != 2:
raise InvalidUrnError(
f"Expect 2 part in entity id, but found{len(entity_id)}"
)
data_flow_urn_str = entity_id[0]
DataFlowUrn.validate(data_flow_urn_str)
from datahub.metadata.urns import DataJobUrn # noqa: F401

View File

@ -1,34 +1 @@
from typing import List
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.urn import Urn
class DataPlatformUrn(Urn):
"""
expected dataset urn format: urn:li:dataPlatform:<platform_name>. example: "urn:li:dataPlatform:hive"
"""
ENTITY_TYPE: str = "dataPlatform"
def __init__(self, entity_type: str, entity_id: List[str], domain: str = "li"):
super().__init__(entity_type, entity_id, domain)
@classmethod
def create_from_string(cls, urn_str: str) -> "DataPlatformUrn":
urn: Urn = super().create_from_string(urn_str)
return cls(urn.get_type(), urn.get_entity_id(), urn.get_domain())
@classmethod
def create_from_id(cls, platform_id: str) -> "DataPlatformUrn":
return cls(DataPlatformUrn.ENTITY_TYPE, [platform_id])
@staticmethod
def _validate_entity_type(entity_type: str) -> None:
if entity_type != DataPlatformUrn.ENTITY_TYPE:
raise InvalidUrnError(
f"Entity type should be {DataPlatformUrn.ENTITY_TYPE} but found {entity_type}"
)
def get_platform_name(self) -> str:
return self.get_entity_id()[0]
from datahub.metadata.urns import DataPlatformUrn # noqa: F401

View File

@ -1,46 +1 @@
from typing import List
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.urn import Urn
class DataProcessInstanceUrn(Urn):
"""
expected domain urn format: urn:li:dataProcessInstance:<dataprocessinstance_key>
"""
ENTITY_TYPE: str = "dataProcessInstance"
def __init__(
self, entity_type: str, entity_id: List[str], domain_id: str = Urn.LI_DOMAIN
):
super().__init__(entity_type, entity_id, domain_id)
@classmethod
def create_from_string(cls, urn_str: str) -> "DataProcessInstanceUrn":
urn: Urn = super().create_from_string(urn_str)
return cls(urn.get_type(), urn.get_entity_id(), urn.get_domain())
@classmethod
def create_from_id(cls, dataprocessinstance_id: str) -> "DataProcessInstanceUrn":
return cls(DataProcessInstanceUrn.ENTITY_TYPE, [dataprocessinstance_id])
def get_dataprocessinstance_id(self) -> str:
"""
:return: the dataprocess instance id from this DatasetUrn
"""
return self.get_entity_id()[0]
@staticmethod
def _validate_entity_type(entity_type: str) -> None:
if entity_type != DataProcessInstanceUrn.ENTITY_TYPE:
raise InvalidUrnError(
f"Entity type should be {DataProcessInstanceUrn.ENTITY_TYPE} but found {entity_type}"
)
@staticmethod
def _validate_entity_id(entity_id: List[str]) -> None:
if len(entity_id) != 1:
raise InvalidUrnError(
f"Expect 1 part in entity id, but found{len(entity_id)}"
)
from datahub.metadata.urns import DataProcessInstanceUrn # noqa: F401

View File

@ -1,112 +1 @@
from typing import List, Optional
from datahub.configuration.source_common import ALL_ENV_TYPES
from datahub.utilities.urn_encoder import UrnEncoder
from datahub.utilities.urns.data_platform_urn import DataPlatformUrn
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.urn import Urn
class DatasetUrn(Urn):
"""
expected dataset urn format: urn:li:dataset:(<platform_urn_str>,<table_name>,env). example:
urn:li:dataset:(urn:li:dataPlatform:hive,member,prod)
"""
ENTITY_TYPE: str = "dataset"
def __init__(self, entity_type: str, entity_id: List[str], domain: str = "li"):
super().__init__(entity_type, UrnEncoder.encode_string_array(entity_id), domain)
@classmethod
def create_from_string(cls, urn_str: str) -> "DatasetUrn":
"""
Create a DatasetUrn from the its string representation
:param urn_str: the string representation of the DatasetUrn
:return: DatasetUrn of the given string representation
:raises InvalidUrnError is the string representation is in invalid format
"""
urn: Urn = super().create_from_string(urn_str)
return cls(urn.get_type(), urn.get_entity_id(), urn.get_domain())
def get_data_platform_urn(self) -> DataPlatformUrn:
"""
:return: the DataPlatformUrn of where the Dataset is created
"""
return DataPlatformUrn.create_from_string(self.get_entity_id()[0])
def get_dataset_name(self) -> str:
"""
:return: the dataset name from this DatasetUrn
"""
return self.get_entity_id()[1]
def get_env(self) -> str:
"""
:return: the environment where the Dataset is created
"""
return self.get_entity_id()[2]
@classmethod
def create_from_ids(
cls,
platform_id: str,
table_name: str,
env: str,
platform_instance: Optional[str] = None,
) -> "DatasetUrn":
entity_id: List[str]
if platform_instance:
entity_id = [
str(DataPlatformUrn.create_from_id(platform_id)),
f"{platform_instance}.{table_name}",
env,
]
else:
entity_id = [
str(DataPlatformUrn.create_from_id(platform_id)),
table_name,
env,
]
return cls(DatasetUrn.ENTITY_TYPE, entity_id)
@staticmethod
def _validate_entity_type(entity_type: str) -> None:
if entity_type != DatasetUrn.ENTITY_TYPE:
raise InvalidUrnError(
f"Entity type should be {DatasetUrn.ENTITY_TYPE} but found {entity_type}"
)
@staticmethod
def _validate_entity_id(entity_id: List[str]) -> None:
# expected entity id format (<platform_urn>,<table_name>,<env>)
if len(entity_id) != 3:
raise InvalidUrnError(
f"Expect 3 parts in the entity id but found {entity_id}"
)
platform_urn_str = entity_id[0]
DataPlatformUrn.validate(platform_urn_str)
env = entity_id[2].upper()
if env not in ALL_ENV_TYPES:
raise InvalidUrnError(
f"Invalid env:{env}. Allowed envs are {ALL_ENV_TYPES}"
)
"""A helper function to extract simple . path notation from the v2 field path"""
@staticmethod
def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
if field_path.startswith("[version=2.0]"):
# this is a v2 field path
tokens = [
t
for t in field_path.split(".")
if not (t.startswith("[") or t.endswith("]"))
]
path = ".".join(tokens)
return path
else:
# not a v2, we assume this is a simple path
return field_path
from datahub.metadata.urns import DatasetUrn # noqa: F401

View File

@ -1,40 +1 @@
from typing import List
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.urn import Urn
class DomainUrn(Urn):
"""
expected domain urn format: urn:li:domain:<domain_id>. example: "urn:li:domain:product"
"""
ENTITY_TYPE: str = "domain"
def __init__(
self, entity_type: str, entity_id: List[str], domain: str = Urn.LI_DOMAIN
):
super().__init__(entity_type, entity_id, domain)
@classmethod
def create_from_string(cls, urn_str: str) -> "DomainUrn":
urn: Urn = super().create_from_string(urn_str)
return cls(urn.get_type(), urn.get_entity_id(), urn.get_domain())
@classmethod
def create_from_id(cls, domain_id: str) -> "DomainUrn":
return cls(DomainUrn.ENTITY_TYPE, [domain_id])
@staticmethod
def _validate_entity_type(entity_type: str) -> None:
if entity_type != DomainUrn.ENTITY_TYPE:
raise InvalidUrnError(
f"Entity type should be {DomainUrn.ENTITY_TYPE} but found {entity_type}"
)
@staticmethod
def _validate_entity_id(entity_id: List[str]) -> None:
if len(entity_id) != 1:
raise InvalidUrnError(
f"Expect 1 part in entity id, but found{len(entity_id)}"
)
from datahub.metadata.urns import DomainUrn # noqa: F401

View File

@ -1,3 +1,2 @@
class InvalidUrnError(Exception):
def __init__(self, msg: str):
super().__init__(msg)
pass

View File

@ -0,0 +1,15 @@
def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
"""A helper function to extract simple . path notation from the v2 field path"""
if field_path.startswith("[version=2.0]"):
# this is a v2 field path
tokens = [
t
for t in field_path.split(".")
if not (t.startswith("[") or t.endswith("]"))
]
path = ".".join(tokens)
return path
else:
# not a v2, we assume this is a simple path
return field_path

View File

@ -1,46 +1 @@
from typing import List
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.urn import Urn
class NotebookUrn(Urn):
"""
expected dataset urn format: urn:li:notebook:(<platform_name>,<notebook_id>). example: "urn:li:notebook:(querybook,1234)"
"""
ENTITY_TYPE: str = "notebook"
def __init__(
self, entity_type: str, entity_id: List[str], domain: str = Urn.LI_DOMAIN
):
super().__init__(entity_type, entity_id, domain)
@classmethod
def create_from_string(cls, urn_str: str) -> "NotebookUrn":
urn: Urn = super().create_from_string(urn_str)
return cls(urn.get_type(), urn.get_entity_id(), urn.get_domain())
@classmethod
def create_from_ids(cls, platform_id: str, notebook_id: str) -> "NotebookUrn":
return cls(NotebookUrn.ENTITY_TYPE, [platform_id, notebook_id])
@staticmethod
def _validate_entity_type(entity_type: str) -> None:
if entity_type != NotebookUrn.ENTITY_TYPE:
raise InvalidUrnError(
f"Entity type should be {NotebookUrn.ENTITY_TYPE} but found {entity_type}"
)
@staticmethod
def _validate_entity_id(entity_id: List[str]) -> None:
if len(entity_id) != 2:
raise InvalidUrnError(
f"Expect 2 parts in entity id, but found{len(entity_id)}"
)
def get_platform_id(self) -> str:
return self.get_entity_id()[0]
def get_notebook_id(self) -> str:
return self.get_entity_id()[1]
from datahub.metadata.urns import NotebookUrn # noqa: F401

View File

@ -1,40 +1 @@
from typing import List
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.urn import Urn
class TagUrn(Urn):
"""
expected tag urn format: urn:li:tag:<tag_id>. example: "urn:li:tag:product"
"""
ENTITY_TYPE: str = "tag"
def __init__(
self, entity_type: str, entity_id: List[str], tag: str = Urn.LI_DOMAIN
):
super().__init__(entity_type, entity_id, tag)
@classmethod
def create_from_string(cls, urn_str: str) -> "TagUrn":
urn: Urn = super().create_from_string(urn_str)
return cls(urn.get_type(), urn.get_entity_id(), urn.get_domain())
@classmethod
def create_from_id(cls, tag_id: str) -> "TagUrn":
return cls(TagUrn.ENTITY_TYPE, [tag_id])
@staticmethod
def _validate_entity_type(entity_type: str) -> None:
if entity_type != TagUrn.ENTITY_TYPE:
raise InvalidUrnError(
f"Entity type should be {TagUrn.ENTITY_TYPE} but found {entity_type}"
)
@staticmethod
def _validate_entity_id(entity_id: List[str]) -> None:
if len(entity_id) != 1:
raise InvalidUrnError(
f"Expect 1 part in entity id, but found{len(entity_id)}"
)
from datahub.metadata.urns import TagUrn # noqa: F401
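Illustrative usage of the replacement class (it mirrors the TestTagUrn assertions further down; not part of this change):
assert str(TagUrn("abc")) == "urn:li:tag:abc"
assert TagUrn.from_string("urn:li:tag:abc") == TagUrn("abc")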

View File

@ -1,167 +1,6 @@
import urllib.parse
from typing import List
from datahub.utilities.urns.error import InvalidUrnError
from datahub.metadata.urns import Urn # noqa: F401
def guess_entity_type(urn: str) -> str:
assert urn.startswith("urn:li:"), "urns must start with urn:li:"
return urn.split(":")[2]
class Urn:
"""
URNs are Globally Unique Identifiers (GUID) used to represent an entity.
It is in the format urn:<domain>:<type>:<id>
"""
URN_PREFIX: str = "urn"
# all DataHub urns use the li domain for now.
LI_DOMAIN: str = "li"
_entity_type: str
_domain: str
_entity_id: List[str]
def __init__(
self, entity_type: str, entity_id: List[str], urn_domain: str = LI_DOMAIN
):
if not entity_id:
raise InvalidUrnError("Empty entity id.")
self._validate_entity_type(entity_type)
self._validate_entity_id(entity_id)
self._entity_type = entity_type
self._domain = urn_domain
self._entity_id = entity_id
@classmethod
def create_from_string(cls, urn_str: str) -> "Urn":
"""
Create a Urn from its string representation
:param urn_str: the string representation of the Urn
:return: Urn of the given string representation
:raises InvalidUrnError if the string representation is in invalid format
"""
# expect urn string in format of urn:<domain>:<type>:<id>
cls.validate(urn_str)
parts: List[str] = urn_str.split(":", 3)
return cls(parts[2], cls._get_entity_id_from_str(parts[3]), parts[1])
@classmethod
def validate(cls, urn_str: str) -> None:
"""
Validate if a string is in valid Urn format
:param urn_str: to be validated urn string
:raises InvalidUrnError if the string representation is in invalid format
"""
parts: List[str] = urn_str.split(":", 3)
if len(parts) != 4:
raise InvalidUrnError(
f"Invalid urn string: {urn_str}. Expect 4 parts from urn string but found {len(parts)}"
)
if "" in parts:
raise InvalidUrnError(
f"Invalid urn string: {urn_str}. There should not be empty parts in urn string."
)
if parts[0] != Urn.URN_PREFIX:
raise InvalidUrnError(
f'Invalid urn string: {urn_str}. Expect urn starting with "urn" but found {parts[0]}'
)
if "" in cls._get_entity_id_from_str(parts[3]):
raise InvalidUrnError(
f"Invalid entity id in urn string: {urn_str}. There should not be empty parts in entity id."
)
cls._validate_entity_type(parts[2])
cls._validate_entity_id(cls._get_entity_id_from_str(parts[3]))
@staticmethod
def url_encode(urn: str) -> str:
# safe='' encodes '/' as '%2F'
return urllib.parse.quote(urn, safe="")
def get_type(self) -> str:
return self._entity_type
def get_entity_id(self) -> List[str]:
return self._entity_id
def get_entity_id_as_string(self) -> str:
"""
:return: string representation of the entity ids. If the entity id has more than one part, it is
returned in the format (<part1>,<part2>,...)
"""
return self._entity_id_to_string()
def get_domain(self) -> str:
return self._domain
@staticmethod
def _get_entity_id_from_str(entity_id: str) -> List[str]:
if not (entity_id.startswith("(") and entity_id.endswith(")")):
return [entity_id]
parts = []
start_paren_count = 1
part_start = 1
for i in range(1, len(entity_id)):
c = entity_id[i]
if c == "(":
start_paren_count += 1
elif c == ")":
start_paren_count -= 1
if start_paren_count < 0:
raise InvalidUrnError(f"{entity_id}, mismatched paren nesting")
elif c == ",":
if start_paren_count != 1:
continue
if i - part_start <= 0:
raise InvalidUrnError(f"{entity_id}, empty part disallowed")
parts.append(entity_id[part_start:i])
part_start = i + 1
if start_paren_count != 0:
raise InvalidUrnError(f"{entity_id}, mismatched paren nesting")
parts.append(entity_id[part_start:-1])
return parts
@staticmethod
def _validate_entity_type(entity_type: str) -> None:
pass
@staticmethod
def _validate_entity_id(entity_id: List[str]) -> None:
pass
def __str__(self) -> str:
return f"{self.URN_PREFIX}:{self._domain}:{self._entity_type}:{self._entity_id_to_string()}"
def _entity_id_to_string(self) -> str:
if len(self._entity_id) == 1:
return self._entity_id[0]
result = ""
for part in self._entity_id:
result = result + str(part) + ","
return f"({result[:-1]})"
def __hash__(self) -> int:
return hash((self._domain, self._entity_type) + tuple(self._entity_id))
def __eq__(self, other: object) -> bool:
return (
(
self._entity_id == other._entity_id
and self._domain == other._domain
and self._entity_type == other._entity_type
)
if isinstance(other, Urn)
else False
)
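Roughly, the retained helper and the re-exported class behave like this (derived from guess_entity_type above and the new dispatch test near the end of this change; illustrative only, and it assumes DatasetUrn is also imported from datahub.metadata.urns):
guess_entity_type("urn:li:dataset:(urn:li:dataPlatform:abc,def,PROD)")  # -> "dataset"
urn = Urn.from_string("urn:li:dataset:(urn:li:dataPlatform:abc,def,PROD)")
assert isinstance(urn, DatasetUrn)  # from_string dispatches to the specific subclass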

View File

@ -131,9 +131,11 @@ def _modify_at_path(
def _lowercase_dataset_urn(dataset_urn: str) -> str:
cur_urn = DatasetUrn.create_from_string(dataset_urn)
cur_urn._entity_id[1] = cur_urn._entity_id[1].lower()
return str(cur_urn)
cur_urn = DatasetUrn.from_string(dataset_urn)
new_urn = DatasetUrn(
platform=cur_urn.platform, name=cur_urn.name.lower(), env=cur_urn.env
)
return str(new_urn)
def lowercase_dataset_urns(
@ -149,7 +151,7 @@ def lowercase_dataset_urns(
return _lowercase_dataset_urn(urn)
elif guess_entity_type(urn) == "schemaField":
cur_urn = Urn.create_from_string(urn)
cur_urn._entity_id[0] = _lowercase_dataset_urn(cur_urn._entity_id[0])
cur_urn._entity_ids[0] = _lowercase_dataset_urn(cur_urn._entity_ids[0])
return str(cur_urn)
return urn
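A rough before/after for the rewritten _lowercase_dataset_urn, assuming the keyword-style DatasetUrn constructor shown above (illustrative only; the snowflake urn is hypothetical):
_lowercase_dataset_urn("urn:li:dataset:(urn:li:dataPlatform:snowflake,Db.Table,PROD)")
# -> "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.table,PROD)"  (name lowercased, env preserved)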

View File

@ -4,7 +4,6 @@ from typing import Dict, List
import pydantic
import pytest
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.ingestion.source.state.checkpoint import Checkpoint, CheckpointStateBase
from datahub.ingestion.source.state.sql_common_state import (
BaseSQLAlchemyCheckpointState,
@ -59,12 +58,15 @@ def _assert_checkpoint_deserialization(
def _make_sql_alchemy_checkpoint_state() -> BaseSQLAlchemyCheckpointState:
# Note that the urns here purposely use a lowercase env, even though it's
# technically incorrect. This is purely for backwards compatibility testing, but
# all existing code uses correctly formed envs.
base_sql_alchemy_checkpoint_state_obj = BaseSQLAlchemyCheckpointState()
base_sql_alchemy_checkpoint_state_obj.add_checkpoint_urn(
type="table", urn=make_dataset_urn("mysql", "db1.t1", "prod")
type="table", urn="urn:li:dataset:(urn:li:dataPlatform:mysql,db1.t1,prod)"
)
base_sql_alchemy_checkpoint_state_obj.add_checkpoint_urn(
type="view", urn=make_dataset_urn("mysql", "db1.v1", "prod")
type="view", urn="urn:li:dataset:(urn:li:dataPlatform:mysql,db1.v1,prod)"
)
return base_sql_alchemy_checkpoint_state_obj

View File

@ -25,6 +25,6 @@ def test_kafka_state_migration() -> None:
}
)
assert state.urns == [
"urn:li:dataset:(urn:li:dataPlatform:kafka,test_topic1,test)",
"urn:li:dataset:(urn:li:dataPlatform:kafka,test_topic1,TEST)",
"urn:li:dataset:(urn:li:dataPlatform:kafka,topic_2,DEV)",
]

View File

@ -1,45 +0,0 @@
import unittest
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.urn import Urn
class TestUrn(unittest.TestCase):
def test_parse_urn(self) -> None:
simple_urn_str = "urn:li:dataPlatform:abc"
urn = Urn.create_from_string(simple_urn_str)
assert urn.get_entity_id_as_string() == "abc"
assert urn.get_entity_id() == ["abc"]
assert urn.get_type() == "dataPlatform"
assert urn.get_domain() == "li"
assert urn.__str__() == simple_urn_str
assert urn == Urn("dataPlatform", ["abc"])
complex_urn_str = "urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)"
urn = Urn.create_from_string(complex_urn_str)
assert urn.get_entity_id_as_string() == "(urn:li:dataPlatform:abc,def,prod)"
assert urn.get_entity_id() == ["urn:li:dataPlatform:abc", "def", "prod"]
assert urn.get_type() == "dataset"
assert urn.__str__() == "urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)"
def test_url_encode_urn(self) -> None:
urn_with_slash: Urn = Urn.create_from_string(
"urn:li:dataset:(urn:li:dataPlatform:abc,def/ghi,prod)"
)
assert (
Urn.url_encode(str(urn_with_slash))
== "urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Aabc%2Cdef%2Fghi%2Cprod%29"
)
def test_invalid_urn(self) -> None:
with self.assertRaises(InvalidUrnError):
Urn.create_from_string("urn:li:abc")
with self.assertRaises(InvalidUrnError):
Urn.create_from_string("urn:li:abc:")
with self.assertRaises(InvalidUrnError):
Urn.create_from_string("urn:li:abc:()")
with self.assertRaises(InvalidUrnError):
Urn.create_from_string("urn:li:abc:(abc,)")

View File

@ -1,9 +1,12 @@
import unittest
import pytest
from datahub.utilities.urns.corp_group_urn import CorpGroupUrn
from datahub.utilities.urns.error import InvalidUrnError
@pytest.mark.filterwarnings("ignore::DeprecationWarning")
class TestCorpGroupUrn(unittest.TestCase):
def test_parse_urn(self) -> None:
corp_group_urn_str = "urn:li:corpGroup:abc"
@ -12,7 +15,7 @@ class TestCorpGroupUrn(unittest.TestCase):
assert corp_group_urn.get_entity_id() == ["abc"]
assert str(corp_group_urn) == corp_group_urn_str
assert corp_group_urn == CorpGroupUrn("corpGroup", ["abc"])
assert corp_group_urn == CorpGroupUrn(name="abc")
assert corp_group_urn == CorpGroupUrn.create_from_id("abc")
def test_invalid_urn(self) -> None:

View File

@ -1,9 +1,12 @@
import unittest
import pytest
from datahub.utilities.urns.corpuser_urn import CorpuserUrn
from datahub.utilities.urns.error import InvalidUrnError
@pytest.mark.filterwarnings("ignore::DeprecationWarning")
class TestCorpuserUrn(unittest.TestCase):
def test_parse_urn(self) -> None:
corpuser_urn_str = "urn:li:corpuser:abc"
@ -12,7 +15,7 @@ class TestCorpuserUrn(unittest.TestCase):
assert corpuser_urn.get_entity_id() == ["abc"]
assert str(corpuser_urn) == corpuser_urn_str
assert corpuser_urn == CorpuserUrn("corpuser", ["abc"])
assert corpuser_urn == CorpuserUrn("abc")
assert corpuser_urn == CorpuserUrn.create_from_id("abc")
def test_invalid_urn(self) -> None:

View File

@ -1,9 +1,12 @@
import unittest
import pytest
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.error import InvalidUrnError
@pytest.mark.filterwarnings("ignore::DeprecationWarning")
class TestDataFlowUrn(unittest.TestCase):
def test_parse_urn(self) -> None:
data_flow_urn_str = "urn:li:dataFlow:(airflow,def,prod)"
@ -12,7 +15,7 @@ class TestDataFlowUrn(unittest.TestCase):
assert data_flow_urn.get_flow_id() == "def"
assert data_flow_urn.get_env() == "prod"
assert data_flow_urn.__str__() == "urn:li:dataFlow:(airflow,def,prod)"
assert data_flow_urn == DataFlowUrn("dataFlow", ["airflow", "def", "prod"])
assert data_flow_urn == DataFlowUrn("airflow", "def", "prod")
def test_invalid_urn(self) -> None:
with self.assertRaises(InvalidUrnError):
@ -20,8 +23,3 @@ class TestDataFlowUrn(unittest.TestCase):
with self.assertRaises(InvalidUrnError):
DataFlowUrn.create_from_string("urn:li:dataFlow:(airflow,flow_id)")
with self.assertRaises(InvalidUrnError):
DataFlowUrn.create_from_string(
"urn:li:dataFlow:(airflow,flow_id,invalidEnv)"
)

View File

@ -1,10 +1,13 @@
import unittest
import pytest
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.data_job_urn import DataJobUrn
from datahub.utilities.urns.error import InvalidUrnError
@pytest.mark.filterwarnings("ignore::DeprecationWarning")
class TestDataJobUrn(unittest.TestCase):
def test_parse_urn(self) -> None:
data_job_urn_str = (
@ -17,7 +20,7 @@ class TestDataJobUrn(unittest.TestCase):
assert data_job_urn.get_job_id() == "job_id"
assert data_job_urn.__str__() == data_job_urn_str
assert data_job_urn == DataJobUrn(
"dataJob", ["urn:li:dataFlow:(airflow,flow_id,prod)", "job_id"]
"urn:li:dataFlow:(airflow,flow_id,prod)", "job_id"
)
def test_invalid_urn(self) -> None:

View File

@ -1,10 +1,13 @@
import unittest
import pytest
from datahub.utilities.urns.data_process_instance_urn import DataProcessInstanceUrn
from datahub.utilities.urns.error import InvalidUrnError
class TestDomainUrn(unittest.TestCase):
@pytest.mark.filterwarnings("ignore::DeprecationWarning")
class TestDataProcessInstanceUrn(unittest.TestCase):
def test_parse_urn(self) -> None:
dataprocessinstance_urn_str = "urn:li:dataProcessInstance:abc"
dataprocessinstance_urn = DataProcessInstanceUrn.create_from_string(
@ -14,9 +17,7 @@ class TestDomainUrn(unittest.TestCase):
assert dataprocessinstance_urn.get_entity_id() == ["abc"]
assert str(dataprocessinstance_urn) == dataprocessinstance_urn_str
assert dataprocessinstance_urn == DataProcessInstanceUrn(
"dataProcessInstance", ["abc"]
)
assert dataprocessinstance_urn == DataProcessInstanceUrn("abc")
assert dataprocessinstance_urn == DataProcessInstanceUrn.create_from_id("abc")
assert "abc" == dataprocessinstance_urn.get_dataprocessinstance_id()

View File

@ -1,26 +1,25 @@
import unittest
import pytest
from datahub.utilities.urns.data_platform_urn import DataPlatformUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.error import InvalidUrnError
@pytest.mark.filterwarnings("ignore::DeprecationWarning")
class TestDatasetUrn(unittest.TestCase):
def test_parse_urn(self) -> None:
dataset_urn_str = "urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)"
dataset_urn_str = "urn:li:dataset:(urn:li:dataPlatform:abc,def,PROD)"
dataset_urn = DatasetUrn.create_from_string(dataset_urn_str)
assert (
dataset_urn.get_data_platform_urn()
== DataPlatformUrn.create_from_string("urn:li:dataPlatform:abc")
)
assert dataset_urn.get_dataset_name() == "def"
assert dataset_urn.get_env() == "prod"
assert (
dataset_urn.__str__() == "urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)"
)
assert dataset_urn == DatasetUrn(
"dataset", ["urn:li:dataPlatform:abc", "def", "prod"]
)
assert dataset_urn.get_env() == "PROD"
assert dataset_urn.__str__() == dataset_urn_str
assert dataset_urn == DatasetUrn("urn:li:dataPlatform:abc", "def", "prod")
def test_invalid_urn(self) -> None:
with self.assertRaises(InvalidUrnError):

View File

@ -1,9 +1,12 @@
import unittest
import pytest
from datahub.utilities.urns.domain_urn import DomainUrn
from datahub.utilities.urns.error import InvalidUrnError
@pytest.mark.filterwarnings("ignore::DeprecationWarning")
class TestDomainUrn(unittest.TestCase):
def test_parse_urn(self) -> None:
domain_urn_str = "urn:li:domain:abc"
@ -12,7 +15,7 @@ class TestDomainUrn(unittest.TestCase):
assert domain_urn.get_entity_id() == ["abc"]
assert str(domain_urn) == domain_urn_str
assert domain_urn == DomainUrn("domain", ["abc"])
assert domain_urn == DomainUrn("abc")
assert domain_urn == DomainUrn.create_from_id("abc")
def test_invalid_urn(self) -> None:

View File

@ -1,9 +1,12 @@
import unittest
import pytest
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.notebook_urn import NotebookUrn
@pytest.mark.filterwarnings("ignore::DeprecationWarning")
class TestNotebookUrn(unittest.TestCase):
def test_parse_urn(self) -> None:
notebook_urn_str = "urn:li:notebook:(querybook,123)"
@ -12,7 +15,7 @@ class TestNotebookUrn(unittest.TestCase):
assert notebook_urn.get_notebook_id() == "123"
assert str(notebook_urn) == notebook_urn_str
assert notebook_urn == NotebookUrn("notebook", ["querybook", "123"])
assert notebook_urn == NotebookUrn("querybook", "123")
def test_invalid_urn(self) -> None:
with self.assertRaises(InvalidUrnError):

View File

@ -1,9 +1,12 @@
import unittest
import pytest
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.tag_urn import TagUrn
@pytest.mark.filterwarnings("ignore::DeprecationWarning")
class TestTagUrn(unittest.TestCase):
def test_parse_urn(self) -> None:
tag_urn_str = "urn:li:tag:abc"
@ -12,7 +15,7 @@ class TestTagUrn(unittest.TestCase):
assert tag_urn.get_entity_id() == ["abc"]
assert str(tag_urn) == tag_urn_str
assert tag_urn == TagUrn("tag", ["abc"])
assert tag_urn == TagUrn("abc")
assert tag_urn == TagUrn.create_from_id("abc")
def test_invalid_urn(self) -> None:

View File

@ -0,0 +1,56 @@
import pytest
from datahub.metadata.urns import DatasetUrn, Urn
from datahub.utilities.urns.error import InvalidUrnError
pytestmark = pytest.mark.filterwarnings("ignore::DeprecationWarning")
def test_parse_urn() -> None:
simple_urn_str = "urn:li:dataPlatform:abc"
urn = Urn.create_from_string(simple_urn_str)
assert urn.get_entity_id_as_string() == "abc"
assert urn.get_entity_id() == ["abc"]
assert urn.get_type() == "dataPlatform"
assert urn.get_domain() == "li"
assert urn.__str__() == simple_urn_str
assert urn == Urn("dataPlatform", ["abc"])
complex_urn_str = "urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)"
urn = Urn.create_from_string(complex_urn_str)
assert urn.get_entity_id_as_string() == "(urn:li:dataPlatform:abc,def,prod)"
assert urn.get_entity_id() == ["urn:li:dataPlatform:abc", "def", "prod"]
assert urn.get_type() == "dataset"
assert urn.__str__() == "urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)"
def test_url_encode_urn() -> None:
urn_with_slash: Urn = Urn.create_from_string(
"urn:li:dataset:(urn:li:dataPlatform:abc,def/ghi,prod)"
)
assert (
Urn.url_encode(str(urn_with_slash))
== "urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Aabc%2Cdef%2Fghi%2Cprod%29"
)
def test_invalid_urn() -> None:
with pytest.raises(InvalidUrnError):
Urn.create_from_string("urn:li:abc")
with pytest.raises(InvalidUrnError):
Urn.create_from_string("urn:li:abc:")
with pytest.raises(InvalidUrnError):
Urn.create_from_string("urn:li:abc:()")
with pytest.raises(InvalidUrnError):
Urn.create_from_string("urn:li:abc:(abc,)")
def test_urn_type_dispatch() -> None:
urn = Urn.from_string("urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)")
assert isinstance(urn, DatasetUrn)
with pytest.raises(InvalidUrnError, match="Passed an urn of type corpuser"):
DatasetUrn.from_string("urn:li:corpuser:foo")

View File

@ -400,7 +400,7 @@ entities:
- dataHubUpgradeRequest
- dataHubUpgradeResult
- name: inviteToken
category: core
category: internal
keyAspect: inviteTokenKey
aspects:
- inviteToken
@ -425,7 +425,7 @@ entities:
aspects:
- postInfo
- name: dataHubStepState
category: core
category: internal
keyAspect: dataHubStepStateKey
aspects:
- dataHubStepStateProperties