feat(redshift): Fetch lineage from unload queries (#7041)

Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
Author: Mert Tunç, 2023-01-24 14:03:17 +03:00 (committed by GitHub)
parent a6a597cc81
commit 6f5cd3f4a9

@@ -156,6 +156,10 @@ class RedshiftConfig(
         default=True,
         description="Whether lineage should be collected from copy commands",
     )
+    include_unload_lineage: Optional[bool] = Field(
+        default=True,
+        description="Whether lineage should be collected from unload commands",
+    )
     capture_lineage_query_parser_failures: Optional[bool] = Field(
         default=False,
         description="Whether to capture lineage query parser errors with dataset properties for debuggings",
@@ -706,6 +710,14 @@ class RedshiftSource(SQLAlchemySource):
                     return table_path
         return path
 
+    def _build_s3_path_from_row(self, db_row):
+        path = db_row["filename"].strip()
+        if urlparse(path).scheme != "s3":
+            raise ValueError(
+                f"Only s3 source supported with copy/unload. The source was: {path}"
+            )
+        return strip_s3_prefix(self._get_s3_path(path))
+
     def _populate_lineage_map(
         self, query: str, lineage_type: LineageCollectorType
     ) -> None:
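A self-contained sketch of the check the new helper performs: accept only s3:// URIs and reduce them to a prefix-stripped path. The strip step below is a stand-in for strip_s3_prefix(self._get_s3_path(path)), which is defined elsewhere in the source and not shown in this diff.

from urllib.parse import urlparse


def build_s3_path(filename: str) -> str:
    """Validate that a COPY/UNLOAD location is an s3:// URI and strip the scheme."""
    path = filename.strip()
    if urlparse(path).scheme != "s3":
        raise ValueError(
            f"Only s3 source supported with copy/unload. The source was: {path}"
        )
    # Stand-in for strip_s3_prefix(self._get_s3_path(path)).
    return path[len("s3://"):]


print(build_s3_path("s3://my-bucket/unload/orders/part-000"))  # my-bucket/unload/orders/part-000
# build_s3_path("file:///tmp/orders.csv") would raise ValueError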
@@ -733,30 +745,40 @@
 
         try:
             for db_row in engine.execute(query):
-                if not self.config.schema_pattern.allowed(
-                    db_row["target_schema"]
-                ) or not self.config.table_pattern.allowed(db_row["target_table"]):
-                    continue
+                if lineage_type != LineageCollectorType.UNLOAD:
+                    if not self.config.schema_pattern.allowed(
+                        db_row["target_schema"]
+                    ) or not self.config.table_pattern.allowed(db_row["target_table"]):
+                        continue
 
                 # Target
-                target_path = (
-                    f'{db_name}.{db_row["target_schema"]}.{db_row["target_table"]}'
-                )
+                if lineage_type == LineageCollectorType.UNLOAD:
+                    try:
+                        target_platform = LineageDatasetPlatform.S3
+                        # Following call requires 'filename' key in db_row
+                        target_path = self._build_s3_path_from_row(db_row)
+                    except ValueError as e:
+                        self.warn(logger, "non-s3-lineage", str(e))
+                        continue
+                else:
+                    target_platform = LineageDatasetPlatform.REDSHIFT
+                    target_path = (
+                        f'{db_name}.{db_row["target_schema"]}.{db_row["target_table"]}'
+                    )
+
                 target = LineageItem(
-                    dataset=LineageDataset(
-                        platform=LineageDatasetPlatform.REDSHIFT, path=target_path
-                    ),
+                    dataset=LineageDataset(platform=target_platform, path=target_path),
                     upstreams=set(),
                     collector_type=lineage_type,
                     query_parser_failed_sqls=list(),
                 )
 
-                sources: List[LineageDataset] = list()
                 # Source
-                if lineage_type in [
+                sources: List[LineageDataset] = list()
+                if lineage_type in {
                     lineage_type.QUERY_SQL_PARSER,
                     lineage_type.NON_BINDING_VIEW,
-                ]:
+                }:
                     try:
                         sources = self._get_sources_from_query(
                             db_name=db_name, query=db_row["ddl"]
@@ -771,16 +793,13 @@ class RedshiftSource(SQLAlchemySource):
                         )
                 else:
                     if lineage_type == lineage_type.COPY:
-                        platform = LineageDatasetPlatform.S3
-                        path = db_row["filename"].strip()
-                        if urlparse(path).scheme != "s3":
-                            self.warn(
-                                logger,
-                                "non-s3-lineage",
-                                f"Only s3 source supported with copy. The source was: {path}.",
-                            )
+                        try:
+                            platform = LineageDatasetPlatform.S3
+                            # Following call requires 'filename' key in db_row
+                            path = self._build_s3_path_from_row(db_row)
+                        except ValueError as e:
+                            self.warn(logger, "non-s3-lineage", str(e))
                             continue
-                        path = strip_s3_prefix(self._get_s3_path(path))
                     else:
                         platform = LineageDatasetPlatform.REDSHIFT
                         path = f'{db_name}.{db_row["source_schema"]}.{db_row["source_table"]}'
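Taken together, the two hunks above route the collector types symmetrically: COPY keeps the Redshift table as the lineage target with S3 as the source, while UNLOAD uses the unloaded S3 location as the target and the scanned Redshift table as the source. A simplified sketch of that routing with stand-in types (the real LineageItem/LineageDataset carry more fields):

from dataclasses import dataclass
from enum import Enum


class Platform(Enum):  # stand-in for LineageDatasetPlatform
    S3 = "s3"
    REDSHIFT = "redshift"


class Collector(Enum):  # stand-in for LineageCollectorType
    COPY = "copy"
    UNLOAD = "unload"


@dataclass
class Endpoint:  # stand-in for LineageDataset
    platform: Platform
    path: str


def route(collector: Collector, table_path: str, s3_path: str):
    """Return (target, source) endpoints for one lineage row."""
    if collector is Collector.UNLOAD:
        # UNLOAD: data leaves Redshift for S3.
        return Endpoint(Platform.S3, s3_path), Endpoint(Platform.REDSHIFT, table_path)
    # COPY: data enters Redshift from S3.
    return Endpoint(Platform.REDSHIFT, table_path), Endpoint(Platform.S3, s3_path)


print(route(Collector.UNLOAD, "dev.public.orders", "my-bucket/unload/orders/"))
print(route(Collector.COPY, "dev.public.orders", "my-bucket/imports/orders/"))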
@@ -1014,6 +1033,34 @@ class RedshiftSource(SQLAlchemySource):
             end_time=self.config.end_time.strftime(redshift_datetime_format),
         )
 
+        list_unload_commands_sql = """
+        select
+            distinct
+                sti.database as cluster,
+                sti.schema as source_schema,
+                sti."table" as source_table,
+                unl.path as filename
+        from
+            stl_unload_log unl
+        join stl_scan sc on
+            sc.query = unl.query and
+            sc.starttime >= '{start_time}' and
+            sc.endtime < '{end_time}'
+        join SVV_TABLE_INFO sti on
+            sti.table_id = sc.tbl
+        where
+            unl.start_time >= '{start_time}' and
+            unl.end_time < '{end_time}' and
+            sti.database = '{db_name}'
+            and sc.type in (1, 2, 3)
+        order by cluster, source_schema, source_table, filename, unl.start_time asc
+        """.format(
+            # We need the original database name for filtering
+            db_name=self.config.database,
+            start_time=self.config.start_time.strftime(redshift_datetime_format),
+            end_time=self.config.end_time.strftime(redshift_datetime_format),
+        )
+
         if not self._lineage_map:
             self._lineage_map = defaultdict()
 
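The new query joins stl_unload_log to stl_scan on the query id (restricted to a few scan types) and resolves the scanned table through SVV_TABLE_INFO, so each returned row pairs a Redshift source table with the S3 path it was unloaded to. Below, a small sketch of how the {db_name}/{start_time}/{end_time} placeholders are rendered; the format string is an assumption standing in for redshift_datetime_format.

from datetime import datetime, timedelta

# Assumed stand-in for the source's redshift_datetime_format.
REDSHIFT_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def render_lineage_sql(template: str, db_name: str, lookback_days: int = 1) -> str:
    """Fill the {db_name}/{start_time}/{end_time} placeholders of a lineage SQL template."""
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=lookback_days)
    return template.format(
        db_name=db_name,
        start_time=start_time.strftime(REDSHIFT_DATETIME_FORMAT),
        end_time=end_time.strftime(REDSHIFT_DATETIME_FORMAT),
    )


# Trivial template used only to show the substitution; the real template is the
# list_unload_commands_sql string above.
demo = "select 1 where '{start_time}' <= '{end_time}' -- db: {db_name}"
print(render_lineage_sql(demo, db_name="dev"))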
@@ -1056,6 +1103,10 @@ class RedshiftSource(SQLAlchemySource):
             self._populate_lineage_map(
                 query=list_copy_commands_sql, lineage_type=LineageCollectorType.COPY
             )
+        if self.config.include_unload_lineage:
+            self._populate_lineage_map(
+                query=list_unload_commands_sql, lineage_type=LineageCollectorType.UNLOAD
+            )
 
     def get_lineage_mcp(
         self, dataset_urn: str