feat(ingest/redshift): Adding way to filter s3 paths in Redshift Source (#10622)

This commit is contained in:
Tamas Nemeth 2024-06-04 22:08:22 +02:00 committed by GitHub
parent c04b3bc2e4
commit d569ca17f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 27 additions and 4 deletions

View File

@ -49,6 +49,11 @@ class S3LineageProviderConfig(ConfigModel):
description="Strip filename from s3 url. It only applies if path_specs are not specified.",
)
ignore_non_path_spec_path: bool = Field(
default=False,
description="Ignore paths that are not match in path_specs. It only applies if path_specs are specified.",
)
class S3DatasetLineageProviderConfigBase(ConfigModel):
"""

View File

@ -264,13 +264,23 @@ class RedshiftLineageExtractor:
# TODO: Remove this method.
self.report.warning(key, reason)
def _get_s3_path(self, path: str) -> str:
def _get_s3_path(self, path: str) -> Optional[str]:
if self.config.s3_lineage_config:
for path_spec in self.config.s3_lineage_config.path_specs:
if path_spec.allowed(path):
_, table_path = path_spec.extract_table_name_and_path(path)
return table_path
if (
self.config.s3_lineage_config.ignore_non_path_spec_path
and len(self.config.s3_lineage_config.path_specs) > 0
):
self.report.num_lineage_dropped_s3_path += 1
logger.debug(
f"Skipping s3 path {path} as it does not match any path spec."
)
return None
if self.config.s3_lineage_config.strip_urls:
if "/" in urlparse(path).path:
return str(path.rsplit("/", 1)[0])
@ -323,13 +333,14 @@ class RedshiftLineageExtractor:
),
)
def _build_s3_path_from_row(self, filename: str) -> str:
def _build_s3_path_from_row(self, filename: str) -> Optional[str]:
path = filename.strip()
if urlparse(path).scheme != "s3":
raise ValueError(
f"Only s3 source supported with copy/unload. The source was: {path}"
)
return strip_s3_prefix(self._get_s3_path(path))
s3_path = self._get_s3_path(path)
return strip_s3_prefix(s3_path) if s3_path else None
def _get_sources(
self,
@ -369,7 +380,11 @@ class RedshiftLineageExtractor:
)
self.report.num_lineage_dropped_not_support_copy_path += 1
return [], None
path = strip_s3_prefix(self._get_s3_path(path))
s3_path = self._get_s3_path(path)
if s3_path is None:
return [], None
path = strip_s3_prefix(s3_path)
urn = make_dataset_urn_with_platform_instance(
platform=platform.value,
name=path,
@ -539,6 +554,8 @@ class RedshiftLineageExtractor:
target_platform = LineageDatasetPlatform.S3
# Following call requires 'filename' key in lineage_row
target_path = self._build_s3_path_from_row(lineage_row.filename)
if target_path is None:
return None
urn = make_dataset_urn_with_platform_instance(
platform=target_platform.value,
name=target_path,

View File

@ -44,6 +44,7 @@ class RedshiftReport(
num_lineage_dropped_query_parser: int = 0
num_lineage_dropped_not_support_copy_path: int = 0
num_lineage_processed_temp_tables = 0
num_lineage_dropped_s3_path: int = 0
lineage_start_time: Optional[datetime] = None
lineage_end_time: Optional[datetime] = None