feat(ingest/unity): Ingest notebooks and their lineage (#8940)

Andrew Sikowitz 2023-10-04 10:22:45 -04:00 committed by GitHub
parent 13508a9d88
commit d3346a04e4
10 changed files with 257 additions and 72 deletions

View File

@ -13,6 +13,7 @@
* Ownership of or `SELECT` privilege on any tables and views you want to ingest
* [Ownership documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/ownership.html)
* [Privileges documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/privileges.html)
+ To ingest your workspace's notebooks and their lineage, your service principal must have `CAN_READ` permission on the folders containing the notebooks you want to ingest: [guide](https://docs.databricks.com/en/security/auth-authz/access-control/workspace-acl.html#folder-permissions).
+ To `include_usage_statistics` (enabled by default), your service principal must have `CAN_MANAGE` permission on any SQL Warehouses you want to ingest: [guide](https://docs.databricks.com/security/auth-authz/access-control/sql-endpoint-acl.html).
+ To ingest `profiling` information with `call_analyze` (enabled by default), your service principal must have ownership or `MODIFY` privilege on any tables you want to profile (a sample recipe sketch follows this list).
* Alternatively, you can run [ANALYZE TABLE](https://docs.databricks.com/sql/language-manual/sql-ref-syntax-aux-analyze-table.html) yourself on any tables you want to profile, then set `call_analyze` to `false`.
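For reference, the sketch below shows a pipeline run that exercises all three permissions above. It is illustrative only and not part of this commit: the workspace URL, token, warehouse id, and sink server are placeholders, and the profiling sub-keys should be checked against the Unity Catalog source docs for your version.

```python
# Hypothetical ingestion run; placeholder values throughout.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "unity-catalog",
            "config": {
                "workspace_url": "https://<workspace>.cloud.databricks.com",
                "token": "<service-principal-token>",
                "include_notebooks": True,         # needs CAN_READ on notebook folders
                "include_usage_statistics": True,  # needs CAN_MANAGE on SQL warehouses
                "include_table_lineage": True,
                "include_column_lineage": True,
                "profiling": {
                    "enabled": True,
                    "call_analyze": True,          # needs ownership or MODIFY on tables
                    "warehouse_id": "<sql-warehouse-id>",
                },
            },
        },
        "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
    }
)
pipeline.run()
pipeline.raise_from_status()
```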

View File

@ -250,7 +250,7 @@ usage_common = {
databricks = {
# 0.1.11 appears to have authentication issues with azure databricks
"databricks-sdk>=0.1.1, != 0.1.11",
"databricks-sdk>=0.9.0",
"pyspark",
"requests",
}

View File

@ -9,6 +9,7 @@ from datahub.emitter.mce_builder import (
make_container_urn,
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
@ -125,6 +126,17 @@ class BucketKey(ContainerKey):
bucket_name: str
class NotebookKey(DatahubKey):
notebook_id: int
platform: str
instance: Optional[str]
def as_urn(self) -> str:
return make_dataset_urn_with_platform_instance(
platform=self.platform, platform_instance=self.instance, name=self.guid()
)
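As a rough illustration of the new key (a sketch assuming the default `DatahubKey.guid()` behavior, which derives a stable hash from the key's fields):

```python
# Hypothetical usage: notebooks are emitted as datasets whose URN name is a guid
# of the NotebookKey fields, not the notebook path.
from datahub.emitter.mcp_builder import NotebookKey

key = NotebookKey(notebook_id=12345, platform="databricks", instance=None)
print(key.as_urn())
# e.g. urn:li:dataset:(urn:li:dataPlatform:databricks,<guid-of-key-fields>,PROD)
```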
class DatahubKeyJSONEncoder(json.JSONEncoder):
# overload method default
def default(self, obj: Any) -> Any:

View File

@ -16,6 +16,9 @@ class DatasetSubTypes(str, Enum):
SALESFORCE_STANDARD_OBJECT = "Object"
POWERBI_DATASET_TABLE = "PowerBI Dataset Table"
# TODO: Create separate entity...
NOTEBOOK = "Notebook"
class DatasetContainerSubTypes(str, Enum):
# Generic SubTypes

View File

@ -127,11 +127,16 @@ class UnityCatalogSourceConfig(
description='Attach domains to catalogs, schemas or tables during ingestion using regex patterns. Domain key can be a guid like *urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba* or a string like "Marketing". If you provide strings, DataHub will attempt to resolve each name to a guid and will error out if this fails. There can be multiple domain keys specified.',
)
include_table_lineage: Optional[bool] = pydantic.Field(
include_table_lineage: bool = pydantic.Field(
default=True,
description="Option to enable/disable lineage generation.",
)
include_notebooks: bool = pydantic.Field(
default=False,
description="Ingest notebooks, represented as DataHub datasets.",
)
include_ownership: bool = pydantic.Field(
default=False,
description="Option to enable/disable ownership generation for metastores, catalogs, schemas, and tables.",
@ -141,7 +146,7 @@ class UnityCatalogSourceConfig(
"include_table_ownership", "include_ownership"
)
include_column_lineage: Optional[bool] = pydantic.Field(
include_column_lineage: bool = pydantic.Field(
default=True,
description="Option to enable/disable lineage generation. Currently we have to call a rest call per column to get column level lineage due to the Databrick api which can slow down ingestion. ",
)

View File

@ -23,6 +23,7 @@ from databricks.sdk.service.sql import (
QueryStatementType,
QueryStatus,
)
from databricks.sdk.service.workspace import ObjectType
import datahub
from datahub.ingestion.source.unity.proxy_profiling import (
@ -33,6 +34,7 @@ from datahub.ingestion.source.unity.proxy_types import (
Catalog,
Column,
Metastore,
Notebook,
Query,
Schema,
ServicePrincipal,
@ -137,6 +139,21 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
for principal in self._workspace_client.service_principals.list():
yield self._create_service_principal(principal)
def workspace_notebooks(self) -> Iterable[Notebook]:
for obj in self._workspace_client.workspace.list("/", recursive=True):
if obj.object_type == ObjectType.NOTEBOOK:
yield Notebook(
id=obj.object_id,
path=obj.path,
language=obj.language,
created_at=datetime.fromtimestamp(
obj.created_at / 1000, tz=timezone.utc
),
modified_at=datetime.fromtimestamp(
obj.modified_at / 1000, tz=timezone.utc
),
)
def query_history(
self,
start_time: datetime,
@ -153,7 +170,7 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
"start_time_ms": start_time.timestamp() * 1000,
"end_time_ms": end_time.timestamp() * 1000,
},
"statuses": [QueryStatus.FINISHED.value],
"statuses": [QueryStatus.FINISHED],
"statement_types": [typ.value for typ in ALLOWED_STATEMENT_TYPES],
}
)
@ -196,61 +213,75 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
method, path, body={**body, "page_token": response["next_page_token"]}
)
def list_lineages_by_table(self, table_name: str) -> dict:
def list_lineages_by_table(
self, table_name: str, include_entity_lineage: bool
) -> dict:
"""List table lineage by table name."""
return self._workspace_client.api_client.do(
method="GET",
path="/api/2.0/lineage-tracking/table-lineage/get",
body={"table_name": table_name},
path="/api/2.0/lineage-tracking/table-lineage",
body={
"table_name": table_name,
"include_entity_lineage": include_entity_lineage,
},
)
def list_lineages_by_column(self, table_name: str, column_name: str) -> dict:
"""List column lineage by table name and column name."""
return self._workspace_client.api_client.do(
"GET",
"/api/2.0/lineage-tracking/column-lineage/get",
"/api/2.0/lineage-tracking/column-lineage",
body={"table_name": table_name, "column_name": column_name},
)
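The reworked `table_lineage` below consumes a payload shaped roughly as follows; this is a sketch inferred from the keys accessed in this change, with illustrative catalog/schema/table names and IDs:

```python
# Approximate shape of the table-lineage response parsed below (illustrative only).
example_response = {
    "upstreams": [
        {
            # resolved into a TableReference via create_from_lineage
            "tableInfo": {"catalog_name": "main", "schema_name": "sales", "name": "orders"},
            # collected into table.upstream_notebooks
            "notebookInfos": [{"notebook_id": 12345}],
        }
    ],
    "downstreams": [
        # collected into table.downstream_notebooks
        {"notebookInfos": [{"notebook_id": 67890}]}
    ],
}
```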
def table_lineage(self, table: Table) -> None:
def table_lineage(
self, table: Table, include_entity_lineage: bool
) -> Optional[dict]:
# Lineage endpoint doesn't exist in API version 2.1
try:
response: dict = self.list_lineages_by_table(
table_name=f"{table.schema.catalog.name}.{table.schema.name}.{table.name}"
table_name=table.ref.qualified_table_name,
include_entity_lineage=include_entity_lineage,
)
table.upstreams = {
TableReference(
table.schema.catalog.metastore.id,
item["catalog_name"],
item["schema_name"],
item["name"],
): {}
for item in response.get("upstream_tables", [])
}
for item in response.get("upstreams") or []:
if "tableInfo" in item:
table_ref = TableReference.create_from_lineage(
item["tableInfo"], table.schema.catalog.metastore.id
)
if table_ref:
table.upstreams[table_ref] = {}
for notebook in item.get("notebookInfos") or []:
table.upstream_notebooks.add(notebook["notebook_id"])
for item in response.get("downstreams") or []:
for notebook in item.get("notebookInfos") or []:
table.downstream_notebooks.add(notebook["notebook_id"])
return response
except Exception as e:
logger.error(f"Error getting lineage: {e}")
return None
def get_column_lineage(self, table: Table) -> None:
def get_column_lineage(self, table: Table, include_entity_lineage: bool) -> None:
try:
table_lineage_response: dict = self.list_lineages_by_table(
table_name=f"{table.schema.catalog.name}.{table.schema.name}.{table.name}"
table_lineage = self.table_lineage(
table, include_entity_lineage=include_entity_lineage
)
if table_lineage_response:
if table_lineage:
for column in table.columns:
response: dict = self.list_lineages_by_column(
table_name=f"{table.schema.catalog.name}.{table.schema.name}.{table.name}",
table_name=table.ref.qualified_table_name,
column_name=column.name,
)
for item in response.get("upstream_cols", []):
table_ref = TableReference(
table.schema.catalog.metastore.id,
item["catalog_name"],
item["schema_name"],
item["table_name"],
table_ref = TableReference.create_from_lineage(
item, table.schema.catalog.metastore.id
)
table.upstreams.setdefault(table_ref, {}).setdefault(
column.name, []
).append(item["name"])
if table_ref:
table.upstreams.setdefault(table_ref, {}).setdefault(
column.name, []
).append(item["name"])
except Exception as e:
logger.error(f"Error getting lineage: {e}")
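After `get_column_lineage` runs, `table.upstreams` (typed `Dict[TableReference, Dict[str, List[str]]]`) maps each upstream table to a downstream-column to upstream-columns mapping, roughly as in this hypothetical sketch:

```python
from datahub.ingestion.source.unity.proxy_types import TableReference

# Illustrative contents of table.upstreams after column lineage resolution:
# each downstream column of `table` maps to the upstream columns it derives from.
upstreams_example = {
    TableReference("metastore-id", "main", "sales", "orders"): {
        "total_amount": ["amount"],
        "order_day": ["order_ts"],
    }
}
```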

View File

@ -1,8 +1,10 @@
# Supported types are available at
# https://api-docs.databricks.com/rest/latest/unity-catalog-api-specification-2-1.html?_ga=2.151019001.1795147704.1666247755-2119235717.1666247755
import dataclasses
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional
from typing import Dict, FrozenSet, List, Optional, Set
from databricks.sdk.service.catalog import (
CatalogType,
@ -11,6 +13,7 @@ from databricks.sdk.service.catalog import (
TableType,
)
from databricks.sdk.service.sql import QueryStatementType
from databricks.sdk.service.workspace import Language
from datahub.metadata.schema_classes import (
ArrayTypeClass,
@ -26,6 +29,8 @@ from datahub.metadata.schema_classes import (
TimeTypeClass,
)
logger = logging.getLogger(__name__)
DATA_TYPE_REGISTRY: dict = {
ColumnTypeName.BOOLEAN: BooleanTypeClass,
ColumnTypeName.BYTE: BytesTypeClass,
@ -66,6 +71,9 @@ OPERATION_STATEMENT_TYPES = {
ALLOWED_STATEMENT_TYPES = {*OPERATION_STATEMENT_TYPES.keys(), QueryStatementType.SELECT}
NotebookId = int
@dataclass
class CommonProperty:
id: str
@ -136,6 +144,19 @@ class TableReference:
table.name,
)
@classmethod
def create_from_lineage(cls, d: dict, metastore: str) -> Optional["TableReference"]:
try:
return cls(
metastore,
d["catalog_name"],
d["schema_name"],
d.get("table_name", d["name"]), # column vs table query output
)
except Exception as e:
logger.warning(f"Failed to create TableReference from {d}: {e}")
return None
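The `# column vs table query output` comment refers to the two payload shapes this helper accepts; a hypothetical illustration of both:

```python
# Table-lineage entries carry the table name under "name" ...
TableReference.create_from_lineage(
    {"catalog_name": "main", "schema_name": "sales", "name": "orders"}, "metastore-id"
)
# ... while column-lineage entries carry it under "table_name" (here "name" is the
# upstream column); both resolve to the same TableReference.
TableReference.create_from_lineage(
    {"catalog_name": "main", "schema_name": "sales", "table_name": "orders", "name": "amount"},
    "metastore-id",
)
```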
def __str__(self) -> str:
return f"{self.metastore}.{self.catalog}.{self.schema}.{self.table}"
@ -166,6 +187,8 @@ class Table(CommonProperty):
view_definition: Optional[str]
properties: Dict[str, str]
upstreams: Dict[TableReference, Dict[str, List[str]]] = field(default_factory=dict)
upstream_notebooks: Set[NotebookId] = field(default_factory=set)
downstream_notebooks: Set[NotebookId] = field(default_factory=set)
ref: TableReference = field(init=False)
@ -228,3 +251,23 @@ class ColumnProfile:
self.max is not None,
)
)
@dataclass
class Notebook:
id: NotebookId
path: str
language: Language
created_at: datetime
modified_at: datetime
upstreams: FrozenSet[TableReference] = field(default_factory=frozenset)
@classmethod
def add_upstream(cls, upstream: TableReference, notebook: "Notebook") -> "Notebook":
return cls(
**{ # type: ignore
**dataclasses.asdict(notebook),
"upstreams": frozenset([*notebook.upstreams, upstream]),
}
)
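Because `upstreams` is a frozenset, upstream references are accumulated by rebuilding the notebook rather than mutating it in place; a hypothetical usage sketch (reusing this module's imports):

```python
# Illustrative only: register an upstream table on a notebook.
nb = Notebook(
    id=12345,
    path="/Users/someone@example.com/etl",
    language=Language.PYTHON,
    created_at=datetime(2023, 10, 4),
    modified_at=datetime(2023, 10, 4),
)
ref = TableReference("metastore-id", "main", "sales", "orders")
nb = Notebook.add_upstream(ref, nb)
assert nb.upstreams == frozenset([ref])
```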

View File

@ -5,21 +5,23 @@ from datahub.ingestion.api.report import EntityFilterReport
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
)
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.utilities.lossy_collections import LossyDict, LossyList
@dataclass
class UnityCatalogReport(StaleEntityRemovalSourceReport):
class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport):
metastores: EntityFilterReport = EntityFilterReport.field(type="metastore")
catalogs: EntityFilterReport = EntityFilterReport.field(type="catalog")
schemas: EntityFilterReport = EntityFilterReport.field(type="schema")
tables: EntityFilterReport = EntityFilterReport.field(type="table/view")
table_profiles: EntityFilterReport = EntityFilterReport.field(type="table profile")
notebooks: EntityFilterReport = EntityFilterReport.field(type="notebook")
num_queries: int = 0
num_queries_dropped_parse_failure: int = 0
num_queries_dropped_missing_table: int = 0 # Can be due to pattern filter
num_queries_dropped_duplicate_table: int = 0
num_queries_missing_table: int = 0 # Can be due to pattern filter
num_queries_duplicate_table: int = 0
num_queries_parsed_by_spark_plan: int = 0
# Distinguish from Operations emitted for created / updated timestamps

View File

@ -2,7 +2,7 @@ import logging
import re
import time
from datetime import timedelta
from typing import Dict, Iterable, List, Optional, Set
from typing import Dict, Iterable, List, Optional, Set, Union
from urllib.parse import urljoin
from datahub.emitter.mce_builder import (
@ -18,6 +18,7 @@ from datahub.emitter.mcp_builder import (
CatalogKey,
ContainerKey,
MetastoreKey,
NotebookKey,
UnitySchemaKey,
add_dataset_to_container,
gen_containers,
@ -56,6 +57,8 @@ from datahub.ingestion.source.unity.proxy_types import (
Catalog,
Column,
Metastore,
Notebook,
NotebookId,
Schema,
ServicePrincipal,
Table,
@ -69,6 +72,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
ViewProperties,
)
from datahub.metadata.schema_classes import (
BrowsePathsClass,
DataPlatformInstanceClass,
DatasetLineageTypeClass,
DatasetPropertiesClass,
@ -88,6 +92,7 @@ from datahub.metadata.schema_classes import (
UpstreamClass,
UpstreamLineageClass,
)
from datahub.utilities.file_backed_collections import FileBackedDict
from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column
from datahub.utilities.registries.domain_registry import DomainRegistry
@ -157,6 +162,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
# Global set of table refs
self.table_refs: Set[TableReference] = set()
self.view_refs: Set[TableReference] = set()
self.notebooks: FileBackedDict[Notebook] = FileBackedDict()
@staticmethod
def test_connection(config_dict: dict) -> TestConnectionReport:
@ -176,6 +182,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
]
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.report.report_ingestion_stage_start("Start warehouse")
wait_on_warehouse = None
if self.config.is_profiling_enabled():
# Can take several minutes, so start now and wait later
@ -187,10 +194,23 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
)
return
self.report.report_ingestion_stage_start("Ingest service principals")
self.build_service_principal_map()
if self.config.include_notebooks:
self.report.report_ingestion_stage_start("Ingest notebooks")
yield from self.process_notebooks()
yield from self.process_metastores()
if self.config.include_notebooks:
self.report.report_ingestion_stage_start("Notebook lineage")
for notebook in self.notebooks.values():
wu = self._gen_notebook_lineage(notebook)
if wu:
yield wu
if self.config.include_usage_statistics:
self.report.report_ingestion_stage_start("Ingest usage")
usage_extractor = UnityCatalogUsageExtractor(
config=self.config,
report=self.report,
@ -203,6 +223,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
)
if self.config.is_profiling_enabled():
self.report.report_ingestion_stage_start("Wait on warehouse")
assert wait_on_warehouse
timeout = timedelta(seconds=self.config.profiling.max_wait_secs)
wait_on_warehouse.result(timeout)
@ -212,6 +233,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
self.unity_catalog_api_proxy,
self.gen_dataset_urn,
)
self.report.report_ingestion_stage_start("Profiling")
yield from profiling_extractor.get_workunits(self.table_refs)
def build_service_principal_map(self) -> None:
@ -223,6 +245,56 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
"service-principals", f"Unable to fetch service principals: {e}"
)
def process_notebooks(self) -> Iterable[MetadataWorkUnit]:
for notebook in self.unity_catalog_api_proxy.workspace_notebooks():
self.notebooks[str(notebook.id)] = notebook
yield from self._gen_notebook_aspects(notebook)
def _gen_notebook_aspects(self, notebook: Notebook) -> Iterable[MetadataWorkUnit]:
mcps = MetadataChangeProposalWrapper.construct_many(
entityUrn=self.gen_notebook_urn(notebook),
aspects=[
DatasetPropertiesClass(
name=notebook.path.rsplit("/", 1)[-1],
customProperties={
"path": notebook.path,
"language": notebook.language.value,
},
externalUrl=urljoin(
self.config.workspace_url, f"#notebook/{notebook.id}"
),
created=TimeStampClass(int(notebook.created_at.timestamp() * 1000)),
lastModified=TimeStampClass(
int(notebook.modified_at.timestamp() * 1000)
),
),
SubTypesClass(typeNames=[DatasetSubTypes.NOTEBOOK]),
BrowsePathsClass(paths=notebook.path.split("/")),
# TODO: Add DPI aspect
],
)
for mcp in mcps:
yield mcp.as_workunit()
self.report.notebooks.processed(notebook.path)
def _gen_notebook_lineage(self, notebook: Notebook) -> Optional[MetadataWorkUnit]:
if not notebook.upstreams:
return None
return MetadataChangeProposalWrapper(
entityUrn=self.gen_notebook_urn(notebook),
aspect=UpstreamLineageClass(
upstreams=[
UpstreamClass(
dataset=self.gen_dataset_urn(upstream_ref),
type=DatasetLineageTypeClass.COPY,
)
for upstream_ref in notebook.upstreams
]
),
).as_workunit()
def process_metastores(self) -> Iterable[MetadataWorkUnit]:
metastore = self.unity_catalog_api_proxy.assigned_metastore()
yield from self.gen_metastore_containers(metastore)
@ -247,6 +319,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
self.report.schemas.dropped(schema.id)
continue
self.report.report_ingestion_stage_start(f"Ingest schema {schema.id}")
yield from self.gen_schema_containers(schema)
yield from self.process_tables(schema)
@ -282,13 +355,21 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
ownership = self._create_table_ownership_aspect(table)
data_platform_instance = self._create_data_platform_instance_aspect(table)
lineage: Optional[UpstreamLineageClass] = None
if self.config.include_column_lineage:
self.unity_catalog_api_proxy.get_column_lineage(table)
lineage = self._generate_column_lineage_aspect(dataset_urn, table)
self.unity_catalog_api_proxy.get_column_lineage(
table, include_entity_lineage=self.config.include_notebooks
)
elif self.config.include_table_lineage:
self.unity_catalog_api_proxy.table_lineage(table)
lineage = self._generate_lineage_aspect(dataset_urn, table)
self.unity_catalog_api_proxy.table_lineage(
table, include_entity_lineage=self.config.include_notebooks
)
lineage = self._generate_lineage_aspect(dataset_urn, table)
if self.config.include_notebooks:
for notebook_id in table.downstream_notebooks:
self.notebooks[str(notebook_id)] = Notebook.add_upstream(
table.ref, self.notebooks[str(notebook_id)]
)
yield from [
mcp.as_workunit()
@ -308,7 +389,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
)
]
def _generate_column_lineage_aspect(
def _generate_lineage_aspect(
self, dataset_urn: str, table: Table
) -> Optional[UpstreamLineageClass]:
upstreams: List[UpstreamClass] = []
@ -318,6 +399,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
):
upstream_urn = self.gen_dataset_urn(upstream_ref)
# Should be empty if config.include_column_lineage is False
finegrained_lineages.extend(
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
@ -331,41 +413,31 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
for d_col, u_cols in sorted(downstream_to_upstream_cols.items())
)
upstream_table = UpstreamClass(
upstream_urn,
DatasetLineageTypeClass.TRANSFORMED,
upstreams.append(
UpstreamClass(
dataset=upstream_urn,
type=DatasetLineageTypeClass.TRANSFORMED,
)
)
for notebook in table.upstream_notebooks:
upstreams.append(
UpstreamClass(
dataset=self.gen_notebook_urn(notebook),
type=DatasetLineageTypeClass.TRANSFORMED,
)
)
upstreams.append(upstream_table)
if upstreams:
return UpstreamLineageClass(
upstreams=upstreams, fineGrainedLineages=finegrained_lineages
upstreams=upstreams,
fineGrainedLineages=finegrained_lineages
if self.config.include_column_lineage
else None,
)
else:
return None
def _generate_lineage_aspect(
self, dataset_urn: str, table: Table
) -> Optional[UpstreamLineageClass]:
upstreams: List[UpstreamClass] = []
for upstream in sorted(table.upstreams.keys()):
upstream_urn = make_dataset_urn_with_platform_instance(
self.platform,
f"{table.schema.catalog.metastore.id}.{upstream}",
self.platform_instance_name,
)
upstream_table = UpstreamClass(
upstream_urn,
DatasetLineageTypeClass.TRANSFORMED,
)
upstreams.append(upstream_table)
if upstreams:
return UpstreamLineageClass(upstreams=upstreams)
else:
return None
def _get_domain_aspect(self, dataset_name: str) -> Optional[DomainsClass]:
domain_urn = self._gen_domain_urn(dataset_name)
if not domain_urn:
@ -389,6 +461,14 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
name=str(table_ref),
)
def gen_notebook_urn(self, notebook: Union[Notebook, NotebookId]) -> str:
notebook_id = notebook.id if isinstance(notebook, Notebook) else notebook
return NotebookKey(
notebook_id=notebook_id,
platform=self.platform,
instance=self.config.platform_instance,
).as_urn()
def gen_schema_containers(self, schema: Schema) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(f"{schema.catalog.name}.{schema.name}")

View File

@ -214,12 +214,15 @@ class UnityCatalogUsageExtractor:
self, tables: List[str], table_map: TableMap
) -> List[TableReference]:
"""Resolve tables to TableReferences, filtering out unrecognized or unresolvable table names."""
missing_table = False
duplicate_table = False
output = []
for table in tables:
table = str(table)
if table not in table_map:
logger.debug(f"Dropping query with unrecognized table: {table}")
self.report.num_queries_dropped_missing_table += 1
missing_table = True
else:
refs = table_map[table]
if len(refs) == 1:
@ -228,6 +231,11 @@ class UnityCatalogUsageExtractor:
logger.warning(
f"Could not resolve table ref for {table}: {len(refs)} duplicates."
)
self.report.num_queries_dropped_duplicate_table += 1
duplicate_table = True
if missing_table:
self.report.num_queries_missing_table += 1
if duplicate_table:
self.report.num_queries_duplicate_table += 1
return output