Mirror of https://github.com/datahub-project/datahub.git, synced 2025-10-27 08:54:32 +00:00
fix(ingest/redshift): fix unload lineage in lineage_v2 (#11620)
Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
parent e706d1511b
commit a67981beda
@@ -57,6 +57,7 @@ class RedshiftSqlLineageV2(Closeable):
         self.context = context

         self.database = database
+        self.known_urns: Set[str] = set()  # will be set later

         self.aggregator = SqlParsingAggregator(
             platform=self.platform,
@@ -68,6 +69,7 @@ class RedshiftSqlLineageV2(Closeable):
             generate_operations=False,
             usage_config=self.config,
             graph=self.context.graph,
+            is_temp_table=self._is_temp_table,
         )
         self.report.sql_aggregator = self.aggregator.report

@@ -87,7 +89,16 @@ class RedshiftSqlLineageV2(Closeable):
                 self.report.lineage_end_time,
             ) = self._lineage_v1.get_time_window()

-        self.known_urns: Set[str] = set()  # will be set later
+    def _is_temp_table(self, name: str) -> bool:
+        return (
+            DatasetUrn.create_from_ids(
+                self.platform,
+                name,
+                env=self.config.env,
+                platform_instance=self.config.platform_instance,
+            ).urn()
+            not in self.known_urns
+        )

     def build(
         self,
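Taken together, the three hunks above move the temp-table check into a proper _is_temp_table method and hand it to the aggregator through its is_temp_table constructor argument, with self.known_urns created up front (empty) and filled in later. A minimal sketch of that wiring, using toy stand-in classes and an illustrative URN format rather than DataHub's real SqlParsingAggregator and DatasetUrn:

from typing import Callable, Optional, Set


class ToyAggregator:
    """Simplified stand-in for SqlParsingAggregator: accepts a temp-table predicate at construction."""

    def __init__(self, is_temp_table: Optional[Callable[[str], bool]] = None) -> None:
        self._is_temp_table = is_temp_table

    def is_temp_table(self, name: str) -> bool:
        # Mirror the aggregator's behaviour: no predicate means "not a temp table".
        if self._is_temp_table is None:
            return False
        return self._is_temp_table(name)


class ToyRedshiftLineage:
    def __init__(self) -> None:
        self.known_urns: Set[str] = set()  # populated later, as in the real source
        # A bound method is handed over now, but it reads known_urns at call time,
        # so filling the set afterwards still works.
        self.aggregator = ToyAggregator(is_temp_table=self._is_temp_table)

    def _is_temp_table(self, name: str) -> bool:
        # Anything we never produced a dataset URN for is treated as temporary.
        return self._to_urn(name) not in self.known_urns

    @staticmethod
    def _to_urn(name: str) -> str:
        # Illustrative URN shape only.
        return f"urn:li:dataset:(urn:li:dataPlatform:redshift,{name},PROD)"


lineage = ToyRedshiftLineage()
lineage.known_urns.add(ToyRedshiftLineage._to_urn("public.orders"))
print(lineage.aggregator.is_temp_table("public.orders"))       # False: a known, persistent table
print(lineage.aggregator.is_temp_table("staging_tmp_orders"))  # True: never registered, assumed temp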
@@ -107,15 +118,6 @@ class RedshiftSqlLineageV2(Closeable):
             for schema, tables in schemas.items()
             for table in tables
         }
-        self.aggregator._is_temp_table = (
-            lambda name: DatasetUrn.create_from_ids(
-                self.platform,
-                name,
-                env=self.config.env,
-                platform_instance=self.config.platform_instance,
-            ).urn()
-            not in self.known_urns
-        )

         # Handle all the temp tables up front.
         if self.config.resolve_temp_table_in_lineage:
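With the predicate injected through the constructor, build() no longer needs to reach into the aggregator and overwrite its private _is_temp_table attribute. Because the bound method reads self.known_urns at call time, populating the set inside build() (the comprehension at the top of this hunk) still yields the right answers.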
@@ -537,8 +537,13 @@ class SqlParsingAggregator(Closeable):
         return query

     @functools.lru_cache(maxsize=128)
-    def _name_from_urn(self, urn: UrnStr) -> str:
-        name = DatasetUrn.from_string(urn).name
+    def _name_from_urn(self, urn: UrnStr) -> Optional[str]:
+        urn_obj = DatasetUrn.from_string(urn)
+        if urn_obj.platform != self.platform.urn():
+            # If this is external (e.g. s3), we don't know the name.
+            return None
+
+        name = urn_obj.name
         if (
             platform_instance := self._schema_resolver.platform_instance
         ) and name.startswith(platform_instance):
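On the aggregator side, the change makes external upstreams explicit: _name_from_urn now returns None when the URN belongs to a different platform (for example an s3 location produced by UNLOAD), since there is no warehouse table name to feed the name-based filters. A rough, self-contained sketch of that platform check, using plain URN strings and deliberately naive parsing instead of DataHub's DatasetUrn:

from typing import Optional


def name_from_urn(urn: str, own_platform: str = "redshift") -> Optional[str]:
    # Naive parse of "urn:li:dataset:(urn:li:dataPlatform:<platform>,<name>,<env>)".
    inner = urn[len("urn:li:dataset:("):-1]
    platform_part, name, _env = inner.split(",", 2)
    platform = platform_part.rsplit(":", 1)[-1]
    if platform != own_platform:
        # External (e.g. s3) dataset: no Redshift table name to resolve.
        return None
    return name


print(name_from_urn("urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.orders,PROD)"))
# -> dev.public.orders
print(name_from_urn("urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/unload/orders,PROD)"))
# -> None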
@@ -549,14 +554,22 @@ class SqlParsingAggregator(Closeable):
     def is_temp_table(self, urn: UrnStr) -> bool:
         if self._is_temp_table is None:
             return False
-        return self._is_temp_table(self._name_from_urn(urn))
+        name = self._name_from_urn(urn)
+        if name is None:
+            # External tables are not temp tables.
+            return False
+        return self._is_temp_table(name)

-    def is_allowed_table(self, urn: UrnStr) -> bool:
+    def is_allowed_table(self, urn: UrnStr, allow_external: bool = True) -> bool:
         if self.is_temp_table(urn):
             return False
         if self._is_allowed_table is None:
             return True
-        return self._is_allowed_table(self._name_from_urn(urn))
+        name = self._name_from_urn(urn)
+        if name is None:
+            # Treat external tables specially.
+            return allow_external
+        return self._is_allowed_table(name)

     def add(
         self,
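With that in place, the two predicates treat external URNs as follows: an external dataset is never a temp table, and whether it counts as allowed is decided by the new allow_external flag rather than by the name filter. A simplified mirror of that decision logic (omitting the temp-table short-circuit at the top of is_allowed_table), with hypothetical filter callables:

from typing import Callable, Optional


def is_temp_table(name: Optional[str], temp_filter: Optional[Callable[[str], bool]]) -> bool:
    # External datasets (name is None) are never temp tables.
    if temp_filter is None or name is None:
        return False
    return temp_filter(name)


def is_allowed_table(
    name: Optional[str],
    allowed_filter: Optional[Callable[[str], bool]],
    allow_external: bool = True,
) -> bool:
    if allowed_filter is None:
        return True
    if name is None:
        # External datasets bypass the name filter; the caller decides via allow_external.
        return allow_external
    return allowed_filter(name)


def looks_temporary(name: str) -> bool:
    return name.startswith("#") or "_staging_" in name  # hypothetical temp-table convention


def passes_filters(name: str) -> bool:
    return not name.startswith("pg_")  # hypothetical deny rule


print(is_temp_table(None, looks_temporary))                           # False
print(is_allowed_table(None, passes_filters))                         # True  (default keeps external)
print(is_allowed_table(None, passes_filters, allow_external=False))   # False (caller opted out)
print(is_allowed_table("pg_internal.stats", passes_filters))          # False (name filter applies)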
@@ -852,7 +865,7 @@ class SqlParsingAggregator(Closeable):
         upstream_fields = parsed.column_usage or {}
         for upstream_urn in parsed.upstreams:
             # If the upstream table is a temp table or otherwise denied by filters, don't log usage for it.
-            if not self.is_allowed_table(upstream_urn) or (
+            if not self.is_allowed_table(upstream_urn, allow_external=False) or (
                 require_out_table_schema
                 and not self._schema_resolver.has_urn(upstream_urn)
             ):
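Judging from this diff alone, only this usage code path opts out with allow_external=False, so external datasets (such as the s3 targets of UNLOAD) and temp tables no longer have usage logged for them, while lineage call sites keep the default allow_external=True and continue to emit lineage edges to those external datasets, which is what the commit title is after.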