fix(ingest): domains - check whether urn based domain exists during resolution (#5373)

This commit is contained in:
Shirshanka Das 2022-07-11 15:08:26 -07:00 committed by GitHub
parent 070dfa0eaf
commit 489b5bb5b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 56 additions and 10 deletions

View File

@ -100,6 +100,41 @@ source:
- "long_tail_companions.ecommerce.*"
```
:::note
When bare domain names like `Analytics` are used, the ingestion system will first check if a domain like `urn:li:domain:Analytics` is provisioned; failing that, it will check for a provisioned domain that has the same name. If we are unable to resolve bare domain names to provisioned domains, then ingestion will refuse to proceed until the domain is provisioned on DataHub.
:::
You can also provide fully-qualified domain names (that is, domain URNs) to ensure that no ingestion-time domain resolution is needed. For example, the following recipe uses fully-qualified domain names:
```yaml
source:
type: snowflake
config:
username: ${SNOW_USER}
password: ${SNOW_PASS}
account_id:
warehouse: COMPUTE_WH
role: accountadmin
database_pattern:
allow:
- "long_tail_companions"
schema_pattern:
deny:
- information_schema
profiling:
enabled: False
domain:
"urn:li:domain:6289fccc-4af2-4cbb-96ed-051e7d1de93c":
allow:
- "long_tail_companions.analytics.*"
"urn:li:domain:07155b15-cee6-4fda-b1c1-5a19a6b74c3a":
allow:
- "long_tail_companions.ecommerce.*"
```
## Searching by Domain

View File

@ -28,16 +28,27 @@ class DomainRegistry:
)
for domain_identifier in domains_needing_resolution:
assert graph
domain_urn = graph.get_domain_urn_by_name(domain_identifier)
if domain_urn:
self.domain_registry[domain_identifier] = domain_urn
# first try to check if this domain exists by urn
maybe_domain_urn = f"urn:li:domain:{domain_identifier}"
from datahub.metadata.schema_classes import DomainPropertiesClass
maybe_domain_properties = graph.get_aspect_v2(
maybe_domain_urn, DomainPropertiesClass, "domainProperties"
)
if maybe_domain_properties:
self.domain_registry[domain_identifier] = maybe_domain_urn
else:
logger.error(
f"Failed to retrieve domain id for domain {domain_identifier}"
)
raise ValueError(
f"domain {domain_identifier} doesn't seem to be provisioned on DataHub. Either provision it first and re-run ingestion, or provide a fully qualified domain id (e.g. urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba) to skip this check."
)
# try to get this domain by name
domain_urn = graph.get_domain_urn_by_name(domain_identifier)
if domain_urn:
self.domain_registry[domain_identifier] = domain_urn
else:
logger.error(
f"Failed to retrieve domain id for domain {domain_identifier}"
)
raise ValueError(
f"domain {domain_identifier} doesn't seem to be provisioned on DataHub. Either provision it first and re-run ingestion, or provide a fully qualified domain id (e.g. urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba) to skip this check."
)
def get_domain_urn(self, domain_identifier: str) -> str:
    """Return the URN registered for *domain_identifier*.

    Looks the identifier up in ``self.domain_registry``. When the
    registry has no truthy entry for it, the identifier itself is
    returned unchanged.
    """
    registered_urn = self.domain_registry.get(domain_identifier)
    if registered_urn:
        return registered_urn
    return domain_identifier