feat(ingest/unity): Support specifying catalogs directly; pass env correctly (#9110)

This commit is contained in:
Andrew Sikowitz 2023-11-16 12:41:12 -05:00 committed by GitHub
parent 15efa72728
commit 78abeb9beb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 44 additions and 5 deletions

View File

@ -68,6 +68,9 @@ qualified dataset name, i.e. `<project_name>.<dataset_name>`. We attempt to supp
pattern format by prepending `.*\\.` to dataset patterns lacking a period, so in most cases this pattern format by prepending `.*\\.` to dataset patterns lacking a period, so in most cases this
should not cause any issues. However, if you have a complex dataset pattern, we recommend you should not cause any issues. However, if you have a complex dataset pattern, we recommend you
manually convert it to the fully qualified format to avoid any potential issues. manually convert it to the fully qualified format to avoid any potential issues.
- #9110 - The Unity Catalog source now correctly uses the `env` set in your recipe when generating urns. If you have
been setting `env` to something other than `PROD`, urns will now be generated with that `env` value,
which changes them and invalidates your existing urns.
### Potential Downtime ### Potential Downtime

View File

@ -1,7 +1,7 @@
import logging import logging
import os import os
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional from typing import Any, Dict, List, Optional
import pydantic import pydantic
from pydantic import Field from pydantic import Field
@ -132,6 +132,14 @@ class UnityCatalogSourceConfig(
_metastore_id_pattern_removed = pydantic_removed_field("metastore_id_pattern") _metastore_id_pattern_removed = pydantic_removed_field("metastore_id_pattern")
# Explicit catalog names to fetch one by one instead of listing every catalog.
# The source checks this field by truthiness, so an empty list behaves the same
# as leaving it unset. Catalogs named here are still filtered through
# `catalog_pattern` afterwards.
catalogs: Optional[List[str]] = pydantic.Field(
default=None,
description=(
"Fixed list of catalogs to ingest."
" If not specified, catalogs will be ingested based on `catalog_pattern`."
),
)
catalog_pattern: AllowDenyPattern = Field( catalog_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(), default=AllowDenyPattern.allow_all(),
description="Regex patterns for catalogs to filter in ingestion. Specify regex to match the full `metastore.catalog` name.", description="Regex patterns for catalogs to filter in ingestion. Specify regex to match the full `metastore.catalog` name.",

View File

@ -112,6 +112,15 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
for catalog in response: for catalog in response:
yield self._create_catalog(metastore, catalog) yield self._create_catalog(metastore, catalog)
def catalog(
    self, catalog_name: str, metastore: Optional[Metastore]
) -> Optional[Catalog]:
    """Look up a single catalog by name.

    Asks the workspace client for ``catalog_name`` and, when something is
    returned, wraps it with ``_create_catalog`` together with ``metastore``.

    Returns:
        The created catalog, or ``None`` (after an info-level log line) when
        the client hands back a falsy response.
    """
    # NOTE(review): only a falsy response is treated as "not found" here.
    # Confirm whether the client raises for an unknown catalog instead.
    fetched = self._workspace_client.catalogs.get(catalog_name)
    if fetched:
        return self._create_catalog(metastore, fetched)

    logger.info(f"Catalog {catalog_name} not found")
    return None
def schemas(self, catalog: Catalog) -> Iterable[Schema]: def schemas(self, catalog: Catalog) -> Iterable[Schema]:
response = self._workspace_client.schemas.list(catalog_name=catalog.name) response = self._workspace_client.schemas.list(catalog_name=catalog.name)
if not response: if not response:

View File

@ -188,9 +188,10 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
] ]
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.report.report_ingestion_stage_start("Start warehouse") self.report.report_ingestion_stage_start("Ingestion Setup")
wait_on_warehouse = None wait_on_warehouse = None
if self.config.is_profiling_enabled(): if self.config.is_profiling_enabled():
self.report.report_ingestion_stage_start("Start warehouse")
# Can take several minutes, so start now and wait later # Can take several minutes, so start now and wait later
wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse()
if wait_on_warehouse is None: if wait_on_warehouse is None:
@ -200,8 +201,9 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
) )
return return
self.report.report_ingestion_stage_start("Ingest service principals") if self.config.include_ownership:
self.build_service_principal_map() self.report.report_ingestion_stage_start("Ingest service principals")
self.build_service_principal_map()
if self.config.include_notebooks: if self.config.include_notebooks:
self.report.report_ingestion_stage_start("Ingest notebooks") self.report.report_ingestion_stage_start("Ingest notebooks")
yield from self.process_notebooks() yield from self.process_notebooks()
@ -317,7 +319,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
def process_catalogs( def process_catalogs(
self, metastore: Optional[Metastore] self, metastore: Optional[Metastore]
) -> Iterable[MetadataWorkUnit]: ) -> Iterable[MetadataWorkUnit]:
for catalog in self.unity_catalog_api_proxy.catalogs(metastore=metastore): for catalog in self._get_catalogs(metastore):
if not self.config.catalog_pattern.allowed(catalog.id): if not self.config.catalog_pattern.allowed(catalog.id):
self.report.catalogs.dropped(catalog.id) self.report.catalogs.dropped(catalog.id)
continue continue
@ -327,6 +329,17 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
self.report.catalogs.processed(catalog.id) self.report.catalogs.processed(catalog.id)
def _get_catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]:
if self.config.catalogs:
for catalog_name in self.config.catalogs:
catalog = self.unity_catalog_api_proxy.catalog(
catalog_name, metastore=metastore
)
if catalog:
yield catalog
else:
yield from self.unity_catalog_api_proxy.catalogs(metastore=metastore)
def process_schemas(self, catalog: Catalog) -> Iterable[MetadataWorkUnit]: def process_schemas(self, catalog: Catalog) -> Iterable[MetadataWorkUnit]:
for schema in self.unity_catalog_api_proxy.schemas(catalog=catalog): for schema in self.unity_catalog_api_proxy.schemas(catalog=catalog):
if not self.config.schema_pattern.allowed(schema.id): if not self.config.schema_pattern.allowed(schema.id):
@ -509,6 +522,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
platform=self.platform, platform=self.platform,
platform_instance=self.platform_instance_name, platform_instance=self.platform_instance_name,
name=str(table_ref), name=str(table_ref),
env=self.config.env,
) )
def gen_notebook_urn(self, notebook: Union[Notebook, NotebookId]) -> str: def gen_notebook_urn(self, notebook: Union[Notebook, NotebookId]) -> str:
@ -576,6 +590,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
instance=self.config.platform_instance, instance=self.config.platform_instance,
catalog=schema.catalog.name, catalog=schema.catalog.name,
metastore=schema.catalog.metastore.name, metastore=schema.catalog.metastore.name,
env=self.config.env,
) )
else: else:
return UnitySchemaKey( return UnitySchemaKey(
@ -583,6 +598,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
platform=self.platform, platform=self.platform,
instance=self.config.platform_instance, instance=self.config.platform_instance,
catalog=schema.catalog.name, catalog=schema.catalog.name,
env=self.config.env,
) )
def gen_metastore_key(self, metastore: Metastore) -> MetastoreKey: def gen_metastore_key(self, metastore: Metastore) -> MetastoreKey:
@ -590,6 +606,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
metastore=metastore.name, metastore=metastore.name,
platform=self.platform, platform=self.platform,
instance=self.config.platform_instance, instance=self.config.platform_instance,
env=self.config.env,
) )
def gen_catalog_key(self, catalog: Catalog) -> ContainerKey: def gen_catalog_key(self, catalog: Catalog) -> ContainerKey:
@ -600,12 +617,14 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
metastore=catalog.metastore.name, metastore=catalog.metastore.name,
platform=self.platform, platform=self.platform,
instance=self.config.platform_instance, instance=self.config.platform_instance,
env=self.config.env,
) )
else: else:
return CatalogKey( return CatalogKey(
catalog=catalog.name, catalog=catalog.name,
platform=self.platform, platform=self.platform,
instance=self.config.platform_instance, instance=self.config.platform_instance,
env=self.config.env,
) )
def _gen_domain_urn(self, dataset_name: str) -> Optional[str]: def _gen_domain_urn(self, dataset_name: str) -> Optional[str]: