From d569ca17f11e0134f1fe3dfc70d478ce1fd3b62f Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Tue, 4 Jun 2024 22:08:22 +0200 Subject: [PATCH] feat(ingest/redshift): Adding way to filter s3 paths in Redshift Source (#10622) --- .../ingestion/source/redshift/config.py | 5 ++++ .../ingestion/source/redshift/lineage.py | 25 ++++++++++++++++--- .../ingestion/source/redshift/report.py | 1 + 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py index 74507d8500..2ff73323a1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py @@ -49,6 +49,11 @@ class S3LineageProviderConfig(ConfigModel): description="Strip filename from s3 url. It only applies if path_specs are not specified.", ) + ignore_non_path_spec_path: bool = Field( + default=False, + description="Ignore paths that are not match in path_specs. It only applies if path_specs are specified.", + ) + class S3DatasetLineageProviderConfigBase(ConfigModel): """ diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py index 6c6267e80e..87deab7228 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py @@ -264,13 +264,23 @@ class RedshiftLineageExtractor: # TODO: Remove this method. self.report.warning(key, reason) - def _get_s3_path(self, path: str) -> str: + def _get_s3_path(self, path: str) -> Optional[str]: if self.config.s3_lineage_config: for path_spec in self.config.s3_lineage_config.path_specs: if path_spec.allowed(path): _, table_path = path_spec.extract_table_name_and_path(path) return table_path + if ( + self.config.s3_lineage_config.ignore_non_path_spec_path + and len(self.config.s3_lineage_config.path_specs) > 0 + ): + self.report.num_lineage_dropped_s3_path += 1 + logger.debug( + f"Skipping s3 path {path} as it does not match any path spec." + ) + return None + if self.config.s3_lineage_config.strip_urls: if "/" in urlparse(path).path: return str(path.rsplit("/", 1)[0]) @@ -323,13 +333,14 @@ class RedshiftLineageExtractor: ), ) - def _build_s3_path_from_row(self, filename: str) -> str: + def _build_s3_path_from_row(self, filename: str) -> Optional[str]: path = filename.strip() if urlparse(path).scheme != "s3": raise ValueError( f"Only s3 source supported with copy/unload. The source was: {path}" ) - return strip_s3_prefix(self._get_s3_path(path)) + s3_path = self._get_s3_path(path) + return strip_s3_prefix(s3_path) if s3_path else None def _get_sources( self, @@ -369,7 +380,11 @@ class RedshiftLineageExtractor: ) self.report.num_lineage_dropped_not_support_copy_path += 1 return [], None - path = strip_s3_prefix(self._get_s3_path(path)) + s3_path = self._get_s3_path(path) + if s3_path is None: + return [], None + + path = strip_s3_prefix(s3_path) urn = make_dataset_urn_with_platform_instance( platform=platform.value, name=path, @@ -539,6 +554,8 @@ class RedshiftLineageExtractor: target_platform = LineageDatasetPlatform.S3 # Following call requires 'filename' key in lineage_row target_path = self._build_s3_path_from_row(lineage_row.filename) + if target_path is None: + return None urn = make_dataset_urn_with_platform_instance( platform=target_platform.value, name=target_path, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py index 2e6cb8051c..3012f4949b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py @@ -44,6 +44,7 @@ class RedshiftReport( num_lineage_dropped_query_parser: int = 0 num_lineage_dropped_not_support_copy_path: int = 0 num_lineage_processed_temp_tables = 0 + num_lineage_dropped_s3_path: int = 0 lineage_start_time: Optional[datetime] = None lineage_end_time: Optional[datetime] = None