Mirror of https://github.com/datahub-project/datahub.git (synced 2025-09-26 01:23:16 +00:00)
feat(ingest/unity-catalog): Support external S3 lineage (#9025)
Commit 9fec6024fb (parent 6366b63e48)
@@ -34,21 +34,26 @@ def get_bucket_relative_path(s3_uri: str) -> str:
     return "/".join(strip_s3_prefix(s3_uri).split("/")[1:])


-def make_s3_urn(s3_uri: str, env: str) -> str:
+def make_s3_urn(s3_uri: str, env: str, remove_extension: bool = True) -> str:
     s3_name = strip_s3_prefix(s3_uri)

     if s3_name.endswith("/"):
         s3_name = s3_name[:-1]

     name, extension = os.path.splitext(s3_name)

-    if extension != "":
+    if remove_extension and extension != "":
         extension = extension[1:]  # remove the dot
         return f"urn:li:dataset:(urn:li:dataPlatform:s3,{name}_{extension},{env})"

     return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name},{env})"


+def make_s3_urn_for_lineage(s3_uri: str, env: str) -> str:
+    # Ideally this is the implementation for all S3 URNs
+    # Don't feel comfortable changing `make_s3_urn` for glue, sagemaker, and athena
+    return make_s3_urn(s3_uri, env, remove_extension=False)
+
+
 def get_bucket_name(s3_uri: str) -> str:
     if not is_s3_uri(s3_uri):
         raise ValueError(
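
For orientation, a minimal worked example of how the two helpers differ for the same URI. It assumes `strip_s3_prefix` only drops the `s3://` scheme; the bucket and key below are illustrative.

    from datahub.ingestion.source.aws.s3_util import make_s3_urn, make_s3_urn_for_lineage

    uri = "s3://my-bucket/raw/events/part-0000.parquet"  # illustrative path

    # Existing behavior, kept for glue/sagemaker/athena: the extension is folded
    # into the dataset name with an underscore.
    make_s3_urn(uri, "PROD")
    # "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/raw/events/part-0000_parquet,PROD)"

    # New lineage-only helper: the object path is kept verbatim, extension included.
    make_s3_urn_for_lineage(uri, "PROD")
    # "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/raw/events/part-0000.parquet,PROD)"
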
@@ -21,7 +21,7 @@ from snowflake.connector import SnowflakeConnection
 import datahub.emitter.mce_builder as builder
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.ingestion.source.aws.s3_util import make_s3_urn
+from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
 from datahub.ingestion.source.snowflake.constants import (
     LINEAGE_PERMISSION_ERROR,
     SnowflakeEdition,
@@ -652,7 +652,9 @@ class SnowflakeLineageExtractor(
             # For now, populate only for S3
             if external_lineage_entry.startswith("s3://"):
                 external_upstream_table = UpstreamClass(
-                    dataset=make_s3_urn(external_lineage_entry, self.config.env),
+                    dataset=make_s3_urn_for_lineage(
+                        external_lineage_entry, self.config.env
+                    ),
                     type=DatasetLineageTypeClass.COPY,
                 )
                 external_upstreams.append(external_upstream_table)
@@ -166,6 +166,14 @@ class UnityCatalogSourceConfig(
         description="Option to enable/disable lineage generation.",
     )

+    include_external_lineage: bool = pydantic.Field(
+        default=True,
+        description=(
+            "Option to enable/disable lineage generation for external tables."
+            " Only external S3 tables are supported at the moment."
+        ),
+    )
+
     include_notebooks: bool = pydantic.Field(
         default=False,
         description="Ingest notebooks, represented as DataHub datasets.",
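
The flag defaults to true, so external S3 lineage is emitted unless it is switched off. A hypothetical source-config fragment (the Python-dict form of a recipe's `source.config` block); only `include_external_lineage` comes from this change, and the connection keys are illustrative placeholders.

    unity_catalog_config = {
        "workspace_url": "https://<workspace>.cloud.databricks.com",  # placeholder
        "token": "<personal-access-token>",  # placeholder
        "include_external_lineage": False,  # opt out of external S3 upstreams
    }
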
@@ -33,6 +33,7 @@ from datahub.ingestion.source.unity.proxy_types import (
     ALLOWED_STATEMENT_TYPES,
     Catalog,
     Column,
+    ExternalTableReference,
     Metastore,
     Notebook,
     Query,
@@ -248,6 +249,13 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
                 )
                 if table_ref:
                     table.upstreams[table_ref] = {}
+            elif "fileInfo" in item:
+                external_ref = ExternalTableReference.create_from_lineage(
+                    item["fileInfo"]
+                )
+                if external_ref:
+                    table.external_upstreams.add(external_ref)
+
             for notebook in item.get("notebookInfos") or []:
                 table.upstream_notebooks.add(notebook["notebook_id"])
@@ -10,6 +10,7 @@ from databricks.sdk.service.catalog import (
     CatalogType,
     ColumnTypeName,
     DataSourceFormat,
+    SecurableType,
     TableType,
 )
 from databricks.sdk.service.sql import QueryStatementType
@@ -176,6 +177,35 @@ class TableReference:
         return f"{self.catalog}/{self.schema}/{self.table}"


+@dataclass(frozen=True, order=True)
+class ExternalTableReference:
+    path: str
+    has_permission: bool
+    name: Optional[str]
+    type: Optional[SecurableType]
+    storage_location: Optional[str]
+
+    @classmethod
+    def create_from_lineage(cls, d: dict) -> Optional["ExternalTableReference"]:
+        try:
+            securable_type: Optional[SecurableType]
+            try:
+                securable_type = SecurableType(d.get("securable_type", "").lower())
+            except ValueError:
+                securable_type = None
+
+            return cls(
+                path=d["path"],
+                has_permission=d.get("has_permission") or True,
+                name=d.get("securable_name"),
+                type=securable_type,
+                storage_location=d.get("storage_location"),
+            )
+        except Exception as e:
+            logger.warning(f"Failed to create ExternalTableReference from {d}: {e}")
+            return None
+
+
 @dataclass
 class Table(CommonProperty):
     schema: Schema
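
A short sketch of `create_from_lineage` on an illustrative `fileInfo` payload. The keys mirror what the method reads above; the values are made up, and a `securable_type` string the SDK enum does not recognize simply falls back to `type=None`.

    from datahub.ingestion.source.unity.proxy_types import ExternalTableReference

    file_info = {
        "path": "s3://my-bucket/raw/events",
        "has_permission": True,
        "securable_name": "landing.events_external",
        "securable_type": "EXTERNAL_LOCATION",  # mapped to a SecurableType only if recognized
        "storage_location": "s3://my-bucket/raw/events",
    }

    ref = ExternalTableReference.create_from_lineage(file_info)
    # ref.path == "s3://my-bucket/raw/events"; a malformed payload returns None instead of raising
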
@@ -193,6 +223,7 @@ class Table(CommonProperty):
     view_definition: Optional[str]
     properties: Dict[str, str]
     upstreams: Dict[TableReference, Dict[str, List[str]]] = field(default_factory=dict)
+    external_upstreams: Set[ExternalTableReference] = field(default_factory=set)
     upstream_notebooks: Set[NotebookId] = field(default_factory=set)
     downstream_notebooks: Set[NotebookId] = field(default_factory=set)

@@ -19,6 +19,8 @@ class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport):
     notebooks: EntityFilterReport = EntityFilterReport.field(type="notebook")

     num_column_lineage_skipped_column_count: int = 0
+    num_external_upstreams_lacking_permissions: int = 0
+    num_external_upstreams_unsupported: int = 0

     num_queries: int = 0
     num_queries_dropped_parse_failure: int = 0
@@ -41,6 +41,7 @@ from datahub.ingestion.api.source import (
     TestConnectionReport,
 )
 from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
 from datahub.ingestion.source.common.subtypes import (
     DatasetContainerSubTypes,
     DatasetSubTypes,
@@ -455,6 +456,28 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
                     )
                 )

+        if self.config.include_external_lineage:
+            for external_ref in table.external_upstreams:
+                if not external_ref.has_permission or not external_ref.path:
+                    self.report.num_external_upstreams_lacking_permissions += 1
+                    logger.warning(
+                        f"Lacking permissions for external file upstream on {table.ref}"
+                    )
+                elif external_ref.path.startswith("s3://"):
+                    upstreams.append(
+                        UpstreamClass(
+                            dataset=make_s3_urn_for_lineage(
+                                external_ref.path, self.config.env
+                            ),
+                            type=DatasetLineageTypeClass.COPY,
+                        )
+                    )
+                else:
+                    self.report.num_external_upstreams_unsupported += 1
+                    logger.warning(
+                        f"Unsupported external file upstream on {table.ref}: {external_ref.path}"
+                    )
+
         if upstreams:
             return UpstreamLineageClass(
                 upstreams=upstreams,
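
For reference, a rough sketch of the lineage aspect this branch effectively produces for an S3-backed upstream, assembled standalone with the emitter helpers; the `databricks` platform name and the table identifier are assumptions for illustration, not taken from this diff.

    import datahub.emitter.mce_builder as builder
    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
    from datahub.metadata.schema_classes import (
        DatasetLineageTypeClass,
        UpstreamClass,
        UpstreamLineageClass,
    )

    # Upstream lineage aspect pointing a table at the external S3 location it copies from.
    lineage = UpstreamLineageClass(
        upstreams=[
            UpstreamClass(
                dataset=make_s3_urn_for_lineage("s3://my-bucket/raw/events", "PROD"),
                type=DatasetLineageTypeClass.COPY,
            )
        ]
    )
    mcp = MetadataChangeProposalWrapper(
        entityUrn=builder.make_dataset_urn("databricks", "my_catalog.landing.events", "PROD"),
        aspect=lineage,
    )
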
|
Loading…
x
Reference in New Issue
Block a user