diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py index 323575754e..cf4e3a5b01 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py @@ -107,7 +107,17 @@ class RedshiftLineageExtractor: parser = LineageRunner(query) for table in parser.source_tables: - source_schema, source_table = str(table).split(".") + split = str(table).split(".") + if len(split) == 3: + db_name, source_schema, source_table = split + elif len(split) == 2: + source_schema, source_table = split + else: + raise ValueError( + f"Invalid table name {table} in query {query}. " + f"Expected format: [db_name].[schema].[table] or [schema].[table] or [table]." + ) + if source_schema == "": source_schema = str(self.config.default_schema) @@ -149,22 +159,19 @@ class RedshiftLineageExtractor: try: sources = self._get_sources_from_query(db_name=db_name, query=ddl) except Exception as e: - self.warn( - logger, - "parsing-query", - f"Error parsing query {ddl} for getting lineage ." - f"\nError was {e}.", + logger.warning( + f"Error parsing query {ddl} for getting lineage. Error was {e}." ) + self.report.num_lineage_dropped_query_parser += 1 else: if lineage_type == lineage_type.COPY and filename is not None: platform = LineageDatasetPlatform.S3 path = filename.strip() if urlparse(path).scheme != "s3": - self.warn( - logger, - "non-s3-lineage", - f"Only s3 source supported with copy. The source was: {path}.", + logger.warning( + f"Only s3 source supported with copy. The source was: {path}." 
) + self.report.num_lineage_dropped_not_support_copy_path += 1 return sources path = strip_s3_prefix(self._get_s3_path(path)) elif source_schema is not None and source_table is not None: @@ -316,11 +323,10 @@ class RedshiftLineageExtractor: or schema not in all_tables[db] or not any(table == t.name for t in all_tables[db][schema]) ): - self.warn( - logger, - "missing-table", - f"{source.path} missing table", + logger.debug( + f"{source.path} missing table, dropping from lineage.", ) + self.report.num_lineage_tables_dropped += 1 continue targe_source.append(source) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py index fd90866927..6fad9547f8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py @@ -4,7 +4,7 @@ from datetime import datetime from typing import Dict, Iterable, List, Optional, Union, cast from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance -from datahub.emitter.mcp_builder import wrap_aspect_as_workunit +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.ge_data_profiler import GEProfilerRequest from datahub.ingestion.source.redshift.config import RedshiftConfig @@ -49,7 +49,6 @@ class RedshiftProfiler(GenericProfiler): Dict[str, Dict[str, List[RedshiftView]]], ], ) -> Iterable[MetadataWorkUnit]: - # Extra default SQLAlchemy option for better connection pooling and threading. 
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow if self.config.profiling.enabled: @@ -63,7 +62,6 @@ class RedshiftProfiler(GenericProfiler): if not self.config.schema_pattern.allowed(schema): continue for table in tables[db].get(schema, {}): - # Emit the profile work unit profile_request = self.get_redshift_profile_request( table, schema, db @@ -100,12 +98,9 @@ class RedshiftProfiler(GenericProfiler): dataset_urn, int(datetime.now().timestamp() * 1000) ) - yield wrap_aspect_as_workunit( - "dataset", - dataset_urn, - "datasetProfile", - profile, - ) + yield MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=profile + ).as_workunit() def get_redshift_profile_request( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 653990bc70..8e136bfd0e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -339,7 +339,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): ) -> redshift_connector.Connection: client_options = config.extra_client_options host, port = config.host_port.split(":") - return redshift_connector.connect( + conn = redshift_connector.connect( host=host, port=int(port), user=config.username, @@ -348,6 +348,10 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): **client_options, ) + conn.autocommit = True + + return conn + def get_workunits(self) -> Iterable[MetadataWorkUnit]: return auto_stale_entity_removal( self.stale_entity_removal_handler, @@ -494,7 +498,6 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): logger.info("process views") if schema.name in self.db_views[schema.database]: for view in self.db_views[schema.database][schema.name]: - logger.info(f"View: {view}") view.columns = 
schema_columns[schema.name].get(view.name, []) yield from self._process_view( table=view, database=database, schema=schema diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py index 1c372a22f5..3c50a51d2c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py @@ -24,6 +24,9 @@ class RedshiftReport(ProfilingSqlReport): views_in_mem_size: Dict[str, str] = field(default_factory=TopKDict) num_operational_stats_skipped: int = 0 num_usage_stat_skipped: int = 0 + num_lineage_tables_dropped: int = 0 + num_lineage_dropped_query_parser: int = 0 + num_lineage_dropped_not_support_copy_path: int = 0 def report_dropped(self, key: str) -> None: self.filtered.append(key) diff --git a/metadata-ingestion/tests/unit/test_redshift_lineage.py b/metadata-ingestion/tests/unit/test_redshift_lineage.py new file mode 100644 index 0000000000..c7d6ac18e0 --- /dev/null +++ b/metadata-ingestion/tests/unit/test_redshift_lineage.py @@ -0,0 +1,104 @@ +from datahub.ingestion.source.redshift.config import RedshiftConfig +from datahub.ingestion.source.redshift.lineage import RedshiftLineageExtractor +from datahub.ingestion.source.redshift.report import RedshiftReport + + +def test_get_sources_from_query(): + config = RedshiftConfig(host_port="localhost:5439", database="test") + report = RedshiftReport() + + test_query = """ + select * from my_schema.my_table + """ + lineage_extractor = RedshiftLineageExtractor(config, report) + lineage_datasets = lineage_extractor._get_sources_from_query( + db_name="test", query=test_query + ) + assert len(lineage_datasets) == 1 + + lineage = lineage_datasets[0] + assert lineage.path == "test.my_schema.my_table" + + +def test_get_sources_from_query_with_only_table_name(): + config = RedshiftConfig(host_port="localhost:5439", database="test") + report = RedshiftReport() + + 
test_query = """ + select * from my_table + """ + lineage_extractor = RedshiftLineageExtractor(config, report) + lineage_datasets = lineage_extractor._get_sources_from_query( + db_name="test", query=test_query + ) + assert len(lineage_datasets) == 1 + + lineage = lineage_datasets[0] + assert lineage.path == "test.public.my_table" + + +def test_get_sources_from_query_with_database(): + config = RedshiftConfig(host_port="localhost:5439", database="test") + report = RedshiftReport() + + test_query = """ + select * from test.my_schema.my_table + """ + lineage_extractor = RedshiftLineageExtractor(config, report) + lineage_datasets = lineage_extractor._get_sources_from_query( + db_name="test", query=test_query + ) + assert len(lineage_datasets) == 1 + + lineage = lineage_datasets[0] + assert lineage.path == "test.my_schema.my_table" + + +def test_get_sources_from_query_with_non_default_database(): + config = RedshiftConfig(host_port="localhost:5439", database="test") + report = RedshiftReport() + + test_query = """ + select * from test2.my_schema.my_table + """ + lineage_extractor = RedshiftLineageExtractor(config, report) + lineage_datasets = lineage_extractor._get_sources_from_query( + db_name="test", query=test_query + ) + assert len(lineage_datasets) == 1 + + lineage = lineage_datasets[0] + assert lineage.path == "test2.my_schema.my_table" + + +def test_get_sources_from_query_with_only_table(): + config = RedshiftConfig(host_port="localhost:5439", database="test") + report = RedshiftReport() + + test_query = """ + select * from my_table + """ + lineage_extractor = RedshiftLineageExtractor(config, report) + lineage_datasets = lineage_extractor._get_sources_from_query( + db_name="test", query=test_query + ) + assert len(lineage_datasets) == 1 + + lineage = lineage_datasets[0] + assert lineage.path == "test.public.my_table" + + +def test_get_sources_from_query_with_four_part_table_should_throw_exception(): + config = RedshiftConfig(host_port="localhost:5439", 
database="test") + report = RedshiftReport() + + test_query = """ + select * from database.schema.my_table.test + """ + lineage_extractor = RedshiftLineageExtractor(config, report) + try: + lineage_extractor._get_sources_from_query(db_name="test", query=test_query) + except ValueError: + pass + else: + raise AssertionError(f"{test_query} should have thrown a ValueError exception but it didn't")