diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py index 3b9b069902..beeb453cda 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py @@ -814,8 +814,8 @@ class RedshiftLineageExtractor: tablename = table.name if ( - table.is_external_table - and schema.is_external_schema + table.is_external_table() + and schema.is_external_schema() and schema.external_platform ): # external_db_params = schema.option diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py index 1f43d0c3bf..a9b0408e89 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py @@ -403,8 +403,8 @@ class RedshiftSqlLineageV2(Closeable): for table in tables: schema = db_schemas[self.database][schema_name] if ( - table.is_external_table - and schema.is_external_schema + table.is_external_table() + and schema.is_external_schema() and schema.external_platform ): # external_db_params = schema.option @@ -416,14 +416,26 @@ class RedshiftSqlLineageV2(Closeable): platform_instance=self.config.platform_instance, env=self.config.env, ) - upstream_urn = mce_builder.make_dataset_urn_with_platform_instance( - upstream_platform, - f"{schema.external_database}.{table.name}", - platform_instance=( + if upstream_platform == self.platform: + upstream_schema = schema.get_upstream_schema_name() or "public" + upstream_dataset_name = ( + f"{schema.external_database}.{upstream_schema}.{table.name}" + ) + upstream_platform_instance = self.config.platform_instance + else: + upstream_dataset_name = ( + f"{schema.external_database}.{table.name}" + ) + upstream_platform_instance = ( self.config.platform_instance_map.get(upstream_platform) if self.config.platform_instance_map else None - ), + ) + + upstream_urn = mce_builder.make_dataset_urn_with_platform_instance( + upstream_platform, + upstream_dataset_name, + platform_instance=upstream_platform_instance, env=self.config.env, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py index 4ef7906267..20fb2f2983 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py @@ -48,7 +48,7 @@ class RedshiftProfiler(GenericProfiler): if not self.config.schema_pattern.allowed(schema): continue for table in tables[db].get(schema, {}): - if table.is_external_table: + if table.is_external_table() or self.report.is_shared_database: if not self.config.profiling.profile_external_tables: # Case 1: If user did not tell us to profile external tables, simply log this. self.report.profiling_skipped_other[schema] += 1 diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py index 0c101dbb5e..d767ed479d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py @@ -83,7 +83,9 @@ class RedshiftCommonQuery: # NOTE: Tables from shared database are not available in pg_catalog.pg_class @staticmethod def list_tables( - skip_external_tables: bool = False, is_shared_database: bool = False + database: str, + skip_external_tables: bool = False, + is_shared_database: bool = False, ) -> str: # NOTE: it looks like description is available only in pg_description # So this remains preferrred way @@ -123,7 +125,7 @@ class RedshiftCommonQuery: AND n.nspname != 'information_schema' """ - external_tables_query = """ + external_tables_query = f""" SELECT 'EXTERNAL_TABLE' as tabletype, NULL AS "schema_oid", schemaname AS "schema", @@ -142,10 +144,11 @@ class RedshiftCommonQuery: serde_parameters, NULL as table_description FROM pg_catalog.svv_external_tables + WHERE redshift_database_name='{database}' ORDER BY "schema", "relname" """ - shared_database_tables_query = """ + shared_database_tables_query = f""" SELECT table_type as tabletype, NULL AS "schema_oid", schema_name AS "schema", @@ -164,6 +167,7 @@ class RedshiftCommonQuery: NULL as serde_parameters, NULL as table_description FROM svv_redshift_tables + WHERE database_name='{database}' ORDER BY "schema", "relname" """ @@ -175,9 +179,11 @@ class RedshiftCommonQuery: return f"{tables_query} UNION {external_tables_query}" @staticmethod - def list_columns(is_shared_database: bool = False) -> str: + def list_columns( + database_name: str, schema_name: str, is_shared_database: bool = False + ) -> str: if is_shared_database: - return """ + return f""" SELECT schema_name as "schema", table_name as "table_name", @@ -198,9 +204,10 @@ class RedshiftCommonQuery: null as "table_oid" FROM SVV_REDSHIFT_COLUMNS WHERE 1 and schema = '{schema_name}' + AND database_name = '{database_name}' ORDER BY "schema", "table_name", "attnum" """ - return """ + return f""" SELECT n.nspname as "schema", c.relname as "table_name", @@ -275,6 +282,7 @@ class RedshiftCommonQuery: null as "table_oid" FROM SVV_EXTERNAL_COLUMNS WHERE 1 and schema = '{schema_name}' + AND redshift_database_name = '{database_name}' ORDER BY "schema", "table_name", "attnum" """ diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 5a35700188..28840b2e2c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -366,7 +366,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): self.db = self.data_dictionary.get_database_details(connection, database) self.report.is_shared_database = ( - self.db is not None and self.db.is_shared_database + self.db is not None and self.db.is_shared_database() ) with self.report.new_stage(METADATA_EXTRACTION): self.db_tables[database] = defaultdict() @@ -508,6 +508,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): schema_columns: Dict[str, Dict[str, List[RedshiftColumn]]] = {} schema_columns[schema.name] = self.data_dictionary.get_columns_for_schema( conn=connection, + database=database, schema=schema, is_shared_database=self.report.is_shared_database, ) @@ -829,9 +830,12 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): domain_config=self.config.domain, ) - def cache_tables_and_views(self, connection, database): + def cache_tables_and_views( + self, connection: redshift_connector.Connection, database: str + ) -> None: tables, views = self.data_dictionary.get_tables_and_views( conn=connection, + database=database, skip_external_tables=self.config.skip_external_tables, is_shared_database=self.report.is_shared_database, ) @@ -982,7 +986,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): self.datashares_helper.to_platform_resource(list(outbound_shares)) ) - if self.db and self.db.is_shared_database: + if self.db and self.db.is_shared_database(): inbound_share = self.db.get_inbound_share() if inbound_share is None: self.report.warning( @@ -996,8 +1000,8 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): ): lineage_extractor.aggregator.add(known_lineage) - # TODO: distinguish between definition level lineage and audit log based lineage - # definition level lineage should never be skipped + # TODO: distinguish between definition level lineage and audit log based lineage. + # Definition level lineage should never be skipped if not self._should_ingest_lineage(): return diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py index 73456d445d..99bb4db030 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py @@ -42,7 +42,6 @@ class RedshiftTable(BaseTable): serde_parameters: Optional[str] = None last_altered: Optional[datetime] = None - @property def is_external_table(self) -> bool: return self.type == "EXTERNAL_TABLE" @@ -56,7 +55,6 @@ class RedshiftView(BaseTable): size_in_bytes: Optional[int] = None rows_count: Optional[int] = None - @property def is_external_table(self) -> bool: return self.type == "EXTERNAL_TABLE" @@ -71,10 +69,28 @@ class RedshiftSchema: external_platform: Optional[str] = None external_database: Optional[str] = None - @property def is_external_schema(self) -> bool: return self.type == "external" + def get_upstream_schema_name(self) -> Optional[str]: + """Gets the schema name from the external schema option. + + Returns: + Optional[str]: The schema name from the external schema option + if this is an external schema and has a valid option format, None otherwise. + """ + + if not self.is_external_schema() or not self.option: + return None + + # For external schema on redshift, option is in form + # {"SCHEMA":"tickit"} + schema_match = re.search(r'"SCHEMA"\s*:\s*"([^"]*)"', self.option) + if not schema_match: + return None + else: + return schema_match.group(1) + @dataclass class PartialInboundDatashare: @@ -117,7 +133,6 @@ class RedshiftDatabase: type: str options: Optional[str] = None - @property def is_shared_database(self) -> bool: return self.type == "shared" @@ -128,7 +143,7 @@ class RedshiftDatabase: def get_inbound_share( self, ) -> Optional[Union[InboundDatashare, PartialInboundDatashare]]: - if not self.is_shared_database or not self.options: + if not self.is_shared_database() or not self.options: return None # Convert into single regex ?? @@ -323,6 +338,7 @@ class RedshiftDataDictionary: def get_tables_and_views( self, conn: redshift_connector.Connection, + database: str, skip_external_tables: bool = False, is_shared_database: bool = False, ) -> Tuple[Dict[str, List[RedshiftTable]], Dict[str, List[RedshiftView]]]: @@ -336,6 +352,7 @@ class RedshiftDataDictionary: cur = RedshiftDataDictionary.get_query_result( conn, RedshiftCommonQuery.list_tables( + database=database, skip_external_tables=skip_external_tables, is_shared_database=is_shared_database, ), @@ -484,14 +501,17 @@ class RedshiftDataDictionary: @staticmethod def get_columns_for_schema( conn: redshift_connector.Connection, + database: str, schema: RedshiftSchema, is_shared_database: bool = False, ) -> Dict[str, List[RedshiftColumn]]: cursor = RedshiftDataDictionary.get_query_result( conn, RedshiftCommonQuery.list_columns( - is_shared_database=is_shared_database - ).format(schema_name=schema.name), + database_name=database, + schema_name=schema.name, + is_shared_database=is_shared_database, + ), ) table_columns: Dict[str, List[RedshiftColumn]] = {} diff --git a/metadata-ingestion/tests/unit/redshift/test_redshift_datashares.py b/metadata-ingestion/tests/unit/redshift/test_redshift_datashares.py index 3167341f4a..7e11a8fa00 100644 --- a/metadata-ingestion/tests/unit/redshift/test_redshift_datashares.py +++ b/metadata-ingestion/tests/unit/redshift/test_redshift_datashares.py @@ -16,7 +16,10 @@ from datahub.ingestion.source.redshift.datashares import ( RedshiftTable, RedshiftView, ) -from datahub.ingestion.source.redshift.redshift_schema import PartialInboundDatashare +from datahub.ingestion.source.redshift.redshift_schema import ( + PartialInboundDatashare, + RedshiftDatabase, +) from datahub.ingestion.source.redshift.report import RedshiftReport from datahub.metadata.schema_classes import ( PlatformResourceInfoClass, @@ -484,3 +487,47 @@ class TestDatasharesHelper: list(report.warnings)[0].title == "Downstream lineage to outbound datashare may not work" ) + + def test_database_get_inbound_datashare_success(self): + db = RedshiftDatabase( + name="db", + type="shared", + options='{"datashare_name":"xxx","datashare_producer_account":"1234","datashare_producer_namespace":"yyy"}', + ) + + assert db.get_inbound_share() == InboundDatashare( + share_name="xxx", + producer_namespace="yyy", + consumer_database="db", + ) + + def test_database_get_partial_inbound_datashare_success(self): + db = RedshiftDatabase( + name="db", + type="shared", + options='{"datashare_name":"xxx","datashare_producer_account":"1234","datashare_producer_namespace":"yy', + ) + + assert db.get_inbound_share() == PartialInboundDatashare( + share_name="xxx", + producer_namespace_prefix="yy", + consumer_database="db", + ) + + def test_database_no_inbound_datashare(self): + db = RedshiftDatabase( + name="db", + type="local", + options=None, + ) + + assert db.get_inbound_share() is None + + def test_shared_database_no_inbound_datashare(self): + db = RedshiftDatabase( + name="db", + type="shared", + options=None, + ) + + assert db.get_inbound_share() is None diff --git a/metadata-ingestion/tests/unit/redshift/test_redshift_lineage.py b/metadata-ingestion/tests/unit/redshift/test_redshift_lineage.py index 27045dfc65..df46d64ef6 100644 --- a/metadata-ingestion/tests/unit/redshift/test_redshift_lineage.py +++ b/metadata-ingestion/tests/unit/redshift/test_redshift_lineage.py @@ -15,7 +15,10 @@ from datahub.ingestion.source.redshift.lineage import ( RedshiftLineageExtractor, parse_alter_table_rename, ) -from datahub.ingestion.source.redshift.redshift_schema import TempTableRow +from datahub.ingestion.source.redshift.redshift_schema import ( + RedshiftSchema, + TempTableRow, +) from datahub.ingestion.source.redshift.report import RedshiftReport from datahub.metadata.schema_classes import NumberTypeClass, SchemaFieldDataTypeClass from datahub.sql_parsing.schema_resolver import SchemaResolver @@ -794,3 +797,39 @@ def test_collapse_temp_recursive_cll_lineage_with_circular_reference(): assert len(datasets) == 1 # Here we only interested if it fails or not + + +def test_external_schema_get_upstream_schema_success(): + schema = RedshiftSchema( + name="schema", + database="XXXXXXXX", + type="external", + option='{"SCHEMA":"sales_schema"}', + external_platform="redshift", + ) + + assert schema.get_upstream_schema_name() == "sales_schema" + + +def test_external_schema_no_upstream_schema(): + schema = RedshiftSchema( + name="schema", + database="XXXXXXXX", + type="external", + option=None, + external_platform="redshift", + ) + + assert schema.get_upstream_schema_name() is None + + +def test_local_schema_no_upstream_schema(): + schema = RedshiftSchema( + name="schema", + database="XXXXXXXX", + type="local", + option='{"some_other_option":"x"}', + external_platform=None, + ) + + assert schema.get_upstream_schema_name() is None